ATLAS Offline Software
Loading...
Searching...
No Matches
python.PyComps.MpEvtLoopMgr Class Reference
Inheritance diagram for python.PyComps.MpEvtLoopMgr:
Collaboration diagram for python.PyComps.MpEvtLoopMgr:

Public Member Functions

 __init__ (self, name='AthMpEvtLoopMgr', isPileup=False, **kw)
None configureStrategy (self, strategy, pileup, events_before_fork)
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual StatusCode nextEvent (int maxevt) override
virtual StatusCode executeEvent (EventContext &&ctx) override
virtual StatusCode executeRun (int maxevt) override
virtual StatusCode stopRun () override
virtual EventContext createEventContext () override
virtual bool stopScheduled () const override

Public Attributes

 nThreads = theApp._opts.threads
 WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
 OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
 CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
str Strategy = jp.AthenaMPFlags.Strategy()
 PollingInterval = jp.AthenaMPFlags.PollingInterval()
 MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
int EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
 IsPileup = isPileup

Private Member Functions

StatusCode wait ()
StatusCode generateOutputReport ()
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds ()
StatusCode updateSkipEvents (int skipEvents)

Private Attributes

ServiceHandle< IEventProcessor > m_evtProcessor {this,"EventLoopManager","AthenaEventLoopMgr"}
SmartIF< IService > m_evtSelector {nullptr}
SmartIF< IDataShare > m_dataShare
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
Gaudi::Property< std::string > m_outputReportName
Gaudi::Property< std::string > m_strategy
Gaudi::Property< bool > m_isPileup
Gaudi::Property< bool > m_collectSubprocessLogs
ToolHandleArray< IAthenaMPTool > m_tools {this,"Tools", {}}
Gaudi::Property< int > m_nPollingInterval
Gaudi::Property< int > m_nMemSamplingInterval
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< unsigned int > m_eventPrintoutInterval
StringArrayProperty m_execAtPreFork
int m_nChildProcesses {0}
pid_t m_masterPid {}
bool m_scheduledStop {false}
std::vector< unsigned long > m_samplesRss
std::vector< unsigned long > m_samplesPss
std::vector< unsigned long > m_samplesSize
std::vector< unsigned long > m_samplesSwap

Detailed Description

Definition at line 10 of file Control/AthenaMP/python/PyComps.py.

Constructor & Destructor Documentation

◆ __init__()

python.PyComps.MpEvtLoopMgr.__init__ ( self,
name = 'AthMpEvtLoopMgr',
isPileup = False,
** kw )

Definition at line 11 of file Control/AthenaMP/python/PyComps.py.

11 def __init__(self, name='AthMpEvtLoopMgr', isPileup=False, **kw):
12
13 from AthenaCommon.AppMgr import theApp
14 self.nThreads = theApp._opts.threads
15
16
17 kw['name'] = name
18 super(MpEvtLoopMgr, self).__init__(**kw)
19
20 os.putenv('XRD_ENABLEFORKHANDLERS','1')
21 os.putenv('XRD_RUNFORKHANDLER','1')
22
23 from .AthenaMPFlags import jobproperties as jp
24 self.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()
25 self.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()
26 self.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()
27 self.Strategy = jp.AthenaMPFlags.Strategy()
28 self.PollingInterval = jp.AthenaMPFlags.PollingInterval()
29 self.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()
30 self.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()
31 self.IsPileup = isPileup
32
33 if self.Strategy=='EventService':
34 self.EventsBeforeFork = 0
35
36 from AthenaCommon.AppMgr import theApp as app
37 app.EventLoop = self.getFullJobOptName()
38
39 # Enable FileMgr logging
40 from GaudiSvc.GaudiSvcConf import FileMgr
41 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
42 svcMgr+=FileMgr(LogFile="FileManagerLog")
43
44 # Save PoolFileCatalog.xml if exists in the run directory
45 if os.path.isfile('PoolFileCatalog.xml'):
46 shutil.copyfile('PoolFileCatalog.xml','PoolFileCatalog.xml.AthenaMP-saved')
47
48 self.configureStrategy(self.Strategy,self.IsPileup,self.EventsBeforeFork)
49

Member Function Documentation

◆ configureStrategy()

None python.PyComps.MpEvtLoopMgr.configureStrategy ( self,
strategy,
pileup,
events_before_fork )

Definition at line 50 of file Control/AthenaMP/python/PyComps.py.

