ATLAS Offline Software
Functions | Variables
python.AthenaMPConfig Namespace Reference

Functions

def athenaMPRunArgsToFlags (runArgs, flags)
 
def AthenaMPCfg (flags)
 
int getChunkSize (flags)
 

Variables

 flags = initConfigFlags()
 
 Files
 
 MaxEvents
 
 NumProcs
 
 cfg = MainServicesCfg(flags)
 

Function Documentation

◆ AthenaMPCfg()

def python.AthenaMPConfig.AthenaMPCfg (   flags)

Definition at line 49 of file AthenaMPConfig.py.

def AthenaMPCfg(flags):
    """Build the ComponentAccumulator that configures AthenaMP.

    An ``AthMpEvtLoopMgr`` service is created, filled from ``flags`` and added
    to the returned accumulator as its primary service.  Depending on the
    effective MP strategy the following tools are attached to it:

    * ``SharedQueue`` / ``RoundRobin``: optional shared-reader and
      shared-writer set-up, a ``SharedEvtQueueProvider`` (``SharedQueue``
      only), an event queue consumer and, with the shared writer enabled, a
      ``SharedWriterTool``.
    * ``EventService``: an ``EvtRangeScatterer`` and an ``EvtRangeProcessor``;
      the output stream sequencer service configuration is merged in.
    * any other value: a warning is logged and no MP tools are configured.

    Side effects outside the accumulator: two ``XRD_*`` environment variables
    are set to ``'1'`` and, if ``PoolFileCatalog.xml`` exists in the current
    directory, it is copied to ``PoolFileCatalog.xml.AthenaMP-saved``.

    Args:
        flags: configuration flags object; the ``Exec``, ``Concurrency``,
            ``MP``, ``Input``, ``Output``, ``Common`` and ``Digitization``
            categories are read.

    Returns:
        The ComponentAccumulator holding the MP event loop manager (primary
        service), the Gaudi ``FileMgr`` and everything merged in for the
        selected strategy.

    Raises:
        RuntimeError: for ``SharedQueue``/``RoundRobin`` when
            ``flags.Concurrency.NumThreads > 0`` and the job is a pile-up
            digitization job.
    """
    # Assign through os.environ instead of calling os.putenv(): the assignment
    # still calls putenv() underneath and keeps os.environ in sync as well.
    os.environ['XRD_ENABLEFORKHANDLERS'] = '1'
    os.environ['XRD_RUNFORKHANDLER'] = '1'

    result = ComponentAccumulator()

    # Configure MP Event Loop Manager
    mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)

    mpevtloop.NWorkers = flags.Concurrency.NumProcs

    # For e.g. event generation, if we use SharedQueue the job does not complete correctly
    myStrategy = flags.MP.Strategy
    if not flags.Input.Files and myStrategy == 'SharedQueue':
        msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
        myStrategy = 'RoundRobin'

    mpevtloop.Strategy = myStrategy
    mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
    mpevtloop.OutputReportFile = flags.MP.OutputReportFile
    mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
    mpevtloop.PollingInterval = flags.MP.PollingInterval
    mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
    mpevtloop.IsPileup = flags.Common.ProductionStep in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain] and flags.Digitization.PileUp
    # The EventService strategy never processes events before forking
    mpevtloop.EventsBeforeFork = 0 if myStrategy == 'EventService' else flags.MP.EventsBeforeFork

    # Configure Gaudi File Manager
    filemgr = CompFactory.FileMgr(LogFile="FileManagerLog")
    result.addService(filemgr)

    # Save PoolFileCatalog.xml if exists in the run directory
    # The saved file will be copied over to workers' run directories just after forking
    if os.path.isfile('PoolFileCatalog.xml'):
        shutil.copyfile('PoolFileCatalog.xml', 'PoolFileCatalog.xml.AthenaMP-saved')

    # Compute event chunk size
    chunk_size = getChunkSize(flags)
    if chunk_size < 1:
        msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
        chunk_size = 1

    # Configure Strategy
    debug_worker = flags.Concurrency.DebugWorkers
    event_range_channel = flags.MP.EventRangeChannel
    use_shared_reader = flags.MP.UseSharedReader
    use_shared_writer = flags.MP.UseSharedWriter
    # Suffix for the shared memory names: process id plus a random UUID
    unique_id = f"{os.getpid()}-{uuid.uuid4().hex}"

    if myStrategy in ('SharedQueue', 'RoundRobin'):
        if use_shared_reader:
            AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool

            if flags.Input.Format is Format.BS:
                evSel = CompFactory.EventSelectorByteStream("EventSelector")

                from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
                bscfg = ByteStreamReadCfg(flags)
                result.merge(bscfg)
            else:
                evSel = CompFactory.EventSelectorAthenaPool("EventSelector")

                inputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool",
                                                            SharedMemoryName=f"InputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
                result.merge(PoolReadCfg(flags))
                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
                result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, InputStreamingTool=inputStreamingTool))

            # Both selector flavours get the event streaming tool
            evSel.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool",
                                                            SharedMemoryName=f"EventStream{unique_id}")
            result.addService(evSel)

        if use_shared_writer:
            # The output streaming tool is only set up if some output is written
            if any((flags.Output.doWriteESD,
                    flags.Output.doWriteAOD,
                    flags.Output.doWriteDAOD,
                    flags.Output.doWriteRDO)) or flags.Output.HITSFileName != '':
                AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
                outputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool",
                                                             SharedMemoryName=f"OutputStream{unique_id}")

                from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolSharedIOCnvSvcCfg
                result.merge(AthenaPoolSharedIOCnvSvcCfg(flags, OutputStreamingTool=outputStreamingTool))

        if myStrategy == 'SharedQueue':
            queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
                                                                IsPileup=mpevtloop.IsPileup,
                                                                EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                ChunkSize=chunk_size)
            mpevtloop.Tools += [ queue_provider ]

        if flags.Concurrency.NumThreads > 0:
            # Hybrid MP+MT: workers use the hive (multi-threaded) consumer
            if mpevtloop.IsPileup:
                raise RuntimeError('Running pileup digitization in mixed MP+MT currently not supported')
            from AthenaConfiguration.MainServicesConfig import AthenaMtesEventLoopMgrCfg
            result.merge(AthenaMtesEventLoopMgrCfg(flags))
            queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
                                                                    EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                    Debug=debug_worker)
        else:
            queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
                                                                UseSharedWriter=use_shared_writer,
                                                                IsPileup=mpevtloop.IsPileup,
                                                                IsRoundRobin=(myStrategy == 'RoundRobin'),
                                                                EventsBeforeFork=mpevtloop.EventsBeforeFork,
                                                                ReadEventOrders=flags.MP.ReadEventOrders,
                                                                EventOrdersFile=flags.MP.EventOrdersFile,
                                                                Debug=debug_worker)
        mpevtloop.Tools += [ queue_consumer ]

        if use_shared_writer:
            shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork > 0),
                                                         IsPileup=mpevtloop.IsPileup,
                                                         Debug=debug_worker)
            mpevtloop.Tools += [ shared_writer ]

    elif myStrategy == 'EventService':
        channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
        channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"

        mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
                                                           EventRangeChannel = event_range_channel,
                                                           DoCaching=flags.MP.EvtRangeScattererCaching) ]
        mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
                                                           Channel2Scatterer = channelScatterer2Processor,
                                                           Channel2EvtSel = channelProcessor2EvtSel,
                                                           Debug=debug_worker) ]

        from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
        result.merge(OutputStreamSequencerSvcCfg(flags, incidentName="NextEventRange"))
    else:
        msg.warning("Unknown strategy %s. No MP tools will be configured", myStrategy)

    result.addService(mpevtloop, primary=True)

    return result
