Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Functions | Variables
python.AthenaMPConfig Namespace Reference

Functions

def athenaMPRunArgsToFlags (runArgs, flags)
 
def AthenaMPCfg (flags)
 
int getChunkSize (flags)
 

Variables

 flags = initConfigFlags()
 
 Files
 
 MaxEvents
 
 NumProcs
 
 cfg = MainServicesCfg(flags)
 

Function Documentation

◆ AthenaMPCfg()

def python.AthenaMPConfig.AthenaMPCfg (   flags)

Definition at line 49 of file AthenaMPConfig.py.

49 def AthenaMPCfg(flags):
50 
51  os.putenv('XRD_ENABLEFORKHANDLERS','1')
52  os.putenv('XRD_RUNFORKHANDLER','1')
53 
54  result = ComponentAccumulator()
55 
56  # Configure MP Event Loop Manager
57  mpevtloop = CompFactory.AthMpEvtLoopMgr(EventPrintoutInterval = flags.Exec.EventPrintoutInterval)
58 
59  mpevtloop.NWorkers = flags.Concurrency.NumProcs
60  mpevtloop.Strategy = flags.MP.Strategy
61  mpevtloop.WorkerTopDir = flags.MP.WorkerTopDir
62  mpevtloop.OutputReportFile = flags.MP.OutputReportFile
63  mpevtloop.CollectSubprocessLogs = flags.MP.CollectSubprocessLogs
64  mpevtloop.PollingInterval = flags.MP.PollingInterval
65  mpevtloop.MemSamplingInterval = flags.MP.MemSamplingInterval
66  mpevtloop.IsPileup = flags.Common.ProductionStep in [ProductionStep.Digitization, ProductionStep.PileUpPresampling, ProductionStep.FastChain] and flags.Digitization.PileUp
67  mpevtloop.EventsBeforeFork = 0 if flags.MP.Strategy == 'EventService' else flags.MP.EventsBeforeFork
68 
69  # Configure Gaudi File Manager
70  filemgr = CompFactory.FileMgr(LogFile="FileManagerLog")
71  result.addService(filemgr)
72 
73  # Save PoolFileCatalog.xml if exists in the run directory
74  # The saved file will be copied over to workers' run directories just after forking
75  if os.path.isfile('PoolFileCatalog.xml'):
76  shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
77 
78  # Compute event chunk size
79  chunk_size = getChunkSize(flags)
80  if chunk_size < 1:
81  msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
82  chunk_size = 1
83 
84  # Configure Strategy
85  debug_worker = flags.Concurrency.DebugWorkers
86  event_range_channel = flags.MP.EventRangeChannel
87  use_shared_reader = flags.MP.UseSharedReader
88  use_shared_writer = flags.MP.UseSharedWriter
89  unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
90 
91  if flags.MP.Strategy == 'SharedQueue' or flags.MP.Strategy == 'RoundRobin':
92  if use_shared_reader:
93  AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
94 
95  if flags.Input.Format is Format.BS:
96  evSel = CompFactory.EventSelectorByteStream("EventSelector")
97 
98  from ByteStreamCnvSvc.ByteStreamConfig import ByteStreamReadCfg
99  bscfg = ByteStreamReadCfg(flags)
100  result.merge(bscfg)
101  else:
102  evSel = CompFactory.EventSelectorAthenaPool("EventSelector")
103 
104  inputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool",
105  SharedMemoryName=f"InputStream{unique_id}")
106 
107  from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
108  result.merge(PoolReadCfg(flags))
109  from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
110  result.merge(AthenaPoolCnvSvcCfg(flags, InputStreamingTool=inputStreamingTool))
111 
112  evSel.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool",
113  SharedMemoryName=f"EventStream{unique_id}")
114  result.addService(evSel)
115 
116  if use_shared_writer:
117  if any((flags.Output.doWriteESD,
118  flags.Output.doWriteAOD,
119  flags.Output.doWriteDAOD,
120  flags.Output.doWriteRDO)) or flags.Output.HITSFileName!='':
121  AthenaSharedMemoryTool = CompFactory.AthenaSharedMemoryTool
122  outputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool",
123  SharedMemoryName=f"OutputStream{unique_id}")
124 
125  from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
126  result.merge(AthenaPoolCnvSvcCfg(flags, OutputStreamingTool=outputStreamingTool))
127 
128  if flags.MP.Strategy == 'SharedQueue':
129  queue_provider = CompFactory.SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
130  IsPileup=mpevtloop.IsPileup,
131  EventsBeforeFork=mpevtloop.EventsBeforeFork,
132  ChunkSize=chunk_size)
133 
134  if flags.Concurrency.NumThreads > 0:
135  if mpevtloop.IsPileup:
136  raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
137  from AthenaConfiguration.MainServicesConfig import AthenaMtesEventLoopMgrCfg
138  result.merge(AthenaMtesEventLoopMgrCfg(flags))
139  queue_consumer = CompFactory.SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
140  EventsBeforeFork=mpevtloop.EventsBeforeFork,
141  Debug=debug_worker)
142  else:
143  queue_consumer = CompFactory.SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
144  UseSharedWriter=use_shared_writer,
145  IsPileup=mpevtloop.IsPileup,
146  IsRoundRobin=(flags.MP.Strategy=='RoundRobin'),
147  EventsBeforeFork=mpevtloop.EventsBeforeFork,
148  ReadEventOrders=flags.MP.ReadEventOrders,
149  EventOrdersFile=flags.MP.EventOrdersFile,
150  Debug=debug_worker)
151  mpevtloop.Tools += [ queue_provider, queue_consumer ]
152 
153  if use_shared_writer:
154  shared_writer = CompFactory.SharedWriterTool(MotherProcess=(mpevtloop.EventsBeforeFork>0),
155  IsPileup=mpevtloop.IsPileup,
156  Debug=debug_worker)
157  mpevtloop.Tools += [ shared_writer ]
158 
159  elif flags.MP.Strategy=='EventService':
160  channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
161  channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
162 
163  mpevtloop.Tools += [ CompFactory.EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
164  EventRangeChannel = event_range_channel,
165  DoCaching=flags.MP.EvtRangeScattererCaching) ]
166  mpevtloop.Tools += [ CompFactory.EvtRangeProcessor(IsPileup=mpevtloop.IsPileup,
167  Channel2Scatterer = channelScatterer2Processor,
168  Channel2EvtSel = channelProcessor2EvtSel,
169  Debug=debug_worker) ]
170 
171  from AthenaServices.OutputStreamSequencerSvcConfig import OutputStreamSequencerSvcCfg
172  result.merge(OutputStreamSequencerSvcCfg(flags,incidentName="NextEventRange"))
173  else:
174  msg.warning("Unknown strategy %s. No MP tools will be configured", flags.MP.Strategy)
175 
176  result.addService(mpevtloop, primary=True)
177 
178  return result
179 

