ATLAS Offline Software
Loading...
Searching...
No Matches
python.AthenaMPConfig Namespace Reference

Functions

 athenaMPRunArgsToFlags (runArgs, flags)
 AthenaMPCfg (flags)
int getChunkSize (flags)

Variables

 flags = initConfigFlags()
 Files
 MaxEvents
 NumProcs
 cfg = MainServicesCfg(flags)

Function Documentation

◆ AthenaMPCfg()

python.AthenaMPConfig.AthenaMPCfg ( flags)

Definition at line 49 of file AthenaMPConfig.py.

def AthenaMPCfg(flags):
    """Build the ComponentAccumulator that configures the AthenaMP event loop.

    An ``AthMpEvtLoopMgr`` is created from the ``flags.MP``,
    ``flags.Concurrency`` and ``flags.Exec`` settings, the tools needed by the
    selected MP strategy are attached to it, and it is registered as the
    primary service of the returned accumulator.

    Args:
        flags: configuration flags. The ``MP``, ``Concurrency``, ``Exec``,
            ``Input``, ``Output``, ``Common`` and ``Digitization`` categories
            are read.

    Returns:
        ComponentAccumulator containing the ``FileMgr`` service, any services
        merged for shared reading/writing, and the MP event loop manager
        (primary component).

    Raises:
        Exception: for the 'SharedQueue'/'RoundRobin' strategies, when
            ``flags.Concurrency.NumThreads > 0`` and the job is flagged as
            pile-up.

    Side effects:
        Sets ``XRD_ENABLEFORKHANDLERS`` and ``XRD_RUNFORKHANDLER`` to ``'1'``
        in the process environment and, if ``PoolFileCatalog.xml`` exists in
        the current directory, copies it to
        ``PoolFileCatalog.xml.AthenaMP-saved``.
    """
    # Assign through os.environ rather than os.putenv: the assignment still
    # reaches the C-level environment inherited by forked workers, and it
    # also keeps os.environ in sync (os.putenv alone does not update it).
    # NOTE(review): presumably these enable the XRootD client fork handlers - confirm.
    os.environ['XRD_ENABLEFORKHANDLERS'] = '1'
    os.environ['XRD_RUNFORKHANDLER'] = '1'

    result = ComponentAccumulator()

    # Configure MP Event Loop Manager
    mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval=flags.Exec.EventPrintoutInterval)

    mpevtloop.NWorkers = flags.Concurrency.NumProcs

    # For e.g. event generation, if we use SharedQueue the job does not complete correctly
    myStrategy = flags.MP.Strategy
    if flags.Input.Files == [] and flags.MP.Strategy == 'SharedQueue':
        msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
        myStrategy = 'RoundRobin'

    mpevtloop.Strategy = myStrategy
    mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
    mpevtloop.OutputReportFile = flags.MP.OutputReportFile
    mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
    mpevtloop.PollingInterval = flags.MP.PollingInterval
    mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
    # Pile-up only for the listed production steps, and only if the
    # Digitization.PileUp flag is set (it is not read for other steps).
    mpevtloop.IsPileup = flags.Common.ProductionStep in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain] and flags.Digitization.PileUp
    # The EventService strategy always forks before processing any event.
    mpevtloop.EventsBeforeFork = 0 if myStrategy == 'EventService' else flags.MP.EventsBeforeFork

    # Configure Gaudi File Manager
    filemgr = CompFactory.FileMgr(LogFile="FileManagerLog")
    result.addService(filemgr)

    # Save PoolFileCatalog.xml if exists in the run directory
    # The saved file will be copied over to workers' run directories just after forking
    if os.path.isfile('PoolFileCatalog.xml'):
        shutil.copyfile('PoolFileCatalog.xml', 'PoolFileCatalog.xml.AthenaMP-saved')

    # Compute event chunk size; getChunkSize may hand back a non-positive
    # value (e.g. missing auto-flush metadata), so clamp it to 1.
    chunk_size = getChunkSize(flags)
    if chunk_size < 1:
        msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
        chunk_size = 1

    # Configure Strategy
    debug_worker = flags.Concurrency.DebugWorkers
    event_range_channel = flags.MP.EventRangeChannel
    use_shared_reader = flags.MP.UseSharedReader
    use_shared_writer = flags.MP.UseSharedWriter
    # PID plus a random UUID; used as suffix of the shared-memory names below.
    unique_id = f"{os.getpid()}-{uuid.uuid4().hex}"

    if myStrategy in ('SharedQueue', 'RoundRobin'):
        if use_shared_reader:
            AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool

            if flags.Input.Format is Format.BS:
                # ByteStream input
                evSel = CompFactory.EventSelectorByteStream("EventSelector")

                from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
                bscfg = ByteStreamReadCfg(flags)
                result.merge(bscfg)
            else:
                # Any other input format is read through the POOL event selector
                evSel = CompFactory.EventSelectorAthenaPool("EventSelector")

                inputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool",
                                                            SharedMemoryName=f"InputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
                result.merge(PoolReadCfg(flags))
                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
                result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, InputStreamingTool=inputStreamingTool))

            # Common to both input formats
            evSel.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool",
                                                            SharedMemoryName=f"EventStream{unique_id}")
            result.addService(evSel)

        if use_shared_writer:
            # The output streaming tool is only set up when one of these
            # outputs is requested.
            if any((flags.Output.doWriteESD,
                    flags.Output.doWriteAOD,
                    flags.Output.doWriteDAOD,
                    flags.Output.doWriteRDO)) or flags.Output.HITSFileName != '':
                AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
                outputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool",
                                                             SharedMemoryName=f"OutputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
                result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, OutputStreamingTool=outputStreamingTool))

        # Only SharedQueue gets a queue provider; RoundRobin does not.
        if myStrategy == 'SharedQueue':
            queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
                                                                IsPileup=mpevtloop.IsPileup,
                                                                EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                ChunkSize=chunk_size)
            mpevtloop.Tools += [ queue_provider ]

        if flags.Concurrency.NumThreads > 0:
            # Mixed MP+MT: workers use the Hive consumer
            if mpevtloop.IsPileup:
                raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
            from AthenaConfiguration.MainServicesConfig import AthenaMtesEventLoopMgrCfg
            result.merge(AthenaMtesEventLoopMgrCfg(flags))
            queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
                                                                    EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                    Debug=debug_worker)
        else:
            queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
                                                                UseSharedWriter=use_shared_writer,
                                                                IsPileup=mpevtloop.IsPileup,
                                                                IsRoundRobin=(myStrategy == 'RoundRobin'),
                                                                EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                ReadEventOrders=flags.MP.ReadEventOrders,
                                                                EventOrdersFile=flags.MP.EventOrdersFile,
                                                                Debug=debug_worker)
        mpevtloop.Tools += [ queue_consumer ]

        if use_shared_writer:
            shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork > 0),
                                                         IsPileup=mpevtloop.IsPileup,
                                                         Debug=debug_worker)
            mpevtloop.Tools += [ shared_writer ]

    elif myStrategy == 'EventService':
        # Channel names passed to both the scatterer and the processor
        channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
        channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"

        mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel=channelScatterer2Processor,
                                                           EventRangeChannel=event_range_channel,
                                                           DoCaching=flags.MP.EvtRangeScattererCaching) ]
        mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
                                                           Channel2Scatterer=channelScatterer2Processor,
                                                           Channel2EvtSel=channelProcessor2EvtSel,
                                                           Debug=debug_worker) ]

        from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
        result.merge(OutputStreamSequencerSvcCfg(flags, incidentName="NextEventRange"))
    else:
        # The event loop manager is still registered below, just without tools.
        msg.warning("Unknown strategy %s. No MP tools will be configured", myStrategy)

    result.addService(mpevtloop, primary=True)

    return result