50 def configureStrategy(self,strategy,pileup,events_before_fork) -> None :
51 from .AthenaMPFlags import jobproperties as jp
52 import AthenaCommon.ConcurrencyFlags # noqa: F401
53 event_range_channel = jp.AthenaMPFlags.EventRangeChannel()
54
55 chunk_size = getChunkSize()
56 if chunk_size < 1:
57 msg.warning('Nonpositive ChunkSize (%i) caught, setting it to 1', chunk_size)
58 chunk_size = 1
59
60 debug_worker = jp.ConcurrencyFlags.DebugWorkers()
61 use_shared_reader = jp.AthenaMPFlags.UseSharedReader()
62 use_shared_writer = jp.AthenaMPFlags.UseSharedWriter()
63 use_parallel_compression = jp.AthenaMPFlags.UseParallelCompression()
64 unique_id = f"{str(os.getpid())}-{uuid.uuid4().hex}"
65
66 # For e.g. event generation, if we use SharedQueue the job does not complete correctly
67 if strategy == 'SharedQueue':
68 from AthenaCommon.AthenaCommonFlags import jobproperties as ajp
69 if (not ajp.AthenaCommonFlags.FilesInput.statusOn) or ajp.AthenaCommonFlags.FilesInput == []:
70 msg.info('MP strategy "SharedQueue" will not work without input files when maxEvents=-1. Switching to "RoundRobin" just in case')
71 strategy = 'RoundRobin'
72
73 if strategy=='SharedQueue' or strategy=='RoundRobin':
74 if use_shared_reader:
75 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
76 svcMgr.PoolSvc.MaxFilesOpen = 2
77 from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
78 svcMgr.EventSelector.SharedMemoryTool = AthenaSharedMemoryTool("EventStreamingTool", SharedMemoryName=f"EventStream{unique_id}")
79 if 'AthenaPoolCnvSvc.ReadAthenaPool' in sys.modules:
80 svcMgr.AthenaPoolCnvSvc.InputStreamingTool = AthenaSharedMemoryTool("InputStreamingTool", SharedMemoryName=f"InputStream{unique_id}")
81 if use_shared_writer:
82 from AthenaCommon.AppMgr import ServiceMgr as svcMgr
83 if 'AthenaPoolCnvSvc.WriteAthenaPool' in sys.modules:
84 from AthenaIPCTools.AthenaIPCToolsConf import AthenaSharedMemoryTool
85 svcMgr.AthenaPoolCnvSvc.OutputStreamingTool = AthenaSharedMemoryTool("OutputStreamingTool", SharedMemoryName=f"OutputStream{unique_id}")
86 svcMgr.AthenaPoolCnvSvc.ParallelCompression=use_parallel_compression
87
88 if strategy=='SharedQueue':
89 from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueProvider
90 self.Tools += [ SharedEvtQueueProvider(UseSharedReader=use_shared_reader,
91 IsPileup=pileup,
92 EventsBeforeFork=events_before_fork,
93 ChunkSize=chunk_size) ]
94
95 # In pure MP, self.nThreads may be set to None - we want the pure MP setup in that case
96 if self.nThreads is not None and self.nThreads >= 1:
97 if(pileup):
98 raise Exception('Running pileup digitization in mixed MP+MT currently not supported')
99 from AthenaMPTools.AthenaMPToolsConf import SharedHiveEvtQueueConsumer
100 self.Tools += [ SharedHiveEvtQueueConsumer(UseSharedWriter=use_shared_writer,
101 EventsBeforeFork=events_before_fork,
102 Debug=debug_worker) ]
103 else:
104 from AthenaMPTools.AthenaMPToolsConf import SharedEvtQueueConsumer
105 self.Tools += [ SharedEvtQueueConsumer(UseSharedReader=use_shared_reader,
106 UseSharedWriter=use_shared_writer,
107 IsPileup=pileup,
108 IsRoundRobin=(strategy=='RoundRobin'),
109 EventsBeforeFork=events_before_fork,
110 ReadEventOrders=jp.AthenaMPFlags.ReadEventOrders(),
111 EventOrdersFile=jp.AthenaMPFlags.EventOrdersFile(),
112 Debug=debug_worker) ]
113 if use_shared_writer:
114 from AthenaMPTools.AthenaMPToolsConf import SharedWriterTool
115 self.Tools += [ SharedWriterTool(MotherProcess=(events_before_fork>0),
116 IsPileup=pileup,
117 Debug=debug_worker) ]
118 elif strategy=='EventService':
119 channelScatterer2Processor = "AthenaMP_Scatterer2Processor"
120 channelProcessor2EvtSel = "AthenaMP_Processor2EvtSel"
121
122 from AthenaMPTools.AthenaMPToolsConf import EvtRangeScatterer
123 self.Tools += [ EvtRangeScatterer(ProcessorChannel = channelScatterer2Processor,
124 EventRangeChannel = event_range_channel,
125 DoCaching=jp.AthenaMPFlags.EvtRangeScattererCaching()) ]
126
127 from AthenaMPTools.AthenaMPToolsConf import EvtRangeProcessor
128 self.Tools += [ EvtRangeProcessor(IsPileup=pileup,
129 Channel2Scatterer = channelScatterer2Processor,
130 Channel2EvtSel = channelProcessor2EvtSel,
131 Debug=debug_worker) ]
132 else:
133 msg.warning("Unknown strategy. No MP tools will be configured")
134
if(febId1==febId2)
This class provides the IPCTool for SharedMemory objects.

