ATLAS Offline Software
AthenaMPConfig.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2 
3 from AthenaConfiguration.AllConfigFlags import initConfigFlags, GetFileMD
4 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
5 from AthenaConfiguration.ComponentFactory import CompFactory
6 from AthenaConfiguration.Enums import Format, ProductionStep
7 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
8 
9 from AthenaCommon.Logging import log as msg
10 
11 import os, shutil, uuid
12 
13 
def athenaMPRunArgsToFlags(runArgs, flags):
    """Fill MP configuration flags from run arguments.

    Each run argument that is present on ``runArgs`` is copied to the
    corresponding ``flags.MP`` entry; arguments that are absent leave the
    flag untouched.

    Args:
        runArgs: object carrying the job's run arguments as attributes.
        flags: configuration flags; only the ``MP`` category is written.

    Note:
        ``eventService`` selects the "EventService" strategy only when the
        argument is present *and* true, so an explicit false value no longer
        overrides the strategy. Because it is evaluated last, a true
        ``eventService`` takes precedence over ``athenaMPStrategy``.
    """
    # (run argument, flags.MP attribute) pairs copied verbatim when present.
    direct_mappings = (
        ("athenaMPWorkerTopDir", "WorkerTopDir"),
        ("athenaMPOutputReportFile", "OutputReportFile"),
        ("athenaMPCollectSubprocessLogs", "CollectSubprocessLogs"),
        ("athenaMPStrategy", "Strategy"),
        ("athenaMPReadEventOrders", "ReadEventOrders"),
        ("athenaMPEventOrdersFile", "EventOrdersFile"),
        ("athenaMPEventsBeforeFork", "EventsBeforeFork"),
        ("sharedWriter", "UseSharedWriter"),
        ("sharedReader", "UseSharedReader"),
        ("parallelCompression", "UseParallelCompression"),
    )
    for arg_name, flag_name in direct_mappings:
        if hasattr(runArgs, arg_name):
            setattr(flags.MP, flag_name, getattr(runArgs, arg_name))

    # Must stay after the table above so that it wins over athenaMPStrategy.
    if getattr(runArgs, "eventService", False):
        flags.MP.Strategy = "EventService"