This class provides the IPCTool for SharedMemory objects.

◆ athenaMPRunArgsToFlags()

python.AthenaMPConfig.athenaMPRunArgsToFlags ( runArgs,
flags )
Fill MP configuration flags from run arguments.

Definition at line 14 of file AthenaMPConfig.py.

def athenaMPRunArgsToFlags(runArgs, flags):
    """Fill MP configuration flags from run arguments.

    Each run argument that is present on ``runArgs`` is copied to its
    counterpart under ``flags.MP``; flags whose run argument is absent are
    left untouched. Nothing is returned.
    """
    # (run-argument attribute, flags.MP attribute), applied in this order
    argToFlag = (
        ("athenaMPWorkerTopDir", "WorkerTopDir"),
        ("athenaMPOutputReportFile", "OutputReportFile"),
        ("athenaMPCollectSubprocessLogs", "CollectSubprocessLogs"),
        ("athenaMPStrategy", "Strategy"),
        ("athenaMPReadEventOrders", "ReadEventOrders"),
        ("athenaMPEventOrdersFile", "EventOrdersFile"),
        ("athenaMPEventsBeforeFork", "EventsBeforeFork"),
        ("sharedWriter", "UseSharedWriter"),
        ("sharedReader", "UseSharedReader"),
        ("parallelCompression", "UseParallelCompression"),
    )

    for argName, flagName in argToFlag:
        if hasattr(runArgs, argName):
            value = getattr(runArgs, argName)
            setattr(flags.MP, flagName, value)

    # Applied last so that it overrides any athenaMPStrategy given above.
    # NOTE(review): only the presence of the attribute is tested, its value
    # is not inspected - confirm this is intended for eventService=False.
    if hasattr(runArgs, "eventService"):
        flags.MP.Strategy = "EventService"