◆ createEventContext()

EventContext AthMpEvtLoopMgr::createEventContext ( )
override virtual inherited

Definition at line 117 of file AthMpEvtLoopMgr.cxx.

117 {
118 // return an invalid context - method should not be called
119 return EventContext{};
120}

◆ executeEvent()

StatusCode AthMpEvtLoopMgr::executeEvent ( EventContext && ctx)
override virtual inherited

Definition at line 122 of file AthMpEvtLoopMgr.cxx.

123{
124 // Perhaps we should return StatusCode::FAILURE here, as this method should not be called directly
125 return m_evtProcessor->executeEvent(std::move(ctx));
126}
ServiceHandle< IEventProcessor > m_evtProcessor

◆ executeRun()

StatusCode AthMpEvtLoopMgr::executeRun ( int maxevt)
override virtual inherited

Definition at line 128 of file AthMpEvtLoopMgr.cxx.

129{
130 ATH_MSG_DEBUG("in executeRun()");
131
132 // Generate random component of the Shared Memory and Shared Queue names
133 srand(time(NULL));
134 std::ostringstream randStream;
135 randStream << getpid() << '_' << AthenaInterprocess::randString();
136 ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
137
138 ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
139 ATH_CHECK(pDetStore.retrieve());
140
141 // Create Shared Event queue if necessary and make it available to the tools
142 if(m_strategy=="SharedQueue"
143 || m_strategy=="RoundRobin") {
144 AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
145 if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146 ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
147 delete evtQueue;
148 return StatusCode::FAILURE;
149 }
150 }
151
152 // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
153 // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
154 if(m_strategy=="EventService") {
155 AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
156 if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157 ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
158 delete failedPidQueue;
159 return StatusCode::FAILURE;
160 }
161 }
162
163 // Prepare work directory for sub-processes
164 if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
165 switch(errno) {
166 case EEXIST:
167 {
168 // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
169 // Rename it with m_workerTopDir+"-bak-rand"
170
171 srand((unsigned)time(0));
172 std::ostringstream randname;
173 randname << rand();
174 std::string backupDir = (m_workerTopDir.value().rfind('/')==(m_workerTopDir.value().size()-1)
175 ? m_workerTopDir.value().substr(0,m_workerTopDir.value().size()-1)
176 : m_workerTopDir.value())+std::string("-bak-")+randname.str();
177
178 ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
179 ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
180
181 if(rename(m_workerTopDir.value().c_str(),backupDir.c_str())!=0) {
182 char buf[256];
183 strerror_r(errno, buf, sizeof(buf));
184 ATH_MSG_ERROR("Unable to make backup directory! " << buf);
185 return StatusCode::FAILURE;
186 }
187
188 if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0) break;
189 }
190 /* FALLTHROUGH */
191 default:
192 {
193 char buf[256];
194 strerror_r(errno, buf, sizeof(buf));
195 ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
196 return StatusCode::FAILURE;
197 }
198 }
199 }
200
201 // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
202 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
203 ATH_CHECK(incSvc.retrieve());
204
206 incSvc->fireIncident(Incident(name(),"BeforeFork"));
207 }
208
209 // Extract process file descriptors
210 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
211
212 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
213 itLast = m_tools.end();
214
215 // When using SharedWriter in conjunction with fork-after-N-events
216 // we have to make sure that mother process is a conversion service
217 // client so that events before forking workers are captured...
218
219 auto sharedWriterTool = m_tools["SharedWriterTool"];
220 const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
221
222 if(sharedWriterWithFAFE) {
223 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
224 ATH_CHECK(m_dataShare.isValid());
225
226 (*sharedWriterTool)->useFdsRegistry(registry);
227 (*sharedWriterTool)->setRandString(randStream.str());
228
229 int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
230 if(nChildren==-1) {
231 ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
232 return StatusCode::FAILURE;
233 }
234 else {
235 m_nChildProcesses+=nChildren;
236 }
237
238 // Execute the SharedWriterTool at this point
239 StatusCode mySc = (*sharedWriterTool)->exec();
240 if(!mySc.isSuccess()) {
241 ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
242 return StatusCode::FAILURE;
243 }
244
245 // Make the mother process a client
246 if(!m_dataShare->makeClient(m_nWorkers+1).isSuccess()) {
247 ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
248 return StatusCode::FAILURE;
249 }
250 }
251
252 //
253 // Try processing requested number of events here
255 // Take into account a corner case: m_nEventsBeforeFork > maxevt
256 int nEventsToProcess = (maxevt>-1 && m_nEventsBeforeFork>maxevt)
257 ? maxevt
259 StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
260 if(!scEvtProc.isSuccess()) {
261 if(nEventsToProcess) {
262 ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
263 }
264 else {
265 ATH_MSG_FATAL("Unable to process first event in the master");
266 }
267 return scEvtProc;
268 }
269 }
270
271 // Finalize I/O (close input files) by IoComponents
272 ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
273 ATH_CHECK(ioMgr.retrieve());
274 ATH_CHECK(ioMgr->io_finalize());
275 ATH_MSG_DEBUG("Successfully finalized I/O before forking");
276
277 // Flush stream buffers
278 fflush(NULL);
279
280 // Make the mother process not client
281 if(sharedWriterWithFAFE && !m_dataShare->makeClient(0).isSuccess()) {
282 ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
283 return StatusCode::FAILURE;
284 }
285
286 int maxEvents(maxevt); // This can be modified after restart
287
288 // Re-extract process file descriptors
289 registry = extractFds();
290
291 // Make worker pools
292 it = m_tools.begin();
293 for(; it!=itLast; ++it) {
294 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
295 (*it)->useFdsRegistry(registry);
296 (*it)->setRandString(randStream.str());
297 (*it)->setMaxEvt(maxevt);
298 (*it)->setMPRunStop(this);
299 if(it==m_tools.begin()) {
300 incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
301 }
302 int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
303 if(nChildren==-1) {
304 ATH_MSG_FATAL("makePool failed for " << (*it)->name());
305 return StatusCode::FAILURE;
306 }
307 else {
308 m_nChildProcesses+=nChildren;
309 }
310 }
311
312 if(m_nChildProcesses==0) {
313 ATH_MSG_ERROR("No child processes were created");
314 return StatusCode::FAILURE;
315 }
316
317 // Assign work to child processes
318 for(it=m_tools.begin(); it!=itLast; ++it) {
319 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
320 if((*it)->exec().isFailure()) {
321 ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
322 return StatusCode::FAILURE;
323 }
324 }
325
326 StatusCode sc = wait();
327
329 ATH_MSG_INFO("*** *** Memory Usage *** ***");
330 ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
331 ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
332 ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
333 ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
334 ATH_MSG_INFO("*** *** Memory Usage *** ***");
335 }
336
338 ATH_MSG_INFO("BEGIN collecting sub-process logs");
339 std::vector<std::string> logs;
340 for(it=m_tools.begin(); it!=itLast; ++it) {
341 (*it)->subProcessLogs(logs);
342 for(size_t i=0;i<logs.size();++i) {
343 std::cout << "\n File: " << logs[i] << "\n" << std::endl;
344 std::ifstream log;
345 log.open(logs[i].c_str(),std::ifstream::in);
346 std::string line;
347 while(!log.eof()) {
348 std::getline(log,line);
349 std::cout << line << std::endl;
350 }
351 log.close();
352 }
353 }
354 ATH_MSG_INFO("END collecting sub-process logs");
355 }
356
357 if(sc.isSuccess())
358 return generateOutputReport();
359 else
360 return sc;
361}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
int32_t pid_t
static Double_t sc
std::vector< unsigned long > m_samplesSize
SmartIF< IDataShare > m_dataShare
ToolHandleArray< IAthenaMPTool > m_tools
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
Gaudi::Property< bool > m_collectSubprocessLogs
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_isPileup
std::vector< unsigned long > m_samplesRss
Gaudi::Property< std::string > m_strategy
StatusCode generateOutputReport()
std::vector< unsigned long > m_samplesSwap
std::vector< unsigned long > m_samplesPss
Gaudi::Property< int > m_nMemSamplingInterval
time(flags, cells_name, *args, **kw)
::StatusCode StatusCode
StatusCode definition for legacy code.
mkdir(path, recursive=True)

