49def AthenaMPCfg(flags):
50
51 os.putenv('XRD_ENABLEFORKHANDLERS','1')
52 os.putenv('XRD_RUNFORKHANDLER','1')
53
54 result = ComponentAccumulator()
55
56
57 mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
58
59 mpevtloop.NWorkers = flags.Concurrency.NumProcs
60
61
62 myStrategy = flags.MP.Strategy
63 if flags.Input.Files == [] and flags.MP.Strategy == 'SharedQueue':
64 msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
65 myStrategy = 'RoundRobin'
66
67 mpevtloop.Strategy = myStrategy
68 mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
69 mpevtloop.OutputReportFile = flags.MP.OutputReportFile
70 mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
71 mpevtloop.PollingInterval = flags.MP.PollingInterval
72 mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
73 mpevtloop.IsPileup = flags.Common.ProductionStep in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain] and flags.Digitization.PileUp
74 mpevtloop.EventsBeforeFork = 0 if myStrategy == 'EventService' else flags.MP.EventsBeforeFork
75
76
77 filemgr = CompFactory.FileMgr(LogFile="FileManagerLog")
78 result.addService(filemgr)
79
80
81
82 if os.path.isfile('PoolFileCatalog.xml'):
83 shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
84
85
86 chunk_size = getChunkSize(flags)
87 if chunk_size < 1:
88 msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
89 chunk_size = 1
90 elif chunk_size > 1:
91
92
93
94
95 requested_events = flags.Exec.MaxEvents
96 available_events =
max(0, flags.Input.FileNentries - flags.Exec.SkipEvents)
97 total_entries = available_events
if requested_events == -1
else min(available_events, requested_events)
98 if 0 < total_entries <= (flags.Concurrency.NumProcs-1) * chunk_size:
99 chunk_size = 1
100 msg.warning('Not enough events (%d) to fill all workers (%d) with chunk size %d. Chunk size is set to 1',
101 total_entries, flags.Concurrency.NumProcs, chunk_size)
102
103
104 debug_worker = flags.Concurrency.DebugWorkers
105 event_range_channel = flags.MP.EventRangeChannel
106 use_shared_reader = flags.MP.UseSharedReader
107 use_shared_writer = flags.MP.UseSharedWriter
108 unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
109
110 if myStrategy == 'SharedQueue' or myStrategy == 'RoundRobin':
111 if use_shared_reader:
112 AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
113
114 if flags.Input.Format is Format.BS:
115 evSel = CompFactory.EventSelectorByteStream("EventSelector")
116
117 from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
118 bscfg = ByteStreamReadCfg(flags)
119 result.merge(bscfg)
120 else:
121 evSel = CompFactory.EventSelectorAthenaPoolSharedIO("EventSelector")
122
124 SharedMemoryName=f"InputStream{unique_id}")
125
126 from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
127 result.merge(PoolReadCfg(flags))
128 from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
129 result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, InputStreamingTool=inputStreamingTool))
130
132 SharedMemoryName=f"EventStream{unique_id}")
133 result.addService(evSel)
134
135 if use_shared_writer:
136 if any((flags.Output.doWriteESD,
137 flags.Output.doWriteAOD,
138 flags.Output.doWriteDAOD,
139 flags.Output.doWriteRDO)) or flags.Output.HITSFileName!='':
140 AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
142 SharedMemoryName=f"OutputStream{unique_id}")
143
144 from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
145 result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, OutputStreamingTool=outputStreamingTool))
146
147 if myStrategy == 'SharedQueue':
148 queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
149 IsPileup=mpevtloop.IsPileup,
150 EventsBeforeFork=mpevtloop.EventsBeforeFork,
151 ChunkSize=chunk_size)
152 mpevtloop.Tools += [ queue_provider ]
153
154 if flags.Concurrency.NumThreads > 0:
155 if mpevtloop.IsPileup:
156 raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
157 from AthenaConfiguration.MainServicesConfig import AthenaMtesEventLoopMgrCfg
158 result.merge(AthenaMtesEventLoopMgrCfg(flags))
159 queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
160 EventsBeforeFork=mpevtloop.EventsBeforeFork,
161 Debug=debug_worker)
162 else:
163 queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
164 UseSharedWriter=use_shared_writer,
165 IsPileup=mpevtloop.IsPileup,
166 IsRoundRobin=(myStrategy=='RoundRobin'),
167 EventsBeforeFork=mpevtloop.EventsBeforeFork,
168 ReadEventOrders=flags.MP.ReadEventOrders,
169 EventOrdersFile=flags.MP.EventOrdersFile,
170 Debug=debug_worker)
171 mpevtloop.Tools += [ queue_consumer ]
172
173 if use_shared_writer:
174 shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork>0),
175 IsPileup=mpevtloop.IsPileup,
176 Debug=debug_worker)
177 mpevtloop.Tools += [ shared_writer ]
178
179 elif myStrategy=='EventService':
180 channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
181 channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
182
183 mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
184 EventRangeChannel = event_range_channel,
185 DoCaching=flags.MP.EvtRangeScattererCaching) ]
186 mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
187 Channel2Scatterer = channelScatterer2Processor,
188 Channel2EvtSel = channelProcessor2EvtSel,
189 Debug=debug_worker) ]
190
191 from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
192 result.merge(OutputStreamSequencerSvcCfg(flags,incidentName="NextEventRange"))
193 else:
194 msg.warning("Unknown strategy %s. No MP tools will be configured", myStrategy)
195
196 result.addService(mpevtloop, primary=True)
197
198 return result
199