48 
def AthenaMPCfg(flags):
    """Configure the AthenaMP event loop manager and the tools of the chosen strategy.

    Args:
        flags: configuration flags. The ``MP``, ``Concurrency``, ``Exec``,
            ``Common``, ``Digitization``, ``Input`` and ``Output`` categories
            are read.

    Returns:
        ComponentAccumulator with ``AthMpEvtLoopMgr`` registered as primary
        service, the Gaudi ``FileMgr`` service, and whatever the selected
        strategy ('SharedQueue', 'RoundRobin' or 'EventService') merges in.
        For any other strategy a warning is logged and no MP tools are added.

    Raises:
        RuntimeError: pile-up digitization is requested together with
            ``flags.Concurrency.NumThreads > 0``.

    Side effects:
        Sets the ``XRD_ENABLEFORKHANDLERS`` and ``XRD_RUNFORKHANDLER``
        environment variables, and copies ``PoolFileCatalog.xml`` (if present
        in the current directory) to ``PoolFileCatalog.xml.AthenaMP-saved``.
    """
    # XRootD client fork-handler switches. Assigning through os.environ still
    # calls putenv() but also keeps Python's view of the environment in sync,
    # which a bare os.putenv() does not.
    os.environ['XRD_ENABLEFORKHANDLERS'] = '1'
    os.environ['XRD_RUNFORKHANDLER'] = '1'

    result = ComponentAccumulator()

    strategy = flags.MP.Strategy
    is_pileup = flags.Common.ProductionStep in [ProductionStep.Digitization, ProductionStep.PileUpPresampling] and flags.Digitization.PileUp
    # The EventService strategy always forks before processing any event.
    events_before_fork = 0 if strategy == 'EventService' else flags.MP.EventsBeforeFork

    # Configure MP Event Loop Manager
    mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)

    mpevtloop.NWorkers = flags.Concurrency.NumProcs
    mpevtloop.Strategy = strategy
    mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
    mpevtloop.OutputReportFile = flags.MP.OutputReportFile
    mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
    mpevtloop.PollingInterval = flags.MP.PollingInterval
    mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
    mpevtloop.IsPileup = is_pileup
    mpevtloop.EventsBeforeFork = events_before_fork

    # Configure Gaudi File Manager
    filemgr = CompFactory.FileMgr(LogFile="FileManagerLog")
    result.addService(filemgr)

    # Save PoolFileCatalog.xml if exists in the run directory
    # The saved file will be copied over to workers' run directories just after forking
    if os.path.isfile('PoolFileCatalog.xml'):
        shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')

    # Compute event chunk size; anything below 1 is clamped to 1
    chunk_size = getChunkSize(flags)
    if chunk_size < 1:
        msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
        chunk_size = 1

    # Configure Strategy
    debug_worker = flags.Concurrency.DebugWorkers
    event_range_channel = flags.MP.EventRangeChannel
    use_shared_reader = flags.MP.UseSharedReader
    use_shared_writer = flags.MP.UseSharedWriter
    # PID plus a random UUID, used as suffix of the shared memory names of this job
    unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"

    if strategy in ('SharedQueue', 'RoundRobin'):
        if use_shared_reader:
            AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool

            if flags.Input.Format is Format.BS:
                evSel = CompFactory.EventSelectorByteStream("EventSelector")

                from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
                bscfg = ByteStreamReadCfg(flags)
                result.merge(bscfg)
            else:
                evSel = CompFactory.EventSelectorAthenaPool("EventSelector")

                inputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool",
                                                            SharedMemoryName=f"InputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
                result.merge(PoolReadCfg(flags))
                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
                result.merge(AthenaPoolCnvSvcCfg(flags, InputStreamingTool=inputStreamingTool))

            # Both selector types get the event streaming tool
            evSel.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool",
                                                            SharedMemoryName=f"EventStream{unique_id}")
            result.addService(evSel)

        if use_shared_writer:
            # The output streaming tool is only set up when one of these outputs is requested
            if any((flags.Output.doWriteESD,
                    flags.Output.doWriteAOD,
                    flags.Output.doWriteDAOD,
                    flags.Output.doWriteRDO)) or flags.Output.HITSFileName!='':
                AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
                outputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool",
                                                             SharedMemoryName=f"OutputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
                result.merge(AthenaPoolCnvSvcCfg(flags, OutputStreamingTool=outputStreamingTool))

        queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
                                                            IsPileup=is_pileup,
                                                            EventsBeforeFork=events_before_fork,
                                                            ChunkSize=chunk_size)
        if flags.Concurrency.NumThreads > 0:
            # Mixed MP+MT: workers run the Hive consumer
            if is_pileup:
                raise RuntimeError('Running pileup digitization in mixed MP+MT currently not supported')
            from AthenaConfiguration.MainServicesConfig import AthenaMtesEventLoopMgrCfg
            result.merge(AthenaMtesEventLoopMgrCfg(flags))
            queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
                                                                    EventsBeforeFork=events_before_fork,
                                                                    Debug=debug_worker)
        else:
            queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
                                                                UseSharedWriter=use_shared_writer,
                                                                IsPileup=is_pileup,
                                                                IsRoundRobin=(strategy=='RoundRobin'),
                                                                EventsBeforeFork=events_before_fork,
                                                                ReadEventOrders=flags.MP.ReadEventOrders,
                                                                EventOrdersFile=flags.MP.EventOrdersFile,
                                                                Debug=debug_worker)
        mpevtloop.Tools += [ queue_provider, queue_consumer ]

        if use_shared_writer:
            shared_writer = CompFactory.SharedWriterTool(MotherProcess=(events_before_fork>0),
                                                         IsPileup=is_pileup,
                                                         Debug=debug_worker)
            mpevtloop.Tools += [ shared_writer ]

    elif strategy=='EventService':
        channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
        channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"

        mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
                                                           EventRangeChannel = event_range_channel,
                                                           DoCaching=flags.MP.EvtRangeScattererCaching) ]
        mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=is_pileup,
                                                           Channel2Scatterer = channelScatterer2Processor,
                                                           Channel2EvtSel = channelProcessor2EvtSel,
                                                           Debug=debug_worker) ]

        from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
        result.merge(OutputStreamSequencerSvcCfg(flags,incidentName="NextEventRange"))
    else:
        msg.warning("Unknown strategy %s. No MP tools will be configured", strategy)

    result.addService(mpevtloop, primary=True)

    return result