◆ extractFds()

std::shared_ptr< AthenaInterprocess::FdsRegistry > AthMpEvtLoopMgr::extractFds ( )
private inherited

Definition at line 507 of file AthMpEvtLoopMgr.cxx.

508{
509 ATH_MSG_DEBUG("Extracting file descriptors");
510 using namespace std::filesystem;
511 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
512
513 // Extract file descriptors associated with the current process
514 // 1. Store only those regular files in the registry, which
515 // don't contain substrings from the "exclusion pattern" set
516 // 2. Skip also stdout and stderr
517
518 std::vector<std::string> excludePatterns {
519 "/root/etc/plugins/"
520 ,"/root/cint/cint/"
521 ,"/root/include/"
522 ,"/var/tmp/"
523 ,"/var/lock/"
524 ,"/var/lib/"
525 ,"/bin/python/"
526 ,"/include/c++/"
527 ,".confdb2"
528 };
529
530 path fdPath("/proc/self/fd");
531 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
532 if(is_symlink(fdIt->path())) {
533 path realpath = read_symlink(fdIt->path());
534 int fd = atoi(fdIt->path().filename().string().c_str());
535
536 if (fd==1 || fd==2) // Skip stdout and stderr
537 continue;
538
539 if(exists(realpath)) {
540 if(is_regular_file(realpath)) {
541 // Check against the exclusion criteria
542 bool exclude(false);
543 for(size_t i=0;i<excludePatterns.size(); ++i) {
544 if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
545 exclude = true;
546 break;
547 }
548 }
549 if(exclude) {
550 ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
551 }
552 else {
553 registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
554 }
555 }
556 else {
557 ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
558 }
559 } // File exists
560 }
561 else
562 ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
563 } // Directory iteration
564
565 ATH_MSG_DEBUG("Fds Reistry created. Contents:");
566 for(size_t ii(0); ii<registry->size(); ++ii)
567 ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
568
569 return registry;
570}
bool exists(const std::string &filename)
does a file exist
std::set< std::string > exclude
list of directories to be excluded
Definition hcg.cxx:98
std::vector< FdsRegistryEntry > FdsRegistry
Definition FdsRegistry.h:22
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
path
python interpreter configuration --------------------------------------—
Definition athena.py:128