187 

◆ athenaMPRunArgsToFlags()

def python.AthenaMPConfig.athenaMPRunArgsToFlags (   runArgs,
  flags 
)
Fill MP configuration flags from run arguments.

Definition at line 14 of file AthenaMPConfig.py.

def athenaMPRunArgsToFlags(runArgs, flags):
    """Fill MP configuration flags from run arguments.

    Every run argument listed below that is present on ``runArgs`` is copied
    to the corresponding ``flags.MP`` flag; absent arguments leave the flag
    untouched.  Afterwards, a true ``runArgs.eventService`` forces
    ``flags.MP.Strategy`` to ``"EventService"``, overriding any
    ``athenaMPStrategy`` value.

    Args:
        runArgs: object carrying the run arguments as attributes.
        flags: configuration flags object whose ``MP`` category is updated
            in place.
    """
    # (run argument name, flags.MP flag name)
    arg_to_flag = (
        ("athenaMPWorkerTopDir", "WorkerTopDir"),
        ("athenaMPOutputReportFile", "OutputReportFile"),
        ("athenaMPCollectSubprocessLogs", "CollectSubprocessLogs"),
        ("athenaMPStrategy", "Strategy"),
        ("athenaMPReadEventOrders", "ReadEventOrders"),
        ("athenaMPEventOrdersFile", "EventOrdersFile"),
        ("athenaMPEventsBeforeFork", "EventsBeforeFork"),
        ("sharedWriter", "UseSharedWriter"),
        ("sharedReader", "UseSharedReader"),
        ("parallelCompression", "UseParallelCompression"),
    )
    for arg_name, flag_name in arg_to_flag:
        if hasattr(runArgs, arg_name):
            setattr(flags.MP, flag_name, getattr(runArgs, arg_name))

    # Honour the value of the switch, not just its presence: an explicit
    # eventService=False must not select the EventService strategy.
    if getattr(runArgs, "eventService", False):
        flags.MP.Strategy = "EventService"
48 

◆ getChunkSize()

int python.AthenaMPConfig.getChunkSize (   flags)

Definition at line 188 of file AthenaMPConfig.py.

