ATLAS Offline Software
Control/AthenaMP/python/PyComps.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 #-----Python imports---#
4 import os, sys, shutil, uuid
5 
6 #-----Athena imports---#
7 from AthenaCommon.Logging import log as msg
8 
9 from AthenaMP.AthenaMPConf import AthMpEvtLoopMgr
10 class MpEvtLoopMgr(AthMpEvtLoopMgr):
11  def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
12 
13  from AthenaCommon.AppMgr import theApp
14  self.nThreads = theApp._opts.threads
15 
16 
17  kw['name'] = name
18  super(MpEvtLoopMgr, self).__init__(**kw)
19 
20  os.putenv('XRD_ENABLEFORKHANDLERS','1')
21  os.putenv('XRD_RUNFORKHANDLER','1')
22 
23  from .AthenaMPFlags import jobproperties as jp
24  self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
25  self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
26  self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
27  self.Strategy = jp.AthenaMPFlags.Strategy()
28  self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
29  self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
30  self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
31  self.IsPileup = isPileup
32 
33  if self.Strategy=='EventService':
34  self.EventsBeforeFork = 0
35 
36  from AthenaCommon.AppMgr import theApp as app
37  app.EventLoop = self.getFullJobOptName()
38 
39  # Enable FileMgr logging
40  from GaudiSvc.GaudiSvcConf import FileMgr
41  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
42  svcMgr+=FileMgr(LogFile="FileManagerLog")
43 
44  # Save PoolFileCatalog.xml if exists in the run directory
45  if os.path.isfile('PoolFileCatalog.xml'):
46  shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
47 
49 
50  def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
51  from .AthenaMPFlags import jobproperties as jp
52  import AthenaCommon.ConcurrencyFlags # noqa: F401
53  event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54 
55  chunk_size = getChunkSize()
56  if chunk_size < 1:
57  msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58  chunk_size = 1
59 
60  debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61  use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62  use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63  use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64  unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65 
66  if strategy=='SharedQueue' or strategy=='RoundRobin':
67  if use_shared_reader:
68  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
69  svcMgr.PoolSvc.MaxFilesOpen = 2
70  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
71  svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName=f"EventStream{unique_id}")
72  if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
73  svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName=f"InputStream{unique_id}")
74  if use_shared_writer:
75  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76  if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
77  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
78  svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
79  svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
80 
81  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
82  self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
83  IsPileup=pileup,
84  EventsBeforeFork=events_before_fork,
85  ChunkSize=chunk_size) ]
86 
87  if (self.nThreads >= 1):
88  if(pileup):
89  raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
90  from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
91  self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
92  EventsBeforeFork=events_before_fork,
93  Debug=debug_worker) ]
94  else:
95  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
96  self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
97  UseSharedWriter=use_shared_writer,
98  IsPileup=pileup,
99  IsRoundRobin=(strategy=='RoundRobin'),
100  EventsBeforeFork=events_before_fork,
101  ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
102  EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
103  Debug=debug_worker) ]
104  if use_shared_writer:
105  from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
106  self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
107  IsPileup=pileup,
108  Debug=debug_worker) ]
109 
110  # Enable seeking
111  if not use_shared_reader:
113 
114  elif strategy=='EventService':
115  channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
116  channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
117 
118  from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
119  self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
120  EventRangeChannel = event_range_channel,
121  DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
122 
123  from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
124  self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
125  Channel2Scatterer = channelScatterer2Processor,
126  Channel2EvtSel = channelProcessor2EvtSel,
127  Debug=debug_worker) ]
128  # Enable seeking
130 
131  else:
132  msg.warning("Unknown strategy. No MP tools will be configured")
133 
134 def setupEvtSelForSeekOps() -> None:
135  """ try to install seek-stuff on the EventSelector side """
136  #import sys
137  #from AthenaCommon.Logging import log as msg
138  msg.debug("setupEvtSelForSeekOps:")
139  if 'AthenaRootComps.ReadAthenaRoot' in sys.modules:
140  # athenarootcomps has seeking enabled by default
141  msg.info('=> Seeking enabled.')
142  return
143 
144  if 'AthenaPoolCnvSvc.ReadAthenaPool' not in sys.modules:
145 
146  msg.info( "Cannot enable 'seeking' b/c module "
147  "[AthenaPoolCnvSvc.ReadAthenaPool] hasn't been imported..." )
148  msg.info( "Modify your jobOptions to import that module "
149  "(or just ignore this message)" )
150  return
151 
152  from AthenaCommon.AppMgr import theApp, AthAppMgr
153  if theApp.state() != AthAppMgr.State.OFFLINE:
154  msg.info( "C++ ApplicationMgr already instantiated, probably seeking "
155  "will be ill-configured..." )
156  msg.info( "EventSelector writers should implement updateHandlers" )
157 
158  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
159  from AthenaCommon.Configurable import Configurable
160  collectionType = svcMgr.EventSelector.properties()["CollectionType"]
161 
162  if collectionType in ( "ImplicitROOT", Configurable.propertyNoValue, ):
163  msg.info ( "=> Seeking enabled." )
164 
165  else:
166  msg.warning( "Input seeking is not compatible with collection type of %s",
167  svcMgr.EventSelector.properties()["CollectionType"] )
168  msg.warning( "=> Seeking disabled." )
169  return
170 
171 def getChunkSize() -> int :
172  from .AthenaMPFlags import jobproperties as jp
173  from PyUtils.MetaReaderPeeker import metadata
174  chunk_size = 1
175  if (jp.AthenaMPFlags.ChunkSize() > 0):
176  chunk_size = jp.AthenaMPFlags.ChunkSize()
177  msg.info('Chunk size set to %i', chunk_size)
178  elif metadata['file_size'] is not None:
179  #Don't use auto flush for shared reader
180  if (jp.AthenaMPFlags.UseSharedReader()):
181  msg.info('Shared Reader in use, chunk_size set to default (%i)', chunk_size)
182  #Use auto flush only if file is compressed with LZMA, else use default chunk_size
183  elif (jp.AthenaMPFlags.ChunkSize() == -1):
184  if (metadata['file_comp_alg'] == 2):
185  chunk_size = metadata['auto_flush']
186  msg.info('Chunk size set to auto flush (%i)', chunk_size)
187  else:
188  msg.info('LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
189  #Use auto flush only if file is compressed with LZMA or ZLIB, else use default chunk_size
190  elif (jp.AthenaMPFlags.ChunkSize() == -2):
191  if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2):
192  chunk_size = metadata['auto_flush']
193  msg.info('Chunk size set to auto flush (%i)', chunk_size)
194  else:
195  msg.info('LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
196  #Use auto flush only if file is compressed with LZMA, ZLIB or LZ4, else use default chunk_size
197  elif (jp.AthenaMPFlags.ChunkSize() == -3):
198  if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2 or metadata['file_comp_alg'] == 4):
199  chunk_size = metadata['auto_flush']
200  msg.info('Chunk size set to auto flush (%i)', chunk_size)
201  else:
202  msg.info('LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
203  #Use auto flush value for chunk_size, regarldess of compression algorithm
204  elif (jp.AthenaMPFlags.ChunkSize() <= -4):
205  chunk_size = metadata['auto_flush']
206  msg.info('Chunk size set to auto flush (%i)', chunk_size)
207  else:
208  msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)
209 
210  return chunk_size
SharedWriterTool
Definition: SharedWriterTool.h:13
python.PyComps.MpEvtLoopMgr.IsPileup
IsPileup
Definition: Control/AthenaMP/python/PyComps.py:31
python.PyComps.MpEvtLoopMgr
Definition: Control/AthenaMP/python/PyComps.py:10
python.PyComps.MpEvtLoopMgr.Strategy
Strategy
Definition: Control/AthenaMP/python/PyComps.py:27
python.PyComps.MpEvtLoopMgr.EventsBeforeFork
EventsBeforeFork
Definition: Control/AthenaMP/python/PyComps.py:30
python.PyComps.MpEvtLoopMgr.WorkerTopDir
WorkerTopDir
init base class
Definition: Control/AthenaMP/python/PyComps.py:24
python.PyComps.MpEvtLoopMgr.configureStrategy
None configureStrategy(self, strategy, pileup, events_before_fork)
Definition: Control/AthenaMP/python/PyComps.py:50
python.PyComps.MpEvtLoopMgr.CollectSubprocessLogs
CollectSubprocessLogs
Definition: Control/AthenaMP/python/PyComps.py:26
python.PyComps.getChunkSize
int getChunkSize()
Definition: Control/AthenaMP/python/PyComps.py:171
Configurable
athena/gaudi ----------------------------------------------------------—
SharedEvtQueueConsumer
Definition: SharedEvtQueueConsumer.h:22
python.PyComps.setupEvtSelForSeekOps
None setupEvtSelForSeekOps()
Definition: Control/AthenaMP/python/PyComps.py:134
python.PyComps.MpEvtLoopMgr.MemSamplingInterval
MemSamplingInterval
Definition: Control/AthenaMP/python/PyComps.py:29
EvtRangeProcessor
Definition: EvtRangeProcessor.h:25
python.PyComps.MpEvtLoopMgr.__init__
def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw)
Definition: Control/AthenaMP/python/PyComps.py:11
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:569
SharedEvtQueueProvider
Definition: SharedEvtQueueProvider.h:16
AthenaSharedMemoryTool
This class provides the IPCTool for SharedMemory objects.
Definition: AthenaSharedMemoryTool.h:33
python.PyComps.MpEvtLoopMgr.PollingInterval
PollingInterval
Definition: Control/AthenaMP/python/PyComps.py:28
EvtRangeScatterer
Definition: EvtRangeScatterer.h:19
python.PyComps.MpEvtLoopMgr.OutputReportFile
OutputReportFile
Definition: Control/AthenaMP/python/PyComps.py:25
SharedHiveEvtQueueConsumer
Definition: SharedHiveEvtQueueConsumer.h:20
python.PyComps.MpEvtLoopMgr.nThreads
nThreads
Definition: Control/AthenaMP/python/PyComps.py:14