◆ finalize()

StatusCode AthMpEvtLoopMgr::finalize ( )
override virtual inherited

Definition at line 106 of file AthMpEvtLoopMgr.cxx.

107{
108 return StatusCode::SUCCESS;
109}

◆ generateOutputReport()

StatusCode AthMpEvtLoopMgr::generateOutputReport ( )
private inherited

Definition at line 436 of file AthMpEvtLoopMgr.cxx.

437{
438 // Loop over tools, collect their output reports and put them all together into a single file.
439 // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
440
441 std::ofstream ofs;
442 ofs.open(m_outputReportName.value().c_str());
443 if(!ofs) {
444 ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
445 return StatusCode::FAILURE;
446 }
447 else {
448 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
449
450 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
451 itLast = m_tools.end();
452 for(it=m_tools.begin(); it!=itLast; ++it)
453 allptrs.push_back((*it)->generateOutputReport());
454
455 // First collect keys=file_names from all tools
456 std::set<std::string> allkeys;
457 for(size_t i=0; i<allptrs.size(); ++i) {
458 AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
459 it_wosLast = allptrs[i]->end();
460 for(;it_wos!=it_wosLast;++it_wos)
461 allkeys.insert(it_wos->first);
462 }
463
464 // Generate XML
465 ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
466 ofs << "<athenaFileReport>" << std::endl;
467 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
468 keys_itLast = allkeys.end();
469 for(;keys_it!=keys_itLast;++keys_it) {
470 ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
471 for(size_t i=0; i<allptrs.size(); ++i) {
472 AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
473 if(it_wos!=(allptrs[i])->end()) {
474 for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
475 AthenaMP::WorkerOutput& outp = it_wos->second[ii];
476 if(ii==0 && m_nEventsBeforeFork>0) {
477 std::filesystem::path masterFile(std::filesystem::current_path());
478 masterFile /= std::filesystem::path(*keys_it);
479 if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
480 ofs << " <File "
481 << "description=\"" << outp.description
482 << "\" mode=\"" << outp.access_mode
483 << "\" name=\"" << masterFile.string()
484 << "\" shared=\"" << (outp.shared?"True":"False")
485 << "\" technology=\"" << outp.technology
486 << "\"/>" << std::endl;
487 }
488 ofs << " <File "
489 << "description=\"" << outp.description
490 << "\" mode=\"" << outp.access_mode
491 << "\" name=\"" << outp.filename
492 << "\" shared=\"" << (outp.shared?"True":"False")
493 << "\" technology=\"" << outp.technology
494 << "\"/>" << std::endl;
495 }
496 }
497 }
498 ofs << " </Files>" << std::endl;
499 }
500 ofs << "</athenaFileReport>" << std::endl;
501 ofs.close();
502 }
503
504 return StatusCode::SUCCESS;
505}
Gaudi::Property< std::string > m_outputReportName
std::ostream * outp
send output to here ...
Definition hcg.cxx:76
std::string find(const std::string &s)
return a remapped string
Definition hcg.cxx:138
AllWorkerOutputs::iterator AllWorkerOutputsIterator