def getChunkSize(flags) -> int:
    """Return the event chunk size derived from ``flags.MP.ChunkSize``.

    * ``ChunkSize > 0``: that value is used as is.
    * Otherwise the default of 1 is kept for input-less jobs (input file list
      containing ``'_ATHENA_GENERIC_INPUTFILE_NAME_'``) and when the shared
      reader is in use.
    * ``ChunkSize`` of -1, -2 or -3: the ``auto_flush`` value from the input
      file metadata is used, but only if the file compression algorithm is
      LZMA (-1), LZMA or ZLIB (-2), or LZMA, ZLIB or LZ4 (-3); otherwise the
      default of 1 is kept.
    * ``ChunkSize <= -4``: ``auto_flush`` is used regardless of compression.
    * Anything else (i.e. 0): a warning is logged and the default is kept.

    The input file metadata is only read on the paths that actually use it.

    Args:
        flags: configuration flags; ``MP.ChunkSize``, ``MP.UseSharedReader``
            and ``Input.Files`` are read.

    Returns:
        The chunk size.  It is -1 when ``auto_flush`` was requested but is
        missing from the metadata; ``AthenaMPCfg`` resets nonpositive values
        to 1.
    """
    chunk_size = 1
    requested = flags.MP.ChunkSize

    if requested > 0:
        chunk_size = requested
        msg.info('Chunk size set to %i', chunk_size)
        return chunk_size

    if '_ATHENA_GENERIC_INPUTFILE_NAME_' in flags.Input.Files:
        msg.info('Running an input-less job. Setting Chunk Size to 1')
        return chunk_size

    # Don't use auto flush for shared reader
    if flags.MP.UseSharedReader:
        msg.info('Shared Reader in use, chunk_size set to default (%i)', chunk_size)
        return chunk_size

    # ChunkSize -> (compression algorithm codes for which auto flush is used,
    #               message logged when the file uses a different algorithm).
    # Codes as used by the checks below: 1 = ZLIB, 2 = LZMA, 4 = LZ4.
    auto_flush_modes = {
        -1: ((2,), 'LZMA algorithm not in use, chunk_size set to default (%i)'),
        -2: ((1, 2), 'LZMA nor ZLIB in use, chunk_size set to default (%i)'),
        -3: ((1, 2, 4), 'LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)'),
    }

    if requested in auto_flush_modes:
        accepted_algs, fallback_msg = auto_flush_modes[requested]
        md = GetFileMD(flags.Input.Files)
        if md.get('file_comp_alg', -1) in accepted_algs:
            chunk_size = md.get('auto_flush', -1)
            msg.info('Chunk size set to auto flush (%i)', chunk_size)
        else:
            msg.info(fallback_msg, chunk_size)
    elif requested <= -4:
        # Use auto flush value for chunk_size, regardless of compression algorithm
        md = GetFileMD(flags.Input.Files)
        chunk_size = md.get('auto_flush', -1)
        msg.info('Chunk size set to auto flush (%i)', chunk_size)
    else:
        msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)

    return chunk_size
230 
231 

Variable Documentation

◆ cfg

python.AthenaMPConfig.cfg = MainServicesCfg(flags)

Definition at line 241 of file AthenaMPConfig.py.

◆ Files

python.AthenaMPConfig.Files

Definition at line 237 of file AthenaMPConfig.py.

◆ flags

python.AthenaMPConfig.flags = initConfigFlags()

Definition at line 236 of file AthenaMPConfig.py.

◆ MaxEvents

python.AthenaMPConfig.MaxEvents

Definition at line 238 of file AthenaMPConfig.py.

◆ NumProcs

python.AthenaMPConfig.NumProcs

Definition at line 239 of file AthenaMPConfig.py.

python.AutoConfigFlags.GetFileMD
def GetFileMD(filenames, allowEmpty=True, maxLevel='peeker')
Definition: AutoConfigFlags.py:65
python.AthenaMPConfig.AthenaMPCfg
def AthenaMPCfg(flags)
Definition: AthenaMPConfig.py:49
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
python.PoolCommonConfig.AthenaPoolSharedIOCnvSvcCfg
def AthenaPoolSharedIOCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:30
python.AthenaMPConfig.getChunkSize
int getChunkSize(flags)
Definition: AthenaMPConfig.py:188
python.AthenaMPConfig.athenaMPRunArgsToFlags
def athenaMPRunArgsToFlags(runArgs, flags)
Definition: AthenaMPConfig.py:14
python.OutputStreamSequencerSvcConfig.OutputStreamSequencerSvcCfg
def OutputStreamSequencerSvcCfg(flags, incidentName='', reportingOn=False, replaceRangeMode=False)
Definition: OutputStreamSequencerSvcConfig.py:6
AthenaSharedMemoryTool
This class provides the IPCTool for SharedMemory objects.
Definition: AthenaSharedMemoryTool.h:34
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:71
python.MainServicesConfig.AthenaMtesEventLoopMgrCfg
def AthenaMtesEventLoopMgrCfg(flags, mtEs=False, channel='')
Definition: MainServicesConfig.py:176