ATLAS Offline Software
python.PyComps.MpEvtLoopMgr Class Reference

Public Member Functions

def __init__ (self, name='AthMpEvtLoopMgr', isPileup=False, **kw)
 
None configureStrategy (self, strategy, pileup, events_before_fork)
 

Public Attributes

 nThreads
 
 WorkerTopDir
 init base class
 
 OutputReportFile
 
 CollectSubprocessLogs
 
 Strategy
 
 PollingInterval
 
 MemSamplingInterval
 
 EventsBeforeFork
 
 IsPileup
 

Detailed Description

Definition at line 10 of file Control/AthenaMP/python/PyComps.py.

Constructor & Destructor Documentation

◆ __init__()

def python.PyComps.MpEvtLoopMgr.__init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw)

Definition at line 11 of file Control/AthenaMP/python/PyComps.py.

    def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):

        from AthenaCommon.AppMgr import theApp
        self.nThreads = theApp._opts.threads

        kw['name'] = name
        super(MpEvtLoopMgr, self).__init__(**kw)

        os.putenv('XRD_ENABLEFORKHANDLERS','1')
        os.putenv('XRD_RUNFORKHANDLER','1')

        from .AthenaMPFlags import jobproperties as jp
        self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
        self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
        self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
        self.Strategy = jp.AthenaMPFlags.Strategy()
        self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
        self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
        self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
        self.IsPileup = isPileup

        if self.Strategy=='EventService':
            self.EventsBeforeFork = 0

        from AthenaCommon.AppMgr import theApp as app
        app.EventLoop = self.getFullJobOptName()

        # Enable FileMgr logging
        from GaudiSvc.GaudiSvcConf import FileMgr
        from AthenaCommon.AppMgr import ServiceMgr as svcMgr
        svcMgr+=FileMgr(LogFile="FileManagerLog")

        # Save PoolFileCatalog.xml if exists in the run directory
        if os.path.isfile('PoolFileCatalog.xml'):
            shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')

        self.configureStrategy(self.Strategy,self.IsPileup,self.EventsBeforeFork)
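The constructor sets the two XRootD fork-handler variables with os.putenv rather than by assigning to os.environ. The sketch below illustrates, in plain Python and independently of Athena, what that choice implies: a value set with os.putenv reaches child processes but is not reflected in os.environ of the calling interpreter. The variable name DEMO_FORK_FLAG and the helper child_sees are hypothetical and exist only for this demonstration.

```python
import os
import subprocess
import sys

# Hypothetical variable name, used only for this demonstration.
VAR = "DEMO_FORK_FLAG"


def child_sees(var):
    """Return the value of `var` as seen by a freshly started child process."""
    code = f"import os; print(os.environ.get({var!r}, ''))"
    result = subprocess.run([sys.executable, "-c", code],
                            capture_output=True, text=True, check=True)
    return result.stdout.strip()


os.environ.pop(VAR, None)   # start from a clean state
os.putenv(VAR, "1")         # same call style as in the constructor

in_this_process = os.environ.get(VAR)   # os.putenv does not update os.environ
in_child = child_sees(VAR)              # the child inherits the process environment
```

Code that later reads these variables through os.environ in the same process would therefore not see them. Whether that matters for the XRootD client is not stated in the listing.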

Member Function Documentation

◆ configureStrategy()

None python.PyComps.MpEvtLoopMgr.configureStrategy(self, strategy, pileup, events_before_fork)

Definition at line 50 of file Control/AthenaMP/python/PyComps.py.

    def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
        from .AthenaMPFlags import jobproperties as jp
        import AthenaCommon.ConcurrencyFlags # noqa: F401
        event_range_channel = jp.AthenaMPFlags.EventRangeChannel()

        chunk_size = getChunkSize()
        if chunk_size < 1:
            msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
            chunk_size = 1

        debug_worker = jp.ConcurrencyFlags.DebugWorkers()
        use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
        use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
        use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
        unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"

        if strategy=='SharedQueue' or strategy=='RoundRobin':
            if use_shared_reader:
                from AthenaCommon.AppMgr import ServiceMgr as svcMgr
                svcMgr.PoolSvc.MaxFilesOpen = 2
                from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
                svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName=f"EventStream{unique_id}")
                if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
                    svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName=f"InputStream{unique_id}")
            if use_shared_writer:
                from AthenaCommon.AppMgr import ServiceMgr as svcMgr
                if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
                    from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
                    svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
                    svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression

            from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
            self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
                                                   IsPileup=pileup,
                                                   EventsBeforeFork=events_before_fork,
                                                   ChunkSize=chunk_size) ]

            if (self.nThreads >= 1):
                if(pileup):
                    raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
                from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
                self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
                                                           EventsBeforeFork=events_before_fork,
                                                           Debug=debug_worker) ]
            else:
                from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
                self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
                                                       UseSharedWriter=use_shared_writer,
                                                       IsPileup=pileup,
                                                       IsRoundRobin=(strategy=='RoundRobin'),
                                                       EventsBeforeFork=events_before_fork,
                                                       ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
                                                       EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
                                                       Debug=debug_worker) ]
            if use_shared_writer:
                from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
                self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
                                                 IsPileup=pileup,
                                                 Debug=debug_worker) ]

            # Enable seeking
            if not use_shared_reader:
                setupEvtSelForSeekOps()

        elif strategy=='EventService':
            channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
            channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"

            from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
            self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
                                              EventRangeChannel = event_range_channel,
                                              DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]

            from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
            self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
                                              Channel2Scatterer = channelScatterer2Processor,
                                              Channel2EvtSel = channelProcessor2EvtSel,
                                              Debug=debug_worker) ]
            # Enable seeking
            setupEvtSelForSeekOps()

        else:
            msg.warning("Unknown strategy. No MP tools will be configured")
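The branching in configureStrategy can be summarised by which tools end up in self.Tools for a given combination of inputs. The following is a minimal standalone sketch of that decision logic, not the Athena implementation: the helpers select_tools and clamp_chunk_size are hypothetical, and the tools are represented by their class names as plain strings so the block runs without an Athena installation. It raises RuntimeError where the listing raises a bare Exception.

```python
def clamp_chunk_size(chunk_size):
    """Mirror the listing's guard: a nonpositive chunk size becomes 1."""
    return chunk_size if chunk_size >= 1 else 1


def select_tools(strategy, n_threads=0, pileup=False, use_shared_writer=False):
    """Return the names of the tools the listing would append, in order (sketch)."""
    if strategy in ("SharedQueue", "RoundRobin"):
        tools = ["SharedEvtQueueProvider"]
        if n_threads >= 1:
            # Mixed MP+MT: the listing rejects pileup digitization here.
            if pileup:
                raise RuntimeError(
                    "pileup digitization in mixed MP+MT is not supported")
            tools.append("SharedHiveEvtQueueConsumer")
        else:
            tools.append("SharedEvtQueueConsumer")
        if use_shared_writer:
            tools.append("SharedWriterTool")
        return tools
    if strategy == "EventService":
        return ["EvtRangeScatterer", "EvtRangeProcessor"]
    # Unknown strategy: the listing only logs a warning and adds nothing.
    return []


# Example: a RoundRobin job with 4 threads and the shared writer enabled.
example = select_tools("RoundRobin", n_threads=4, use_shared_writer=True)
```

The sketch leaves out the side effects on svcMgr (shared-memory streaming tools, PoolSvc.MaxFilesOpen) and the seek setup, which depend on the Athena runtime.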

Member Data Documentation

◆ CollectSubprocessLogs

python.PyComps.MpEvtLoopMgr.CollectSubprocessLogs

Definition at line 26 of file Control/AthenaMP/python/PyComps.py.

◆ EventsBeforeFork

python.PyComps.MpEvtLoopMgr.EventsBeforeFork

Definition at line 30 of file Control/AthenaMP/python/PyComps.py.
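According to the constructor listing, the value read from AthenaMPFlags is overridden to 0 when Strategy is 'EventService', and configureStrategy later passes MotherProcess=(events_before_fork>0) to SharedWriterTool. A minimal sketch of that derivation follows. The two helper names are hypothetical and are not part of PyComps.py.

```python
def effective_events_before_fork(strategy, flag_value):
    """Value stored in EventsBeforeFork after the constructor's override (sketch)."""
    return 0 if strategy == "EventService" else flag_value


def mother_process_flag(events_before_fork):
    """Value passed as MotherProcess to SharedWriterTool in the listing."""
    return events_before_fork > 0


# With the EventService strategy the flag value is ignored.
ebf = effective_events_before_fork("EventService", 5)
```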

◆ IsPileup

python.PyComps.MpEvtLoopMgr.IsPileup

Definition at line 31 of file Control/AthenaMP/python/PyComps.py.

◆ MemSamplingInterval

python.PyComps.MpEvtLoopMgr.MemSamplingInterval

Definition at line 29 of file Control/AthenaMP/python/PyComps.py.

◆ nThreads

python.PyComps.MpEvtLoopMgr.nThreads

Definition at line 14 of file Control/AthenaMP/python/PyComps.py.

◆ OutputReportFile

python.PyComps.MpEvtLoopMgr.OutputReportFile

Definition at line 25 of file Control/AthenaMP/python/PyComps.py.

◆ PollingInterval

python.PyComps.MpEvtLoopMgr.PollingInterval

Definition at line 28 of file Control/AthenaMP/python/PyComps.py.

◆ Strategy

python.PyComps.MpEvtLoopMgr.Strategy

Definition at line 27 of file Control/AthenaMP/python/PyComps.py.

◆ WorkerTopDir

python.PyComps.MpEvtLoopMgr.WorkerTopDir

init base class

Definition at line 24 of file Control/AthenaMP/python/PyComps.py.


The documentation for this class was generated from the following file:
Control/AthenaMP/python/PyComps.py