ATLAS Offline Software
python.PyComps.MpEvtLoopMgr Class Reference

Public Member Functions

def __init__ (self, name='AthMpEvtLoopMgr', isPileup=False, **kw)
 
None configureStrategy (self, strategy, pileup, events_before_fork)
 

Public Attributes

 nThreads
 
 WorkerTopDir
 init base class
 
 OutputReportFile
 
 CollectSubprocessLogs
 
 Strategy
 
 PollingInterval
 
 MemSamplingInterval
 
 EventsBeforeFork
 
 IsPileup
 

Detailed Description

Definition at line 10 of file Control/AthenaMP/python/PyComps.py.

Constructor & Destructor Documentation

◆ __init__()

def python.PyComps.MpEvtLoopMgr.__init__ (   self,
  name = 'AthMpEvtLoopMgr',
  isPileup = False,
**  kw 
)

Definition at line 11 of file Control/AthenaMP/python/PyComps.py.

11      def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
12
13          from AthenaCommon.AppMgr import theApp
14          self.nThreads = theApp._opts.threads
15
16
17          kw['name'] = name
18          super(MpEvtLoopMgr, self).__init__(**kw)
19
20          os.putenv('XRD_ENABLEFORKHANDLERS','1')
21          os.putenv('XRD_RUNFORKHANDLER','1')
22
23          from .AthenaMPFlags import jobproperties as jp
24          self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
25          self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
26          self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
27          self.Strategy = jp.AthenaMPFlags.Strategy()
28          self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
29          self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
30          self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
31          self.IsPileup = isPileup
32
33          if self.Strategy=='EventService':
34              self.EventsBeforeFork = 0
35
36          from AthenaCommon.AppMgr import theApp as app
37          app.EventLoop = self.getFullJobOptName()
38
39          # Enable FileMgr logging
40          from GaudiSvc.GaudiSvcConf import FileMgr
41          from AthenaCommon.AppMgr import ServiceMgr as svcMgr
42          svcMgr+=FileMgr(LogFile="FileManagerLog")
43
44          # Save PoolFileCatalog.xml if exists in the run directory
45          if os.path.isfile('PoolFileCatalog.xml'):
46              shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
47
48          self.configureStrategy(self.Strategy,self.IsPileup,self.EventsBeforeFork)
49 
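Two of the constructor's steps can be tried outside Athena: the override of EventsBeforeFork for the EventService strategy, and the backup of PoolFileCatalog.xml. The following is a minimal sketch, not the ATLAS implementation. The helper names `resolve_events_before_fork` and `backup_catalog` are hypothetical, the flag value is passed as a plain argument instead of being read from `jobproperties`, and the run directory is an explicit parameter rather than the current working directory.

```python
import os
import shutil
import tempfile


def resolve_events_before_fork(strategy, events_before_fork):
    """Mirror the constructor's rule: EventService forces EventsBeforeFork to 0."""
    return 0 if strategy == 'EventService' else events_before_fork


def backup_catalog(run_dir, name='PoolFileCatalog.xml', suffix='.AthenaMP-saved'):
    """Copy the catalog to '<name><suffix>' if it exists.

    Returns the backup path, or None when there is no catalog to save.
    """
    src = os.path.join(run_dir, name)
    if not os.path.isfile(src):
        return None
    dst = src + suffix
    shutil.copyfile(src, dst)
    return dst


if __name__ == '__main__':
    print(resolve_events_before_fork('EventService', 5))
    print(resolve_events_before_fork('SharedQueue', 5))
    with tempfile.TemporaryDirectory() as tmp:
        # No catalog yet, so nothing is copied.
        print(backup_catalog(tmp))
        with open(os.path.join(tmp, 'PoolFileCatalog.xml'), 'w') as fh:
            fh.write('<POOLFILECATALOG/>')
        # The catalog now exists and is copied next to the original.
        print(os.path.basename(backup_catalog(tmp)))
```

Returning None for the missing-catalog case keeps the sketch side-effect free when there is nothing to save, which matches the `os.path.isfile` guard in the listing.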

Member Function Documentation

◆ configureStrategy()

None python.PyComps.MpEvtLoopMgr.configureStrategy (   self,
  strategy,
  pileup,
  events_before_fork 
)

Definition at line 50 of file Control/AthenaMP/python/PyComps.py.

50      def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
51          from .AthenaMPFlags import jobproperties as jp
52          import AthenaCommon.ConcurrencyFlags # noqa: F401
53          event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54
55          chunk_size = getChunkSize()
56          if chunk_size < 1:
57              msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58              chunk_size = 1
59
60          debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61          use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62          use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63          use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64          unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65
66          if strategy=='SharedQueue' or strategy=='RoundRobin':
67              if use_shared_reader:
68                  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
69                  svcMgr.PoolSvc.MaxFilesOpen = 2
70                  from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
71                  svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName=f"EventStream{unique_id}")
72                  if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
73                      svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName=f"InputStream{unique_id}")
74              if use_shared_writer:
75                  from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76                  if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
77                      from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
78                      svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
79                      svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
80
81              from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
82              self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
83                                                     IsPileup=pileup,
84                                                     EventsBeforeFork=events_before_fork,
85                                                     ChunkSize=chunk_size) ]
86
87              # In pure MP, self.nThreads may be set to None - we want the pure MP setup in that case
88              if self.nThreads is not None and self.nThreads >= 1:
89                  if(pileup):
90                      raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
91                  from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
92                  self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
93                                                             EventsBeforeFork=events_before_fork,
94                                                             Debug=debug_worker) ]
95              else:
96                  from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
97                  self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
98                                                         UseSharedWriter=use_shared_writer,
99                                                         IsPileup=pileup,
100                                                        IsRoundRobin=(strategy=='RoundRobin'),
101                                                        EventsBeforeFork=events_before_fork,
102                                                        ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
103                                                        EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
104                                                        Debug=debug_worker) ]
105             if use_shared_writer:
106                 from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
107                 self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
108                                                  IsPileup=pileup,
109                                                  Debug=debug_worker) ]
110
111             # Enable seeking
112             if not use_shared_reader:
113                 setupEvtSelForSeekOps()
114
115         elif strategy=='EventService':
116             channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
117             channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
118
119             from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
120             self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
121                                               EventRangeChannel = event_range_channel,
122                                               DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
123
124             from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
125             self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
126                                               Channel2Scatterer = channelScatterer2Processor,
127                                               Channel2EvtSel = channelProcessor2EvtSel,
128                                               Debug=debug_worker) ]
129             # Enable seeking
130             setupEvtSelForSeekOps()
131
132         else:
133             msg.warning("Unknown strategy. No MP tools will be configured")
134 
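The branching in configureStrategy() can be summarised as a small pure-Python function that returns the tool names instead of configuring Gaudi components. This is a sketch under stated assumptions, not the ATLAS code: `clamp_chunk_size`, `make_unique_id` and `select_tools` are hypothetical names, the flags are passed as plain arguments, and the shared-writer check is assumed to apply to both the mixed MP+MT branch and the pure-MP branch.

```python
import os
import uuid


def clamp_chunk_size(chunk_size):
    """Return chunk_size, or 1 if it is not positive (the listing also logs a warning)."""
    return chunk_size if chunk_size >= 1 else 1


def make_unique_id():
    """Build the '<pid>-<uuid hex>' suffix appended to the shared-memory names."""
    return f"{os.getpid()}-{uuid.uuid4().hex}"


def select_tools(strategy, n_threads=None, pileup=False, use_shared_writer=False):
    """Return the names of the MP tools that would be appended to self.Tools."""
    if strategy in ('SharedQueue', 'RoundRobin'):
        tools = ['SharedEvtQueueProvider']
        # n_threads is None in pure MP; any value >= 1 selects the MP+MT consumer.
        if n_threads is not None and n_threads >= 1:
            if pileup:
                raise Exception('Running pileup digitization in mixed MP+MT '
                                'currently not supported')
            tools.append('SharedHiveEvtQueueConsumer')
        else:
            tools.append('SharedEvtQueueConsumer')
        if use_shared_writer:
            tools.append('SharedWriterTool')
        return tools
    if strategy == 'EventService':
        return ['EvtRangeScatterer', 'EvtRangeProcessor']
    # Unknown strategy: no tools are configured, only a warning is logged.
    return []


if __name__ == '__main__':
    print(select_tools('SharedQueue'))
    print(select_tools('RoundRobin', n_threads=4, use_shared_writer=True))
    print(select_tools('EventService'))
    print(make_unique_id())
```

Combining the process ID with a random UUID keeps the shared-memory names distinct both across concurrent jobs on one host and across repeated runs that happen to reuse a PID.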

Member Data Documentation

◆ CollectSubprocessLogs

python.PyComps.MpEvtLoopMgr.CollectSubprocessLogs

Definition at line 26 of file Control/AthenaMP/python/PyComps.py.

◆ EventsBeforeFork

python.PyComps.MpEvtLoopMgr.EventsBeforeFork

Definition at line 30 of file Control/AthenaMP/python/PyComps.py.

◆ IsPileup

python.PyComps.MpEvtLoopMgr.IsPileup

Definition at line 31 of file Control/AthenaMP/python/PyComps.py.

◆ MemSamplingInterval

python.PyComps.MpEvtLoopMgr.MemSamplingInterval

Definition at line 29 of file Control/AthenaMP/python/PyComps.py.

◆ nThreads

python.PyComps.MpEvtLoopMgr.nThreads

Definition at line 14 of file Control/AthenaMP/python/PyComps.py.

◆ OutputReportFile

python.PyComps.MpEvtLoopMgr.OutputReportFile

Definition at line 25 of file Control/AthenaMP/python/PyComps.py.

◆ PollingInterval

python.PyComps.MpEvtLoopMgr.PollingInterval

Definition at line 28 of file Control/AthenaMP/python/PyComps.py.

◆ Strategy

python.PyComps.MpEvtLoopMgr.Strategy

Definition at line 27 of file Control/AthenaMP/python/PyComps.py.

◆ WorkerTopDir

python.PyComps.MpEvtLoopMgr.WorkerTopDir

init base class

Definition at line 24 of file Control/AthenaMP/python/PyComps.py.


The documentation for this class was generated from the following file:
Control/AthenaMP/python/PyComps.py