◆ initialize()

StatusCode AthMpEvtLoopMgr::initialize ( )
override virtual inherited

Definition at line 46 of file AthMpEvtLoopMgr.cxx.

47{
48 ATH_MSG_DEBUG("in initialize() ... ");
49
50 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
51
52 SmartIF<IProperty> prpMgr(serviceLocator());
53 if(!prpMgr.isValid()) {
54 ATH_MSG_ERROR("Failed to get hold of the Property Manager");
55 return StatusCode::FAILURE;
56 }
57
58 {
59 std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
60 m_evtSelector = serviceLocator()->service(std::move(evtSelName));
61 }
62 ATH_CHECK(m_evtSelector.isValid());
63
64 if(m_strategy=="EventService") {
65 // ES with non-zero events before forking makes no sense
66 if(m_nEventsBeforeFork!=0) {
67 ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
68 return StatusCode::FAILURE;
69 }
70
71 // We need to ignore SkipEvents in ES
72 if(updateSkipEvents(0).isFailure()) {
73 ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
74 return StatusCode::FAILURE;
75 }
76 }
77
78 if(m_isPileup) {
79 m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
80 ATH_MSG_INFO("ELM: The job running in pileup mode");
81 }
82 else {
83 ATH_MSG_INFO("ELM: The job running in non-pileup mode");
84 }
85
86 ATH_CHECK(m_evtProcessor.retrieve());
87 if(!m_isPileup) {
88 SmartIF<IProperty> propertyServer(m_evtProcessor.get());
89 if(propertyServer) {
90 if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
91 ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
92 }
93 if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
94 ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
95 }
96 }
97 else {
98 ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
99 }
100 }
101 ATH_CHECK(m_tools.retrieve());
102
103 return StatusCode::SUCCESS;
104}
StringArrayProperty m_execAtPreFork
StatusCode updateSkipEvents(int skipEvents)
Gaudi::Property< unsigned int > m_eventPrintoutInterval
SmartIF< IService > m_evtSelector

◆ nextEvent()

StatusCode AthMpEvtLoopMgr::nextEvent ( int maxevt)
override virtual inherited

Definition at line 111 of file AthMpEvtLoopMgr.cxx.

112{
113 // Perhaps we should return StatusCode::FAILURE here, as this method should not be called directly
114 return m_evtProcessor->nextEvent(maxevt);
115}

◆ stopRun()

StatusCode AthMpEvtLoopMgr::stopRun ( )
override virtual inherited

Definition at line 363 of file AthMpEvtLoopMgr.cxx.

364{
365 m_scheduledStop = true;
366 return m_evtProcessor->stopRun();
367}

◆ stopScheduled()

virtual bool AthMpEvtLoopMgr::stopScheduled ( ) const
inline override virtual inherited

Definition at line 42 of file AthMpEvtLoopMgr.h.

42{return m_scheduledStop;};

◆ updateSkipEvents()

StatusCode AthMpEvtLoopMgr::updateSkipEvents ( int skipEvents)
private inherited

Definition at line 572 of file AthMpEvtLoopMgr.cxx.

573{
574 SmartIF<IProperty> propertyServer(m_evtSelector);
575 if(!propertyServer) {
576 ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
577 return StatusCode::FAILURE;
578 }
579
580 IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
581 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
582 ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
583 return StatusCode::FAILURE;
584 }
585 ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
586
587 return StatusCode::SUCCESS;
588}

◆ wait()

StatusCode AthMpEvtLoopMgr::wait ( )
private inherited

Definition at line 379 of file AthMpEvtLoopMgr.cxx.

