3 from AthenaConfiguration.AllConfigFlags
import initConfigFlags, GetFileMD
4 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
5 from AthenaConfiguration.ComponentFactory
import CompFactory
6 from AthenaConfiguration.Enums
import Format, ProductionStep
7 from AthenaConfiguration.MainServicesConfig
import MainServicesCfg
9 from AthenaCommon.Logging
import log
as msg
11 import os, shutil, uuid
# NOTE(review): this chunk looks like a rendered source listing: every
# statement carries a fused listing number and the original indentation is
# lost. The `def` line that owns the docstring below is not visible here;
# presumably it takes `runArgs` and `flags` -- confirm against the real file.
15 """Fill MP configuration flags from run arguments."""
# Each check below copies one optional attribute of `runArgs` onto the
# matching `flags.MP` setting, and only when `runArgs` actually defines it;
# attributes that are absent leave the corresponding flag untouched.
16 if hasattr(runArgs,
"athenaMPWorkerTopDir"):
17 flags.MP.WorkerTopDir = runArgs.athenaMPWorkerTopDir
19 if hasattr(runArgs,
"athenaMPOutputReportFile"):
20 flags.MP.OutputReportFile = runArgs.athenaMPOutputReportFile
22 if hasattr(runArgs,
"athenaMPCollectSubprocessLogs"):
23 flags.MP.CollectSubprocessLogs = runArgs.athenaMPCollectSubprocessLogs
25 if hasattr(runArgs,
"athenaMPStrategy"):
26 flags.MP.Strategy = runArgs.athenaMPStrategy
28 if hasattr(runArgs,
"athenaMPReadEventOrders"):
29 flags.MP.ReadEventOrders = runArgs.athenaMPReadEventOrders
31 if hasattr(runArgs,
"athenaMPEventOrdersFile"):
32 flags.MP.EventOrdersFile = runArgs.athenaMPEventOrdersFile
34 if hasattr(runArgs,
"athenaMPEventsBeforeFork"):
35 flags.MP.EventsBeforeFork = runArgs.athenaMPEventsBeforeFork
# The next three run arguments use shorter names than the flags they set.
37 if hasattr(runArgs,
"sharedWriter"):
38 flags.MP.UseSharedWriter = runArgs.sharedWriter
40 if hasattr(runArgs,
"sharedReader"):
41 flags.MP.UseSharedReader = runArgs.sharedReader
43 if hasattr(runArgs,
"parallelCompression"):
44 flags.MP.UseParallelCompression = runArgs.parallelCompression
# Only the presence of `eventService` is tested; its value is never read.
# Assuming these checks run sequentially (indentation was lost), this one
# comes after the `athenaMPStrategy` check and so overrides its result.
46 if hasattr(runArgs,
"eventService"):
47 flags.MP.Strategy =
"EventService"
# NOTE(review): fragment of a rendered source listing. The owning `def` line,
# the creation of `result` and `chunk_size`, and a number of interior lines
# (else branches, call openings) are not visible in this chunk, so several
# statements below are dangling. Comments describe only the visible code.
#
# Set two XRD_* fork-handler environment variables to '1'. os.putenv changes
# the process environment without updating os.environ.
51 os.putenv(
'XRD_ENABLEFORKHANDLERS',
'1')
52 os.putenv(
'XRD_RUNFORKHANDLER',
'1')
# Create the MP event loop manager; the worker count comes from
# flags.Concurrency.NumProcs.
57 mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
59 mpevtloop.NWorkers = flags.Concurrency.NumProcs
# Work on a local copy of the strategy so the fallback below does not
# modify flags.MP.Strategy itself.
62 myStrategy = flags.MP.Strategy
# With an empty input file list, "SharedQueue" is replaced by "RoundRobin".
63 if flags.Input.Files == []
and flags.MP.Strategy ==
'SharedQueue':
64 msg.info(
'MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
65 myStrategy =
'RoundRobin'
# Copy the remaining MP flags onto the event loop manager.
67 mpevtloop.Strategy = myStrategy
68 mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
69 mpevtloop.OutputReportFile = flags.MP.OutputReportFile
70 mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
71 mpevtloop.PollingInterval = flags.MP.PollingInterval
72 mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
# IsPileup is set only when the production step is one of the three listed
# steps AND flags.Digitization.PileUp is truthy.
73 mpevtloop.IsPileup = flags.Common.ProductionStep
in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain]
and flags.Digitization.PileUp
# The "EventService" strategy always forks before processing any event.
74 mpevtloop.EventsBeforeFork = 0
if myStrategy ==
'EventService' else flags.MP.EventsBeforeFork
# Register a FileMgr service that logs to "FileManagerLog".
# NOTE(review): `result` is created on a line not visible here.
77 filemgr = CompFactory.FileMgr(LogFile=
"FileManagerLog")
78 result.addService(filemgr)
# Keep a copy of an existing PoolFileCatalog.xml from the working directory.
82 if os.path.isfile(
'PoolFileCatalog.xml'):
83 shutil.copyfile(
'PoolFileCatalog.xml',
'PoolFileCatalog.xml.AthenaMP-saved')
# NOTE(review): the condition guarding this warning and the reset of
# `chunk_size` to 1 that the message announces are not visible here.
88 msg.warning(
'Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
# Local shortcuts for flags used repeatedly below.
92 debug_worker = flags.Concurrency.DebugWorkers
93 event_range_channel = flags.MP.EventRangeChannel
94 use_shared_reader = flags.MP.UseSharedReader
95 use_shared_writer = flags.MP.UseSharedWriter
# Process id plus a random UUID; used as a suffix in the shared-memory names
# below.
96 unique_id = f
"{str(os.getpid())}-{uuid.uuid4().hex}"
# --- "SharedQueue" / "RoundRobin" strategies ---
98 if myStrategy ==
'SharedQueue' or myStrategy ==
'RoundRobin':
# NOTE(review): the condition enclosing the shared-reader setup below is not
# visible; presumably `use_shared_reader` -- confirm.
100 AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
# ByteStream input uses EventSelectorByteStream; the other visible selector
# is EventSelectorAthenaPool (the `else:` line itself is missing).
102 if flags.Input.Format
is Format.BS:
103 evSel = CompFactory.EventSelectorByteStream(
"EventSelector")
105 from ByteStreamCnvSvc.ByteStreamConfig
import ByteStreamReadCfg
109 evSel = CompFactory.EventSelectorAthenaPool(
"EventSelector")
# NOTE(review): dangling keyword argument; the call it belongs to is missing.
112 SharedMemoryName=f
"InputStream{unique_id}")
114 from AthenaPoolCnvSvc.PoolReadConfig
import PoolReadCfg
116 from AthenaPoolCnvSvc.PoolCommonConfig
import AthenaPoolSharedIOCnvSvcCfg
# NOTE(review): dangling keyword argument; the call it belongs to is missing.
120 SharedMemoryName=f
"EventStream{unique_id}")
121 result.addService(evSel)
# Shared-writer setup applies when any of the ESD/AOD/DAOD/RDO output flags
# is set or a HITS output file name is given.
123 if use_shared_writer:
124 if any((flags.Output.doWriteESD,
125 flags.Output.doWriteAOD,
126 flags.Output.doWriteDAOD,
127 flags.Output.doWriteRDO))
or flags.Output.HITSFileName!=
'':
128 AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
# NOTE(review): dangling keyword argument; the call it belongs to is missing.
130 SharedMemoryName=f
"OutputStream{unique_id}")
132 from AthenaPoolCnvSvc.PoolCommonConfig
import AthenaPoolSharedIOCnvSvcCfg
# Only "SharedQueue" adds a SharedEvtQueueProvider tool.
135 if myStrategy ==
'SharedQueue':
136 queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
137 IsPileup=mpevtloop.IsPileup,
138 EventsBeforeFork=mpevtloop.EventsBeforeFork,
139 ChunkSize=chunk_size)
140 mpevtloop.Tools += [ queue_provider ]
# With NumThreads > 0 (MP combined with MT) a SharedHiveEvtQueueConsumer is
# used and pileup is rejected; the other visible consumer is
# SharedEvtQueueConsumer (the `else:` line and the ends of both calls are
# missing from this listing).
142 if flags.Concurrency.NumThreads > 0:
143 if mpevtloop.IsPileup:
144 raise Exception(
'Running pileup digitization in mixed MP+MT currently not supported')
145 from AthenaConfiguration.MainServicesConfig
import AthenaMtesEventLoopMgrCfg
147 queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
148 EventsBeforeFork=mpevtloop.EventsBeforeFork,
151 queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
152 UseSharedWriter=use_shared_writer,
153 IsPileup=mpevtloop.IsPileup,
154 IsRoundRobin=(myStrategy==
'RoundRobin'),
155 EventsBeforeFork=mpevtloop.EventsBeforeFork,
156 ReadEventOrders=flags.MP.ReadEventOrders,
157 EventOrdersFile=flags.MP.EventOrdersFile,
159 mpevtloop.Tools += [ queue_consumer ]
# MotherProcess is true when EventsBeforeFork is greater than zero.
161 if use_shared_writer:
162 shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork>0),
163 IsPileup=mpevtloop.IsPileup,
165 mpevtloop.Tools += [ shared_writer ]
# --- "EventService" strategy ---
# The scatterer and the processor are given the same channel name,
# "AthenaMP_Scatterer2Processor".
167 elif myStrategy==
'EventService':
168 channelScatterer2Processor =
"AthenaMP_Scatterer2Processor"
169 channelProcessor2EvtSel =
"AthenaMP_Processor2EvtSel"
171 mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
172 EventRangeChannel = event_range_channel,
173 DoCaching=flags.MP.EvtRangeScattererCaching) ]
174 mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
175 Channel2Scatterer = channelScatterer2Processor,
176 Channel2EvtSel = channelProcessor2EvtSel,
177 Debug=debug_worker) ]
179 from AthenaServices.OutputStreamSequencerSvcConfig
import OutputStreamSequencerSvcCfg
# Any other strategy only produces a warning (the `else:` line is missing).
182 msg.warning(
"Unknown strategy %s. No MP tools will be configured", myStrategy)
# The event loop manager is added as the primary service of `result`.
184 result.addService(mpevtloop, primary=
True)
# NOTE(review): fragment of a rendered source listing. The owning `def` line,
# the initial `chunk_size` default, the assignment of `md`, the `else:` lines
# and any `return` are not visible in this chunk. `md` is presumably file
# metadata for flags.Input.Files (GetFileMD is imported at the top) -- confirm.
#
# A positive flags.MP.ChunkSize is used as given.
190 if flags.MP.ChunkSize > 0:
191 chunk_size = flags.MP.ChunkSize
192 msg.info(
'Chunk size set to %i', chunk_size)
# The placeholder input name marks a job without input files.
194 if '_ATHENA_GENERIC_INPUTFILE_NAME_' in flags.Input.Files:
195 msg.info(
'Running an input-less job. Setting Chunk Size to 1')
# With the shared reader, chunk_size is only logged here, not reassigned.
199 if flags.MP.UseSharedReader:
200 msg.info(
'Shared Reader in use, chunk_size set to default (%i)', chunk_size)
# Negative ChunkSize values take the chunk size from the metadata's
# 'auto_flush' entry, depending on 'file_comp_alg' (both default to -1 when
# missing). Per the log messages, codes 1/2/4 presumably correspond to
# ZLIB/LZMA/LZ4 -- confirm.
# -1: auto flush only for compression code 2.
202 elif flags.MP.ChunkSize == -1:
203 if md.get(
'file_comp_alg',-1) == 2:
204 chunk_size = md.get(
'auto_flush',-1)
205 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
207 msg.info(
'LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
# -2: auto flush for compression codes 1 or 2.
209 elif flags.MP.ChunkSize == -2:
210 if md.get(
'file_comp_alg',-1)
in [1,2]:
211 chunk_size = md.get(
'auto_flush',-1)
212 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
214 msg.info(
'LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
# -3: auto flush for compression codes 1, 2 or 4.
216 elif flags.MP.ChunkSize == -3:
217 if md.get(
'file_comp_alg',-1)
in [1,2,4]:
218 chunk_size = md.get(
'auto_flush',-1)
219 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
221 msg.info(
'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
# -4 or lower: auto flush regardless of the compression code.
223 elif flags.MP.ChunkSize <= -4:
224 chunk_size = md.get(
'auto_flush',-1)
225 msg.info(
'Chunk size set to auto flush (%i)', chunk_size)
# NOTE(review): the branch that leads to this warning is not visible here.
227 msg.warning(
'Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)
232 if __name__==
"__main__":
235 from AthenaConfiguration.TestDefaults
import defaultTestFiles
237 flags.Input.Files = defaultTestFiles.ESD
238 flags.Exec.MaxEvents=10
239 flags.Concurrency.NumProcs=2
242 from AthenaPoolCnvSvc.PoolReadConfig
import EventSelectorAthenaPoolCfg