◆ athenaMPRunArgsToFlags()

def python.AthenaMPConfig.athenaMPRunArgsToFlags (   runArgs,
  flags 
)
Fill MP configuration flags from run arguments.

Definition at line 14 of file AthenaMPConfig.py.

14 def athenaMPRunArgsToFlags(runArgs, flags):
15  """Fill MP configuration flags from run arguments."""
16  if hasattr(runArgs, "athenaMPWorkerTopDir"):
17  flags.MP.WorkerTopDir = runArgs.athenaMPWorkerTopDir
18 
19  if hasattr(runArgs, "athenaMPOutputReportFile"):
20  flags.MP.OutputReportFile = runArgs.athenaMPOutputReportFile
21 
22  if hasattr(runArgs, "athenaMPCollectSubprocessLogs"):
23  flags.MP.CollectSubprocessLogs = runArgs.athenaMPCollectSubprocessLogs
24 
25  if hasattr(runArgs, "athenaMPStrategy"):
26  flags.MP.Strategy = runArgs.athenaMPStrategy
27 
28  if hasattr(runArgs, "athenaMPReadEventOrders"):
29  flags.MP.ReadEventOrders = runArgs.athenaMPReadEventOrders
30 
31  if hasattr(runArgs, "athenaMPEventOrdersFile"):
32  flags.MP.EventOrdersFile = runArgs.athenaMPEventOrdersFile
33 
34  if hasattr(runArgs, "athenaMPEventsBeforeFork"):
35  flags.MP.EventsBeforeFork = runArgs.athenaMPEventsBeforeFork
36 
37  if hasattr(runArgs, "sharedWriter"):
38  flags.MP.UseSharedWriter = runArgs.sharedWriter
39 
40  if hasattr(runArgs, "sharedReader"):
41  flags.MP.UseSharedReader = runArgs.sharedReader
42 
43  if hasattr(runArgs, "parallelCompression"):
44  flags.MP.UseParallelCompression = runArgs.parallelCompression
45 
46  if hasattr(runArgs, "eventService"):
47  flags.MP.Strategy = "EventService"
48 