177 
def getChunkSize(flags) -> int:
    """Return the event chunk size selected by ``flags.MP.ChunkSize``.

    Meaning of ``flags.MP.ChunkSize``:
      * ``> 0``   : used as is.
      * ``-1``    : auto-flush value of the input file if it is compressed
                    with LZMA, otherwise the default.
      * ``-2``    : as above, accepting LZMA or ZLIB.
      * ``-3``    : as above, accepting LZMA, ZLIB or LZ4.
      * ``<= -4`` : auto-flush value regardless of the compression algorithm.
      * anything else (e.g. ``0``) is invalid and yields the default.

    The default is 1. When the shared reader is in use, auto flush is never
    used and every non-positive setting yields the default.

    Input file metadata (``file_comp_alg``, ``auto_flush``) is only read when
    an auto-flush mode can actually apply. The returned value may be -1 if
    the metadata has no ``auto_flush`` entry; the caller is expected to
    sanitize non-positive results.

    Args:
        flags: configuration flags; ``MP.ChunkSize``, ``MP.UseSharedReader``
            and ``Input.Files`` are read.

    Returns:
        The chunk size to use.
    """
    default_chunk_size = 1
    requested = flags.MP.ChunkSize

    if requested > 0:
        msg.info('Chunk size set to %i', requested)
        return requested

    # Don't use auto flush for shared reader
    if flags.MP.UseSharedReader:
        msg.info('Shared Reader in use, chunk_size set to default (%i)', default_chunk_size)
        return default_chunk_size

    # Mode -> (accepted values of 'file_comp_alg', message logged otherwise).
    # Per the modes above: 1 is ZLIB, 2 is LZMA, 4 is LZ4.
    compression_rules = {
        -1: ((2,), 'LZMA algorithm not in use, chunk_size set to default (%i)'),
        -2: ((1, 2), 'LZMA nor ZLIB in use, chunk_size set to default (%i)'),
        -3: ((1, 2, 4), 'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)'),
    }
    rule = compression_rules.get(requested)

    if rule is None and not (requested <= -4):
        msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', default_chunk_size)
        return default_chunk_size

    # Only now is the file metadata really needed
    md = GetFileMD(flags.Input.Files)

    if rule is not None:
        accepted_algorithms, fallback_message = rule
        if md.get('file_comp_alg', -1) not in accepted_algorithms:
            msg.info(fallback_message, default_chunk_size)
            return default_chunk_size

    chunk_size = md.get('auto_flush', -1)
    msg.info('Chunk size set to auto flush (%i)', chunk_size)
    return chunk_size
217 
218 
if __name__ == "__main__":
    from AthenaConfiguration.TestDefaults import defaultTestFiles
    from AthenaPoolCnvSvc.PoolReadConfig import EventSelectorAthenaPoolCfg

    # ----------------- Example with input file --------------
    # Smoke test: 10 events of the default ESD test file with NumProcs set to 2.
    flags = initConfigFlags()
    flags.Concurrency.NumProcs = 2
    flags.Exec.MaxEvents = 10
    flags.Input.Files = defaultTestFiles.ESD

    # Main services plus the POOL event selector, then run the job.
    cfg = MainServicesCfg(flags)
    cfg.merge(EventSelectorAthenaPoolCfg(flags))
    cfg.run()
    # ----------------- Example with input file --------------

    msg.info('All OK!')
python.OutputStreamSequencerSvcConfig.OutputStreamSequencerSvcCfg
def OutputStreamSequencerSvcCfg(flags, incidentName='', reportingOn=False)
Definition: OutputStreamSequencerSvcConfig.py:6
python.AthenaMPConfig.AthenaMPCfg
def AthenaMPCfg(flags)
Definition: AthenaMPConfig.py:49
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.AutoConfigFlags.GetFileMD
def GetFileMD(filenames, allowEmpty=True)
Definition: AutoConfigFlags.py:51
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:256
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:29
python.AthenaMPConfig.getChunkSize
int getChunkSize(flags)
Definition: AthenaMPConfig.py:178
python.AthenaMPConfig.athenaMPRunArgsToFlags
def athenaMPRunArgsToFlags(runArgs, flags)
Definition: AthenaMPConfig.py:14
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
AthenaSharedMemoryTool
This class provides the IPCTool for SharedMemory objects.
Definition: AthenaSharedMemoryTool.h:33
python.PoolReadConfig.EventSelectorAthenaPoolCfg
def EventSelectorAthenaPoolCfg(flags)
Definition: PoolReadConfig.py:8
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69
python.MainServicesConfig.AthenaMtesEventLoopMgrCfg
def AthenaMtesEventLoopMgrCfg(flags, mtEs=False, channel='')
Definition: MainServicesConfig.py:120