def AthenaMPCfg(flags):
    """Build the ComponentAccumulator that configures AthenaMP.

    Creates an ``AthMpEvtLoopMgr``, fills its properties from ``flags`` and
    attaches the MP tools that correspond to the selected strategy
    (``SharedQueue``, ``RoundRobin`` or ``EventService``).  For an unknown
    strategy a warning is logged and no MP tools are attached.

    Side effects:
        * sets ``XRD_ENABLEFORKHANDLERS`` and ``XRD_RUNFORKHANDLER`` to ``'1'``
          in the process environment;
        * if ``PoolFileCatalog.xml`` exists in the current directory, copies it
          to ``PoolFileCatalog.xml.AthenaMP-saved``.

    Args:
        flags: configuration flags; the ``Exec``, ``Concurrency``, ``MP``,
            ``Input``, ``Common``, ``Digitization`` and ``Output`` categories
            are read here.

    Returns:
        ComponentAccumulator with the ``AthMpEvtLoopMgr`` registered as its
        primary service.

    Raises:
        RuntimeError: if pileup is requested together with
            ``flags.Concurrency.NumThreads > 0`` (mixed MP+MT).
    """
    # Assigning through os.environ still calls putenv() under the hood, but
    # also keeps Python's own view of the environment in sync (os.putenv alone
    # does not update os.environ).
    os.environ['XRD_ENABLEFORKHANDLERS'] = '1'
    os.environ['XRD_RUNFORKHANDLER'] = '1'

    result = ComponentAccumulator()

    # --- MP event loop manager -------------------------------------------
    mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)

    mpevtloop.NWorkers = flags.Concurrency.NumProcs

    # Fall back to RoundRobin when SharedQueue is requested without input files.
    myStrategy = flags.MP.Strategy
    if not flags.Input.Files and myStrategy == 'SharedQueue':
        msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
        myStrategy = 'RoundRobin'

    mpevtloop.Strategy = myStrategy
    mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
    mpevtloop.OutputReportFile = flags.MP.OutputReportFile
    mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
    mpevtloop.PollingInterval = flags.MP.PollingInterval
    mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
    mpevtloop.IsPileup = flags.Common.ProductionStep in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain] and flags.Digitization.PileUp
    # The EventService strategy always forks before processing any event.
    mpevtloop.EventsBeforeFork = 0 if myStrategy == 'EventService' else flags.MP.EventsBeforeFork

    # --- File manager ----------------------------------------------------
    filemgr = CompFactory.FileMgr(LogFile="FileManagerLog")
    result.addService(filemgr)

    # Keep a copy of the file catalog as it was before AthenaMP started.
    if os.path.isfile('PoolFileCatalog.xml'):
        shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')

    # --- Chunk size (clamped to at least 1) ------------------------------
    chunk_size = getChunkSize(flags)
    if chunk_size < 1:
        msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
        chunk_size = 1

    # --- Strategy-specific tools -----------------------------------------
    debug_worker = flags.Concurrency.DebugWorkers
    event_range_channel = flags.MP.EventRangeChannel
    use_shared_reader = flags.MP.UseSharedReader
    use_shared_writer = flags.MP.UseSharedWriter
    # Suffix for the shared-memory segment names: PID plus a random UUID.
    unique_id = f"{os.getpid()}-{uuid.uuid4().hex}"

    if myStrategy in ('SharedQueue', 'RoundRobin'):
        AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool

        if use_shared_reader:
            if flags.Input.Format is Format.BS:
                evSel = CompFactory.EventSelectorByteStream("EventSelector")

                from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
                bscfg = ByteStreamReadCfg(flags)
                result.merge(bscfg)
            else:
                evSel = CompFactory.EventSelectorAthenaPool("EventSelector")

                # NOTE(review): the opening line of this call was absent from
                # the reviewed text; the tool instance name is reconstructed
                # from the upstream Athena file - confirm before merging.
                inputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool",
                                                            SharedMemoryName=f"InputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
                result.merge(PoolReadCfg(flags))
                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
                result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, InputStreamingTool=inputStreamingTool))

            # NOTE(review): the opening line of this call was absent from the
            # reviewed text; the assignment target and tool instance name are
            # reconstructed from the upstream Athena file - confirm before
            # merging.
            evSel.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool",
                                                            SharedMemoryName=f"EventStream{unique_id}")
            result.addService(evSel)

        if use_shared_writer:
            # The shared output conversion service is only set up when one of
            # these output streams is actually requested.
            if any((flags.Output.doWriteESD,
                    flags.Output.doWriteAOD,
                    flags.Output.doWriteDAOD,
                    flags.Output.doWriteRDO)) or flags.Output.HITSFileName!='':
                # NOTE(review): the opening line of this call was absent from
                # the reviewed text; the tool instance name is reconstructed
                # from the upstream Athena file - confirm before merging.
                outputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool",
                                                             SharedMemoryName=f"OutputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
                result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, OutputStreamingTool=outputStreamingTool))

        # Only SharedQueue needs a queue provider; RoundRobin attaches just
        # the consumer (with IsRoundRobin=True, see below).
        if myStrategy == 'SharedQueue':
            queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
                                                                IsPileup=mpevtloop.IsPileup,
                                                                EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                ChunkSize=chunk_size)
            mpevtloop.Tools += [ queue_provider ]

        if flags.Concurrency.NumThreads > 0:
            # Mixed MP+MT: workers run the Mtes event loop with the Hive
            # flavour of the queue consumer.
            if mpevtloop.IsPileup:
                raise RuntimeError('Running pileup digitization in mixed MP+MT currently not supported')
            from AthenaConfiguration.MainServicesConfig import AthenaMtesEventLoopMgrCfg
            result.merge(AthenaMtesEventLoopMgrCfg(flags))
            queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
                                                                    EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                    Debug=debug_worker)
        else:
            queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
                                                                UseSharedWriter=use_shared_writer,
                                                                IsPileup=mpevtloop.IsPileup,
                                                                IsRoundRobin=(myStrategy=='RoundRobin'),
                                                                EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                ReadEventOrders=flags.MP.ReadEventOrders,
                                                                EventOrdersFile=flags.MP.EventOrdersFile,
                                                                Debug=debug_worker)
        mpevtloop.Tools += [ queue_consumer ]

        if use_shared_writer:
            shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork>0),
                                                         IsPileup=mpevtloop.IsPileup,
                                                         Debug=debug_worker)
            mpevtloop.Tools += [ shared_writer ]

    elif myStrategy=='EventService':
        channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
        channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"

        mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
                                                           EventRangeChannel = event_range_channel,
                                                           DoCaching=flags.MP.EvtRangeScattererCaching) ]
        mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
                                                           Channel2Scatterer = channelScatterer2Processor,
                                                           Channel2EvtSel = channelProcessor2EvtSel,
                                                           Debug=debug_worker) ]

        from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
        result.merge(OutputStreamSequencerSvcCfg(flags,incidentName="NextEventRange"))
    else:
        msg.warning("Unknown strategy %s. No MP tools will be configured", myStrategy)

    result.addService(mpevtloop, primary=True)

    return result
187