ATLAS Offline Software
Control/AthenaMP/python/PyComps.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 #-----Python imports---#
4 import os, sys, shutil, uuid
5 
6 #-----Athena imports---#
7 from AthenaCommon.Logging import log as msg
8 
9 from AthenaMP.AthenaMPConf import AthMpEvtLoopMgr
10 class MpEvtLoopMgr(AthMpEvtLoopMgr):
11  def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
12 
13  from AthenaCommon.AppMgr import theApp
14  self.nThreads = theApp._opts.threads
15 
16 
17  kw['name'] = name
18  super(MpEvtLoopMgr, self).__init__(**kw)
19 
20  os.putenv('XRD_ENABLEFORKHANDLERS','1')
21  os.putenv('XRD_RUNFORKHANDLER','1')
22 
23  from .AthenaMPFlags import jobproperties as jp
24  self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
25  self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
26  self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
27  self.Strategy = jp.AthenaMPFlags.Strategy()
28  self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
29  self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
30  self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
31  self.IsPileup = isPileup
32 
33  if self.Strategy=='EventService':
34  self.EventsBeforeFork = 0
35 
36  from AthenaCommon.AppMgr import theApp as app
37  app.EventLoop = self.getFullJobOptName()
38 
39  # Enable FileMgr logging
40  from GaudiSvc.GaudiSvcConf import FileMgr
41  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
42  svcMgr+=FileMgr(LogFile="FileManagerLog")
43 
44  # Save PoolFileCatalog.xml if exists in the run directory
45  if os.path.isfile('PoolFileCatalog.xml'):
46  shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
47 
49 
50  def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
51  from .AthenaMPFlags import jobproperties as jp
52  import AthenaCommon.ConcurrencyFlags # noqa: F401
53  event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54 
55  chunk_size = getChunkSize()
56  if chunk_size < 1:
57  msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58  chunk_size = 1
59 
60  debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61  use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62  use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63  use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64  unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65 
66  if strategy=='SharedQueue' or strategy=='RoundRobin':
67  if use_shared_reader:
68  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
69  svcMgr.PoolSvc.MaxFilesOpen = 2
70  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
71  svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName=f"EventStream{unique_id}")
72  if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
73  svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName=f"InputStream{unique_id}")
74  if use_shared_writer:
75  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76  if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
77  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
78  svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
79  svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
80 
81  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
82  self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
83  IsPileup=pileup,
84  EventsBeforeFork=events_before_fork,
85  ChunkSize=chunk_size) ]
86 
87  # In pure MP, self.nThreads may be set to None - we want the pure MP setup in that case
88  if self.nThreads is not None and self.nThreads >= 1:
89  if(pileup):
90  raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
91  from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
92  self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
93  EventsBeforeFork=events_before_fork,
94  Debug=debug_worker) ]
95  else:
96  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
97  self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
98  UseSharedWriter=use_shared_writer,
99  IsPileup=pileup,
100  IsRoundRobin=(strategy=='RoundRobin'),
101  EventsBeforeFork=events_before_fork,
102  ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
103  EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
104  Debug=debug_worker) ]
105  if use_shared_writer:
106  from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
107  self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
108  IsPileup=pileup,
109  Debug=debug_worker) ]
110 
111  # Enable seeking
112  if not use_shared_reader:
114 
115  elif strategy=='EventService':
116  channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
117  channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
118 
119  from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
120  self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
121  EventRangeChannel = event_range_channel,
122  DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
123 
124  from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
125  self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
126  Channel2Scatterer = channelScatterer2Processor,
127  Channel2EvtSel = channelProcessor2EvtSel,
128  Debug=debug_worker) ]
129  # Enable seeking
131 
132  else:
133  msg.warning("Unknown strategy. No MP tools will be configured")
134 
135 def setupEvtSelForSeekOps() -> None:
136  """ try to install seek-stuff on the EventSelector side """
137  #import sys
138  #from AthenaCommon.Logging import log as msg
139  msg.debug("setupEvtSelForSeekOps:")
140  if 'AthenaRootComps.ReadAthenaRoot' in sys.modules:
141  # athenarootcomps has seeking enabled by default
142  msg.info('=> Seeking enabled.')
143  return
144 
145  if 'AthenaPoolCnvSvc.ReadAthenaPool' not in sys.modules:
146 
147  msg.info( "Cannot enable 'seeking' b/c module "
148  "[AthenaPoolCnvSvc.ReadAthenaPool] hasn't been imported..." )
149  msg.info( "Modify your jobOptions to import that module "
150  "(or just ignore this message)" )
151  return
152 
153  from AthenaCommon.AppMgr import theApp, AthAppMgr
154  if theApp.state() != AthAppMgr.State.OFFLINE:
155  msg.info( "C++ ApplicationMgr already instantiated, probably seeking "
156  "will be ill-configured..." )
157  msg.info( "EventSelector writers should implement updateHandlers" )
158 
159  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
160  from AthenaCommon.Configurable import Configurable
161  collectionType = svcMgr.EventSelector.properties()["CollectionType"]
162 
163  if collectionType in ( "ImplicitROOT", Configurable.propertyNoValue, ):
164  msg.info ( "=> Seeking enabled." )
165 
166  else:
167  msg.warning( "Input seeking is not compatible with collection type of %s",
168  svcMgr.EventSelector.properties()["CollectionType"] )
169  msg.warning( "=> Seeking disabled." )
170  return
171 
172 def getChunkSize() -> int :
173  from .AthenaMPFlags import jobproperties as jp
174  from PyUtils.MetaReaderPeeker import metadata
175  chunk_size = 1
176  # In jobs without input (e.g. event generation), metadata is an empty dictionary
177  if (jp.AthenaMPFlags.ChunkSize() > 0):
178  chunk_size = jp.AthenaMPFlags.ChunkSize()
179  msg.info('Chunk size set to %i', chunk_size)
180  elif 'file_size' in metadata and metadata['file_size'] is not None:
181  #Don't use auto flush for shared reader
182  if (jp.AthenaMPFlags.UseSharedReader()):
183  msg.info('Shared Reader in use, chunk_size set to default (%i)', chunk_size)
184  #Use auto flush only if file is compressed with LZMA, else use default chunk_size
185  elif (jp.AthenaMPFlags.ChunkSize() == -1):
186  if (metadata['file_comp_alg'] == 2):
187  chunk_size = metadata['auto_flush']
188  msg.info('Chunk size set to auto flush (%i)', chunk_size)
189  else:
190  msg.info('LZMA algorithm not in use, chunk_size set to default (%i)', chunk_size)
191  #Use auto flush only if file is compressed with LZMA or ZLIB, else use default chunk_size
192  elif (jp.AthenaMPFlags.ChunkSize() == -2):
193  if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2):
194  chunk_size = metadata['auto_flush']
195  msg.info('Chunk size set to auto flush (%i)', chunk_size)
196  else:
197  msg.info('LZMA nor ZLIB in use, chunk_size set to default (%i)', chunk_size)
198  #Use auto flush only if file is compressed with LZMA, ZLIB or LZ4, else use default chunk_size
199  elif (jp.AthenaMPFlags.ChunkSize() == -3):
200  if (metadata['file_comp_alg'] == 1 or metadata['file_comp_alg'] == 2 or metadata['file_comp_alg'] == 4):
201  chunk_size = metadata['auto_flush']
202  msg.info('Chunk size set to auto flush (%i)', chunk_size)
203  else:
204  msg.info('LZMA, ZLIB nor LZ4 in use, chunk_size set to (%i)', chunk_size)
205  #Use auto flush value for chunk_size, regarldess of compression algorithm
206  elif (jp.AthenaMPFlags.ChunkSize() <= -4):
207  chunk_size = metadata['auto_flush']
208  msg.info('Chunk size set to auto flush (%i)', chunk_size)
209  else:
210  msg.warning('Invalid ChunkSize, Chunk Size set to default (%i)', chunk_size)
211 
212  return chunk_size
SharedWriterTool
Definition: SharedWriterTool.h:13
python.PyComps.MpEvtLoopMgr.IsPileup
IsPileup
Definition: Control/AthenaMP/python/PyComps.py:31
python.PyComps.MpEvtLoopMgr
Definition: Control/AthenaMP/python/PyComps.py:10
python.PyComps.MpEvtLoopMgr.Strategy
Strategy
Definition: Control/AthenaMP/python/PyComps.py:27
python.PyComps.MpEvtLoopMgr.EventsBeforeFork
EventsBeforeFork
Definition: Control/AthenaMP/python/PyComps.py:30
python.PyComps.MpEvtLoopMgr.WorkerTopDir
WorkerTopDir
init base class
Definition: Control/AthenaMP/python/PyComps.py:24
python.PyComps.MpEvtLoopMgr.configureStrategy
None configureStrategy(self, strategy, pileup, events_before_fork)
Definition: Control/AthenaMP/python/PyComps.py:50
python.PyComps.MpEvtLoopMgr.CollectSubprocessLogs
CollectSubprocessLogs
Definition: Control/AthenaMP/python/PyComps.py:26
python.PyComps.getChunkSize
int getChunkSize()
Definition: Control/AthenaMP/python/PyComps.py:172
Configurable
athena/gaudi ----------------------------------------------------------—
SharedEvtQueueConsumer
Definition: SharedEvtQueueConsumer.h:22
python.PyComps.setupEvtSelForSeekOps
None setupEvtSelForSeekOps()
Definition: Control/AthenaMP/python/PyComps.py:135
python.PyComps.MpEvtLoopMgr.MemSamplingInterval
MemSamplingInterval
Definition: Control/AthenaMP/python/PyComps.py:29
EvtRangeProcessor
Definition: EvtRangeProcessor.h:25
python.PyComps.MpEvtLoopMgr.__init__
def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw)
Definition: Control/AthenaMP/python/PyComps.py:11
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
SharedEvtQueueProvider
Definition: SharedEvtQueueProvider.h:16
AthenaSharedMemoryTool
This class provides the IPCTool for SharedMemory objects.
Definition: AthenaSharedMemoryTool.h:33
python.PyComps.MpEvtLoopMgr.PollingInterval
PollingInterval
Definition: Control/AthenaMP/python/PyComps.py:28
EvtRangeScatterer
Definition: EvtRangeScatterer.h:19
python.PyComps.MpEvtLoopMgr.OutputReportFile
OutputReportFile
Definition: Control/AthenaMP/python/PyComps.py:25
SharedHiveEvtQueueConsumer
Definition: SharedHiveEvtQueueConsumer.h:20
python.PyComps.MpEvtLoopMgr.nThreads
nThreads
Definition: Control/AthenaMP/python/PyComps.py:14