◆ getChunkSize()

int python.AthenaMPConfig.getChunkSize ( flags)

Definition at line 188 of file AthenaMPConfig.py.

def getChunkSize(flags) -> int:
    """Work out the event chunk size requested through ``flags.MP.ChunkSize``.

    * A positive value is returned unchanged.
    * For an input-less job (generic input file name in
      ``flags.Input.Files``) or when the shared reader is in use, the
      default of 1 is returned.
    * -1, -2 and -3 return the input file's ``auto_flush`` metadata value
      only if its ``file_comp_alg`` code is in (2), (1, 2) or (1, 2, 4)
      respectively, otherwise the default.
    * -4 or lower returns ``auto_flush`` whatever the compression.
    * Anything else is reported as invalid and the default is returned.

    The ``auto_flush`` lookup falls back to -1 when the metadata entry is
    missing, so the result is not guaranteed to be positive.
    """
    default_size = 1
    requested = flags.MP.ChunkSize

    if requested > 0:
        msg.info('Chunk size set to %i', requested)
        return requested

    if '_ATHENA_GENERIC_INPUTFILE_NAME_' in flags.Input.Files:
        msg.info('Running an input-less job. Setting Chunk Size to 1')
        return default_size

    # Metadata is fetched before the shared-reader check
    md = GetFileMD(flags.Input.Files)

    # Don't use auto flush for shared reader
    if flags.MP.UseSharedReader:
        msg.info('Shared Reader in use, chunk_size set to default (%i)', default_size)
        return default_size

    # requested value -> (compression codes for which auto flush is used,
    #                     message logged when the file uses another one)
    # Going by the log messages: 2 is LZMA, 1 ZLIB and 4 LZ4.
    compressionGated = {
        -1: ((2,), 'LZMA algorithm not in use, chunk_size set to default (%i)'),
        -2: ((1, 2), 'LZMA nor ZLIB in use, chunk_size set to default (%i)'),
        -3: ((1, 2, 4), 'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)'),
    }

    if requested in compressionGated:
        accepted_algs, fallback_message = compressionGated[requested]
        if md.get('file_comp_alg', -1) not in accepted_algs:
            msg.info(fallback_message, default_size)
            return default_size
    elif not (requested <= -4):
        # Neither positive, -1..-3, nor <= -4 (e.g. 0)
        msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', default_size)
        return default_size

    # Either the compression check passed or auto flush was requested
    # regardless of the compression algorithm (<= -4).
    auto_flush = md.get('auto_flush', -1)
    msg.info('Chunk size set to auto flush (%i)', auto_flush)
    return auto_flush

Variable Documentation

◆ cfg

python.AthenaMPConfig.cfg = MainServicesCfg(flags)

Definition at line 241 of file AthenaMPConfig.py.

◆ Files

python.AthenaMPConfig.Files

Definition at line 237 of file AthenaMPConfig.py.

◆ flags

python.AthenaMPConfig.flags = initConfigFlags()

Definition at line 236 of file AthenaMPConfig.py.

◆ MaxEvents

python.AthenaMPConfig.MaxEvents

Definition at line 238 of file AthenaMPConfig.py.

◆ NumProcs

python.AthenaMPConfig.NumProcs

Definition at line 239 of file AthenaMPConfig.py.