◆ getChunkSize()

int python.AthenaMPConfig.getChunkSize (   flags)

Definition at line 180 of file AthenaMPConfig.py.

180 def getChunkSize(flags) -> int:
181  chunk_size = 1
182  if flags.MP.ChunkSize > 0:
183  chunk_size = flags.MP.ChunkSize
184  msg.info('Chunk size set to %i', chunk_size)
185  else:
186  if '_ATHENA_GENERIC_INPUTFILE_NAME_' in flags.Input.Files:
187  msg.info('Running an input-less job. Setting Chunk Size to 1')
188  else:
189  md = GetFileMD(flags.Input.Files)
190  #Don't use auto flush for shared reader
191  if flags.MP.UseSharedReader:
192  msg.info('Shared Reader in use, chunk_size set to default (%i)', chunk_size)
193  #Use auto flush only if file is compressed with LZMA, else use default chunk_size
194  elif flags.MP.ChunkSize == -1:
195  if md.get('file_comp_alg',-1) == 2:
196  chunk_size = md.get('auto_flush',-1)
197  msg.info('Chunk size set to auto flush (%i)', chunk_size)
198  else:
199  msg.info('LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
200  #Use auto flush only if file is compressed with LZMA or ZLIB, else use default chunk_size
201  elif flags.MP.ChunkSize == -2:
202  if md.get('file_comp_alg',-1) in [1,2]:
203  chunk_size = md.get('auto_flush',-1)
204  msg.info('Chunk size set to auto flush (%i)', chunk_size)
205  else:
206  msg.info('LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
207  #Use auto flush only if file is compressed with LZMA, ZLIB or LZ4, else use default chunk_size
208  elif flags.MP.ChunkSize == -3:
209  if md.get('file_comp_alg',-1) in [1,2,4]:
210  chunk_size = md.get('auto_flush',-1)
211  msg.info('Chunk size set to auto flush (%i)', chunk_size)
212  else:
213  msg.info('LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
214  #Use auto flush value for chunk_size, regardless of compression algorithm
215  elif flags.MP.ChunkSize <= -4:
216  chunk_size = md.get('auto_flush',-1)
217  msg.info('Chunk size set to auto flush (%i)', chunk_size)
218  else:
219  msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)
220 
221  return chunk_size
222 
223 

Variable Documentation

◆ cfg

python.AthenaMPConfig.cfg = MainServicesCfg(flags)

Definition at line 233 of file AthenaMPConfig.py.

◆ Files

python.AthenaMPConfig.Files

Definition at line 229 of file AthenaMPConfig.py.

◆ flags

python.AthenaMPConfig.flags = initConfigFlags()

Definition at line 228 of file AthenaMPConfig.py.

◆ MaxEvents

python.AthenaMPConfig.MaxEvents

Definition at line 230 of file AthenaMPConfig.py.

◆ NumProcs

python.AthenaMPConfig.NumProcs

Definition at line 231 of file AthenaMPConfig.py.

python.AutoConfigFlags.GetFileMD
def GetFileMD(filenames, allowEmpty=True, maxLevel='peeker')
Definition: AutoConfigFlags.py:65
python.OutputStreamSequencerSvcConfig.OutputStreamSequencerSvcCfg
def OutputStreamSequencerSvcCfg(flags, incidentName='', reportingOn=False)
Definition: OutputStreamSequencerSvcConfig.py:6
python.AthenaMPConfig.AthenaMPCfg
def AthenaMPCfg(flags)
Definition: AthenaMPConfig.py:49
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
python.PoolCommonConfig.AthenaPoolCnvSvcCfg
def AthenaPoolCnvSvcCfg(flags, **kwargs)
Definition: PoolCommonConfig.py:30
python.AthenaMPConfig.getChunkSize
int getChunkSize(flags)
Definition: AthenaMPConfig.py:180
python.AthenaMPConfig.athenaMPRunArgsToFlags
def athenaMPRunArgsToFlags(runArgs, flags)
Definition: AthenaMPConfig.py:14
AthenaSharedMemoryTool
This class provides the IPCTool for SharedMemory objects.
Definition: AthenaSharedMemoryTool.h:34
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69
python.MainServicesConfig.AthenaMtesEventLoopMgrCfg
def AthenaMtesEventLoopMgrCfg(flags, mtEs=False, channel='')
Definition: MainServicesConfig.py:124