ATLAS Offline Software
Control/AthenaMP/python/PyComps.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 
3 #-----Python imports---#
4 import os, sys, shutil, uuid
5 
6 #-----Athena imports---#
7 from AthenaCommon.Logging import log as msg
8 
9 from AthenaMP.AthenaMPConf import AthMpEvtLoopMgr
10 class MpEvtLoopMgr(AthMpEvtLoopMgr):
11  def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
12 
13  from AthenaCommon.AppMgr import theApp
14  self.nThreads = theApp._opts.threads
15 
16 
17  kw['name'] = name
18  super(MpEvtLoopMgr, self).__init__(**kw)
19 
20  os.putenv('XRD_ENABLEFORKHANDLERS','1')
21  os.putenv('XRD_RUNFORKHANDLER','1')
22 
23  from .AthenaMPFlags import jobproperties as jp
24  self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
25  self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
26  self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
27  self.Strategy = jp.AthenaMPFlags.Strategy()
28  self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
29  self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
30  self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
31  self.IsPileup = isPileup
32 
33  if self.Strategy=='EventService':
34  self.EventsBeforeFork = 0
35 
36  from AthenaCommon.AppMgr import theApp as app
37  app.EventLoop = self.getFullJobOptName()
38 
39  # Enable FileMgr logging
40  from GaudiSvc.GaudiSvcConf import FileMgr
41  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
42  svcMgr+=FileMgr(LogFile="FileManagerLog")
43 
44  # Save PoolFileCatalog.xml if exists in the run directory
45  if os.path.isfile('PoolFileCatalog.xml'):
46  shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
47 
49 
50  def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
51  from .AthenaMPFlags import jobproperties as jp
52  import AthenaCommon.ConcurrencyFlags # noqa: F401
53  event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54 
55  chunk_size = getChunkSize()
56  if chunk_size < 1:
57  msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58  chunk_size = 1
59 
60  debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61  use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62  use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63  use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64  unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65 
66  # For e.g. event generation, if we use SharedQueue the job does not complete correctly
67  if strategy == 'SharedQueue':
68  from AthenaCommon.AthenaCommonFlags import jobproperties as ajp
69  if (not ajp.AthenaCommonFlags.FilesInput.statusOn) or ajp.AthenaCommonFlags.FilesInput == []:
70  msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
71  strategy = 'RoundRobin'
72 
73  if strategy=='SharedQueue' or strategy=='RoundRobin':
74  if use_shared_reader:
75  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76  svcMgr.PoolSvc.MaxFilesOpen = 2
77  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
78  svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName=f"EventStream{unique_id}")
79  if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
80  svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName=f"InputStream{unique_id}")
81  if use_shared_writer:
82  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
83  if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
84  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
85  svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
86  svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
87 
88  if strategy=='SharedQueue':
89  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
90  self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
91  IsPileup=pileup,
92  EventsBeforeFork=events_before_fork,
93  ChunkSize=chunk_size) ]
94 
95  # In pure MP, self.nThreads may be set to None - we want the pure MP setup in that case
96  if self.nThreads is not None and self.nThreads >= 1:
97  if(pileup):
98  raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
99  from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
100  self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
101  EventsBeforeFork=events_before_fork,
102  Debug=debug_worker) ]
103  else:
104  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
105  self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
106  UseSharedWriter=use_shared_writer,
107  IsPileup=pileup,
108  IsRoundRobin=(strategy=='RoundRobin'),
109  EventsBeforeFork=events_before_fork,
110  ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
111  EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
112  Debug=debug_worker) ]
113  if use_shared_writer:
114  from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
115  self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
116  IsPileup=pileup,
117  Debug=debug_worker) ]
118  elif strategy=='EventService':
119  channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
120  channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
121 
122  from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
123  self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
124  EventRangeChannel = event_range_channel,
125  DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
126 
127  from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
128  self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
129  Channel2Scatterer = channelScatterer2Processor,
130  Channel2EvtSel = channelProcessor2EvtSel,
131  Debug=debug_worker) ]
132  else:
133  msg.warning("Unknown strategy. No MP tools will be configured")
134 
135 def getChunkSize() -> int :
136  from .AthenaMPFlags import jobproperties as jp
137  from PyUtils.MetaReaderPeeker import metadata
138  chunk_size = 1
139  # In jobs without input (e.g. event generation), metadata is an empty dictionary
140  if (jp.AthenaMPFlags.ChunkSize() > 0):
141  chunk_size = jp.AthenaMPFlags.ChunkSize()
142  msg.info('Chunk size set to %i', chunk_size)
143  elif 'file_size' in metadata and metadata['file_size'] is not None:
144  #Don't use auto flush for shared reader
145  if (jp.AthenaMPFlags.UseSharedReader()):
146  msg.info('Shared Reader in use, chunk_size set to default (%i)', chunk_size)
147  #Use auto flush only if file is compressed with LZMA, else use default chunk_size
148  elif (jp.AthenaMPFlags.ChunkSize() == -1):
149  if (metadata['file_comp_alg'] == 2):
150  chunk_size = metadata['auto_flush']
151  msg.info('Chunk size set to auto flush (%i)', chunk_size)
152  else:
153  msg.info('LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
154  #Use auto flush only if file is compressed with LZMA or ZLIB, else use default chunk_size
155  elif (jp.AthenaMPFlags.ChunkSize() == -2):
156  if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2):
157  chunk_size = metadata['auto_flush']
158  msg.info('Chunk size set to auto flush (%i)', chunk_size)
159  else:
160  msg.info('LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
161  #Use auto flush only if file is compressed with LZMA, ZLIB or LZ4, else use default chunk_size
162  elif (jp.AthenaMPFlags.ChunkSize() == -3):
163  if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2 or metadata['file_comp_alg'] == 4):
164  chunk_size = metadata['auto_flush']
165  msg.info('Chunk size set to auto flush (%i)', chunk_size)
166  else:
167  msg.info('LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
168  #Use auto flush value for chunk_size, regarldess of compression algorithm
169  elif (jp.AthenaMPFlags.ChunkSize() <= -4):
170  chunk_size = metadata['auto_flush']
171  msg.info('Chunk size set to auto flush (%i)', chunk_size)
172  else:
173  msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)
174 
175  return chunk_size
SharedWriterTool
Definition: SharedWriterTool.h:13
python.PyComps.MpEvtLoopMgr.IsPileup
IsPileup
Definition: Control/AthenaMP/python/PyComps.py:31
python.PyComps.MpEvtLoopMgr
Definition: Control/AthenaMP/python/PyComps.py:10
python.PyComps.MpEvtLoopMgr.Strategy
Strategy
Definition: Control/AthenaMP/python/PyComps.py:27
python.PyComps.MpEvtLoopMgr.EventsBeforeFork
EventsBeforeFork
Definition: Control/AthenaMP/python/PyComps.py:30
python.PyComps.MpEvtLoopMgr.WorkerTopDir
WorkerTopDir
init base class
Definition: Control/AthenaMP/python/PyComps.py:24
python.PyComps.MpEvtLoopMgr.configureStrategy
None configureStrategy(self, strategy, pileup, events_before_fork)
Definition: Control/AthenaMP/python/PyComps.py:50
python.PyComps.MpEvtLoopMgr.CollectSubprocessLogs
CollectSubprocessLogs
Definition: Control/AthenaMP/python/PyComps.py:26
python.PyComps.getChunkSize
int getChunkSize()
Definition: Control/AthenaMP/python/PyComps.py:135
SharedEvtQueueConsumer
Definition: SharedEvtQueueConsumer.h:24
python.PyComps.MpEvtLoopMgr.MemSamplingInterval
MemSamplingInterval
Definition: Control/AthenaMP/python/PyComps.py:29
EvtRangeProcessor
Definition: EvtRangeProcessor.h:26
python.PyComps.MpEvtLoopMgr.__init__
def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw)
Definition: Control/AthenaMP/python/PyComps.py:11
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
SharedEvtQueueProvider
Definition: SharedEvtQueueProvider.h:15
AthenaSharedMemoryTool
This class provides the IPCTool for SharedMemory objects.
Definition: AthenaSharedMemoryTool.h:34
python.PyComps.MpEvtLoopMgr.PollingInterval
PollingInterval
Definition: Control/AthenaMP/python/PyComps.py:28
EvtRangeScatterer
Definition: EvtRangeScatterer.h:19
python.PyComps.MpEvtLoopMgr.OutputReportFile
OutputReportFile
Definition: Control/AthenaMP/python/PyComps.py:25
SharedHiveEvtQueueConsumer
Definition: SharedHiveEvtQueueConsumer.h:23
python.PyComps.MpEvtLoopMgr.nThreads
nThreads
Definition: Control/AthenaMP/python/PyComps.py:14