380{
381 ATH_MSG_INFO("Waiting for sub-processes");
382 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
383 itLast = m_tools.end();
384 pid_t pid(0);
385 bool all_ok(true);
386
387 auto memMonTime = std::chrono::system_clock::now();
388
389 while(m_nChildProcesses>0) {
390 for(it = m_tools.begin(); it!=itLast; ++it) {
391 if((*it)->wait_once(pid).isFailure()) {
392 all_ok = false;
393 ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
394 break;
395 }
396 else {
397 if(pid>0) m_nChildProcesses -= 1;
398 }
399 }
400 if(!all_ok) break;
401
402 usleep(m_nPollingInterval*1000);
403
405 auto currTime = std::chrono::system_clock::now();
406 if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
407 unsigned long size(0);
408 unsigned long rss(0);
409 unsigned long pss(0);
410 unsigned long swap(0);
411
412 if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
413 ATH_MSG_WARNING("Unable to get memory sample");
414 else {
415 m_samplesRss.push_back(rss);
416 m_samplesPss.push_back(pss);
417 m_samplesSize.push_back(size);
418 m_samplesSwap.push_back(swap);
419 }
420 memMonTime=currTime;
421 }
422 }
423 }
424
425 for(it=m_tools.begin(); it!=itLast; ++it)
426 (*it)->reportSubprocessStatuses();
427
428 if(!all_ok) {
429 for(it=m_tools.begin(); it!=itLast; ++it)
430 (*it)->killChildren();
431 }
432
433 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
434}
void swap(DataVector< T > &a, DataVector< T > &b)
See DataVector<T, BASE>::swap().
Gaudi::Property< int > m_nPollingInterval
int count(std::string s, const std::string &regx)
count how many occurrences of a regex are in a string
Definition hcg.cxx:146
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)

Member Data Documentation

◆ CollectSubprocessLogs

python.PyComps.MpEvtLoopMgr.CollectSubprocessLogs = jp.AthenaMPFlags.CollectSubprocessLogs()

Definition at line 26 of file Control/AthenaMP/python/PyComps.py.

◆ EventsBeforeFork

python.PyComps.MpEvtLoopMgr.EventsBeforeFork = jp.AthenaMPFlags.EventsBeforeFork()

Definition at line 30 of file Control/AthenaMP/python/PyComps.py.

◆ IsPileup

python.PyComps.MpEvtLoopMgr.IsPileup = isPileup

Definition at line 31 of file Control/AthenaMP/python/PyComps.py.

◆ m_collectSubprocessLogs

Gaudi::Property<bool> AthMpEvtLoopMgr::m_collectSubprocessLogs
private, inherited
Initial value:
{this, "CollectSubprocessLogs", false,
"Copy all workers' logs into the main log file at the end of the job?"}

Definition at line 64 of file AthMpEvtLoopMgr.h.

64 {this, "CollectSubprocessLogs", false,
65 "Copy all workers' logs into the main log file at the end of the job?"};

◆ m_dataShare

SmartIF<IDataShare> AthMpEvtLoopMgr::m_dataShare
private, inherited

Definition at line 47 of file AthMpEvtLoopMgr.h.

◆ m_eventPrintoutInterval

Gaudi::Property<unsigned int> AthMpEvtLoopMgr::m_eventPrintoutInterval
private, inherited
Initial value:
{this, "EventPrintoutInterval", 1,
"The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"}

Definition at line 78 of file AthMpEvtLoopMgr.h.

78 {this, "EventPrintoutInterval", 1,
79 "The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"};

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthMpEvtLoopMgr::m_evtProcessor {this,"EventLoopManager","AthenaEventLoopMgr"}
private, inherited

Definition at line 45 of file AthMpEvtLoopMgr.h.

45{this,"EventLoopManager","AthenaEventLoopMgr"};

◆ m_evtSelector

SmartIF<IService> AthMpEvtLoopMgr::m_evtSelector {nullptr}
private, inherited

Definition at line 46 of file AthMpEvtLoopMgr.h.

46{nullptr};

◆ m_execAtPreFork

StringArrayProperty AthMpEvtLoopMgr::m_execAtPreFork
private, inherited
Initial value:
{this, "ExecAtPreFork", {},
"The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"}

Definition at line 81 of file AthMpEvtLoopMgr.h.

81 {this, "ExecAtPreFork", {},
82 "The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"};

◆ m_isPileup

Gaudi::Property<bool> AthMpEvtLoopMgr::m_isPileup
private, inherited
Initial value:
{this, "IsPileup", false,
"Is AthenaMP running a PileUp Digitization job?"}

Definition at line 61 of file AthMpEvtLoopMgr.h.

61 {this, "IsPileup", false,
62 "Is AthenaMP running a PileUp Digitization job?"};

◆ m_masterPid

pid_t AthMpEvtLoopMgr::m_masterPid {}
private, inherited

Definition at line 85 of file AthMpEvtLoopMgr.h.

85{}; // PID of the main process

◆ m_nChildProcesses

int AthMpEvtLoopMgr::m_nChildProcesses {0}
private, inherited

Definition at line 84 of file AthMpEvtLoopMgr.h.

84{0}; // Total number of child processes

◆ m_nEventsBeforeFork

Gaudi::Property<int> AthMpEvtLoopMgr::m_nEventsBeforeFork
private, inherited
Initial value:
{this, "EventsBeforeFork", 0,
"Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"}

Definition at line 75 of file AthMpEvtLoopMgr.h.

