51 os.putenv(
'XRD_ENABLEFORKHANDLERS',
'1')
52 os.putenv(
'XRD_RUNFORKHANDLER',
'1')
57 mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
59 mpevtloop.NWorkers = flags.Concurrency.NumProcs
60 mpevtloop.Strategy = flags.MP.Strategy
61 mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
62 mpevtloop.OutputReportFile = flags.MP.OutputReportFile
63 mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
64 mpevtloop.PollingInterval = flags.MP.PollingInterval
65 mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
66 mpevtloop.IsPileup = flags.Common.ProductionStep
in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain]
and flags.Digitization.PileUp
67 mpevtloop.EventsBeforeFork = 0
if flags.MP.Strategy ==
'EventService' else flags.MP.EventsBeforeFork
70 filemgr = CompFactory.FileMgr(LogFile=
"FileManagerLog")
71 result.addService(filemgr)
75 if os.path.isfile(
'PoolFileCatalog.xml'):
76 shutil.copyfile(
'PoolFileCatalog.xml',
'PoolFileCatalog.xml.AthenaMP-saved')
81 msg.warning(
'Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
85 debug_worker = flags.Concurrency.DebugWorkers
86 event_range_channel = flags.MP.EventRangeChannel
87 use_shared_reader = flags.MP.UseSharedReader
88 use_shared_writer = flags.MP.UseSharedWriter
89 unique_id = f
"{str(os.getpid())}-{uuid.uuid4().hex}"
91 if flags.MP.Strategy ==
'SharedQueue' or flags.MP.Strategy ==
'RoundRobin':
93 AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
95 if flags.Input.Format
is Format.BS:
96 evSel = CompFactory.EventSelectorByteStream(
"EventSelector")
98 from ByteStreamCnvSvc.ByteStreamConfig
import ByteStreamReadCfg
102 evSel = CompFactory.EventSelectorAthenaPool(
"EventSelector")
105 SharedMemoryName=f
"InputStream{unique_id}")
107 from AthenaPoolCnvSvc.PoolReadConfig
import PoolReadCfg
109 from AthenaPoolCnvSvc.PoolCommonConfig
import AthenaPoolCnvSvcCfg
113 SharedMemoryName=f
"EventStream{unique_id}")
114 result.addService(evSel)
116 if use_shared_writer:
117 if any((flags.Output.doWriteESD,
118 flags.Output.doWriteAOD,
119 flags.Output.doWriteDAOD,
120 flags.Output.doWriteRDO))
or flags.Output.HITSFileName!=
'':
121 AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
123 SharedMemoryName=f
"OutputStream{unique_id}")
125 from AthenaPoolCnvSvc.PoolCommonConfig
import AthenaPoolCnvSvcCfg
128 if flags.MP.Strategy ==
'SharedQueue':
129 queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
130 IsPileup=mpevtloop.IsPileup,
131 EventsBeforeFork=mpevtloop.EventsBeforeFork,
132 ChunkSize=chunk_size)
134 if flags.Concurrency.NumThreads > 0:
135 if mpevtloop.IsPileup:
136 raise Exception(
'Running pileup digitization in mixed MP+MT currently not supported')
137 from AthenaConfiguration.MainServicesConfig
import AthenaMtesEventLoopMgrCfg
139 queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
140 EventsBeforeFork=mpevtloop.EventsBeforeFork,
143 queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
144 UseSharedWriter=use_shared_writer,
145 IsPileup=mpevtloop.IsPileup,
146 IsRoundRobin=(flags.MP.Strategy==
'RoundRobin'),
147 EventsBeforeFork=mpevtloop.EventsBeforeFork,
148 ReadEventOrders=flags.MP.ReadEventOrders,
149 EventOrdersFile=flags.MP.EventOrdersFile,
151 mpevtloop.Tools += [ queue_provider, queue_consumer ]
153 if use_shared_writer:
154 shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork>0),
155 IsPileup=mpevtloop.IsPileup,
157 mpevtloop.Tools += [ shared_writer ]
159 elif flags.MP.Strategy==
'EventService':
160 channelScatterer2Processor =
"AthenaMP_Scatterer2Processor"
161 channelProcessor2EvtSel =
"AthenaMP_Processor2EvtSel"
163 mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
164 EventRangeChannel = event_range_channel,
165 DoCaching=flags.MP.EvtRangeScattererCaching) ]
166 mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
167 Channel2Scatterer = channelScatterer2Processor,
168 Channel2EvtSel = channelProcessor2EvtSel,
169 Debug=debug_worker) ]
171 from AthenaServices.OutputStreamSequencerSvcConfig
import OutputStreamSequencerSvcCfg
174 msg.warning(
"Unknown strategy %s. No MP tools will be configured", flags.MP.Strategy)
176 result.addService(mpevtloop, primary=
True)