75 {this, "EventsBeforeFork", 0,
76 "Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"};

◆ m_nMemSamplingInterval

Gaudi::Property<int> AthMpEvtLoopMgr::m_nMemSamplingInterval
private, inherited
Initial value:
{this, "MemSamplingInterval", 0,
"Interval in seconds between taking memory usage samples. 0 - no sampling"}

Definition at line 72 of file AthMpEvtLoopMgr.h.

72 {this, "MemSamplingInterval", 0,
73 "Interval in seconds between taking memory usage samples. 0 - no sampling"};

◆ m_nPollingInterval

Gaudi::Property<int> AthMpEvtLoopMgr::m_nPollingInterval
private, inherited
Initial value:
{this, "PollingInterval", 100,
"Interval in milliseconds between checks of sub-processes statuses"}

Definition at line 69 of file AthMpEvtLoopMgr.h.

69 {this, "PollingInterval", 100,
70 "Interval in milliseconds between checks of sub-processes statuses"};

◆ m_nWorkers

Gaudi::Property<int> AthMpEvtLoopMgr::m_nWorkers
private, inherited
Initial value:
{this, "NWorkers", 0,
"Number of AthenaMP worker processes"}

Definition at line 49 of file AthMpEvtLoopMgr.h.

49 {this, "NWorkers", 0,
50 "Number of AthenaMP worker processes"};

◆ m_outputReportName

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_outputReportName
private, inherited
Initial value:
{this, "OutputReportFile", "AthenaMPOutputs",
"ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"}

Definition at line 55 of file AthMpEvtLoopMgr.h.

55 {this, "OutputReportFile", "AthenaMPOutputs",
56 "ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"};

◆ m_samplesPss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesPss
private, inherited

Definition at line 90 of file AthMpEvtLoopMgr.h.

◆ m_samplesRss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesRss
private, inherited

Definition at line 89 of file AthMpEvtLoopMgr.h.

◆ m_samplesSize

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSize
private, inherited

Definition at line 91 of file AthMpEvtLoopMgr.h.

◆ m_samplesSwap

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSwap
private, inherited

Definition at line 92 of file AthMpEvtLoopMgr.h.

◆ m_scheduledStop

bool AthMpEvtLoopMgr::m_scheduledStop {false}
private, inherited

Definition at line 86 of file AthMpEvtLoopMgr.h.

86{false}; // Flag for early termination of the event loop (for the generators use-case)

◆ m_strategy

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_strategy
private, inherited
Initial value:
{this, "Strategy", "",
"Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"}

Definition at line 58 of file AthMpEvtLoopMgr.h.

58 {this, "Strategy", "",
59 "Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"};

◆ m_tools

ToolHandleArray<IAthenaMPTool> AthMpEvtLoopMgr::m_tools {this,"Tools", {}}
private, inherited

Definition at line 67 of file AthMpEvtLoopMgr.h.

67{this,"Tools", {}};

◆ m_workerTopDir

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_workerTopDir
private, inherited
Initial value:
{this, "WorkerTopDir", "athenaMP_workers",
"Sub-directory of the main run directory that contains run directories of all workers"}

Definition at line 52 of file AthMpEvtLoopMgr.h.

52 {this, "WorkerTopDir", "athenaMP_workers",
53 "Sub-directory of the main run directory that contains run directories of all workers"};

◆ MemSamplingInterval

python.PyComps.MpEvtLoopMgr.MemSamplingInterval = jp.AthenaMPFlags.MemSamplingInterval()

Definition at line 29 of file Control/AthenaMP/python/PyComps.py.

◆ nThreads

python.PyComps.MpEvtLoopMgr.nThreads = theApp._opts.threads

Definition at line 14 of file Control/AthenaMP/python/PyComps.py.

◆ OutputReportFile

python.PyComps.MpEvtLoopMgr.OutputReportFile = jp.AthenaMPFlags.OutputReportFile()

Definition at line 25 of file Control/AthenaMP/python/PyComps.py.

◆ PollingInterval

python.PyComps.MpEvtLoopMgr.PollingInterval = jp.AthenaMPFlags.PollingInterval()

Definition at line 28 of file Control/AthenaMP/python/PyComps.py.

◆ Strategy

python.PyComps.MpEvtLoopMgr.Strategy = jp.AthenaMPFlags.Strategy()

Definition at line 27 of file Control/AthenaMP/python/PyComps.py.

◆ WorkerTopDir

python.PyComps.MpEvtLoopMgr.WorkerTopDir = jp.AthenaMPFlags.WorkerTopDir()

Definition at line 24 of file Control/AthenaMP/python/PyComps.py.


The documentation for this class was generated from the following file: