ATLAS Offline Software
Loading...
Searching...
No Matches
AthMpEvtLoopMgr Class Reference

#include <AthMpEvtLoopMgr.h>

Inheritance diagram for AthMpEvtLoopMgr:
Collaboration diagram for AthMpEvtLoopMgr:

Public Member Functions

 AthMpEvtLoopMgr (const std::string &name, ISvcLocator *svcLocator)
virtual ~AthMpEvtLoopMgr ()=default
 AthMpEvtLoopMgr ()=delete
 AthMpEvtLoopMgr (const AthMpEvtLoopMgr &)=delete
AthMpEvtLoopMgr & operator= (const AthMpEvtLoopMgr &)=delete
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual StatusCode nextEvent (int maxevt) override
virtual StatusCode executeEvent (EventContext &&ctx) override
virtual StatusCode executeRun (int maxevt) override
virtual StatusCode stopRun () override
virtual EventContext createEventContext () override
virtual bool stopScheduled () const override

Private Member Functions

StatusCode wait ()
StatusCode generateOutputReport ()
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds ()
StatusCode updateSkipEvents (int skipEvents)

Private Attributes

ServiceHandle< IEventProcessor > m_evtProcessor {this,"EventLoopManager","AthenaEventLoopMgr"}
SmartIF< IService > m_evtSelector {nullptr}
SmartIF< IDataShare > m_dataShare
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
Gaudi::Property< std::string > m_outputReportName
Gaudi::Property< std::string > m_strategy
Gaudi::Property< bool > m_isPileup
Gaudi::Property< bool > m_collectSubprocessLogs
ToolHandleArray< IAthenaMPTool > m_tools {this,"Tools", {}}
Gaudi::Property< int > m_nPollingInterval
Gaudi::Property< int > m_nMemSamplingInterval
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< unsigned int > m_eventPrintoutInterval
StringArrayProperty m_execAtPreFork
int m_nChildProcesses {0}
pid_t m_masterPid {}
bool m_scheduledStop {false}
std::vector< unsigned long > m_samplesRss
std::vector< unsigned long > m_samplesPss
std::vector< unsigned long > m_samplesSize
std::vector< unsigned long > m_samplesSwap

Detailed Description

Definition at line 19 of file AthMpEvtLoopMgr.h.

Constructor & Destructor Documentation

◆ AthMpEvtLoopMgr() [1/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const std::string & name,
ISvcLocator * svcLocator )

Definition at line 39 of file AthMpEvtLoopMgr.cxx.

41 : base_class(name,svcLocator)
42 , m_masterPid(getpid())
43{
44}

◆ ~AthMpEvtLoopMgr()

virtual AthMpEvtLoopMgr::~AthMpEvtLoopMgr ( )
virtual, default

◆ AthMpEvtLoopMgr() [2/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( )
delete

◆ AthMpEvtLoopMgr() [3/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const AthMpEvtLoopMgr & )
delete

Member Function Documentation

◆ createEventContext()

EventContext AthMpEvtLoopMgr::createEventContext ( )
override, virtual

Definition at line 117 of file AthMpEvtLoopMgr.cxx.

117 {
118 // return an invalid context - method should not be called
119 return EventContext{};
120}

◆ executeEvent()

StatusCode AthMpEvtLoopMgr::executeEvent ( EventContext && ctx)
override, virtual

Definition at line 122 of file AthMpEvtLoopMgr.cxx.

123{
124 // Perhaps there we should return StatusCode::FAILURE as this method shoud not be called directly
125 return m_evtProcessor->executeEvent(std::move(ctx));
126}
ServiceHandle< IEventProcessor > m_evtProcessor

◆ executeRun()

StatusCode AthMpEvtLoopMgr::executeRun ( int maxevt)
override, virtual

Definition at line 128 of file AthMpEvtLoopMgr.cxx.

129{
130 ATH_MSG_DEBUG("in executeRun()");
131
132 // Generate random component of the Shared Memory and Shared Queue names
133 srand(time(NULL));
134 std::ostringstream randStream;
135 randStream << getpid() << '_' << AthenaInterprocess::randString();
136 ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
137
138 ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
139 ATH_CHECK(pDetStore.retrieve());
140
141 // Create Shared Event queue if necessary and make it available to the tools
142 if(m_strategy=="SharedQueue"
143 || m_strategy=="RoundRobin") {
144 AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
145 if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146 ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
147 delete evtQueue;
148 return StatusCode::FAILURE;
149 }
150 }
151
152 // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
153 // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
154 if(m_strategy=="EventService") {
155 AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
156 if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157 ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
158 delete failedPidQueue;
159 return StatusCode::FAILURE;
160 }
161 }
162
163 // Prepare work directory for sub-processes
164 if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
165 switch(errno) {
166 case EEXIST:
167 {
168 // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
169 // Rename it with m_workerTopDir+"-bak-rand"
170
171 srand((unsigned)time(0));
172 std::ostringstream randname;
173 randname << rand();
174 std::string backupDir = (m_workerTopDir.value().rfind('/')==(m_workerTopDir.value().size()-1)
175 ? m_workerTopDir.value().substr(0,m_workerTopDir.value().size()-1)
176 : m_workerTopDir.value())+std::string("-bak-")+randname.str();
177
178 ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
179 ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
180
181 if(rename(m_workerTopDir.value().c_str(),backupDir.c_str())!=0) {
182 char buf[256];
183 strerror_r(errno, buf, sizeof(buf));
184 ATH_MSG_ERROR("Unable to make backup directory! " << buf);
185 return StatusCode::FAILURE;
186 }
187
188 if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0) break;
189 }
190 /* FALLTHROUGH */
191 default:
192 {
193 char buf[256];
194 strerror_r(errno, buf, sizeof(buf));
195 ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
196 return StatusCode::FAILURE;
197 }
198 }
199 }
200
201 // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
202 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
203 ATH_CHECK(incSvc.retrieve());
204
205 if(m_nEventsBeforeFork==0 && !m_isPileup) {
206 incSvc->fireIncident(Incident(name(),"BeforeFork"));
207 }
208
209 // Extract process file descriptors
210 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
211
212 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
213 itLast = m_tools.end();
214
215 // When using SharedWriter in conjunction with fork-after-N-events
216 // we have to make sure that mother process is a conversion service
217 // client so that events before forking workers are captured...
218
219 auto sharedWriterTool = m_tools["SharedWriterTool"];
220 const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
221
222 if(sharedWriterWithFAFE) {
223 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
224 ATH_CHECK(m_dataShare.isValid());
225
226 (*sharedWriterTool)->useFdsRegistry(registry);
227 (*sharedWriterTool)->setRandString(randStream.str());
228
229 int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
230 if(nChildren==-1) {
231 ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
232 return StatusCode::FAILURE;
233 }
234 else {
235 m_nChildProcesses+=nChildren;
236 }
237
238 // Execute the SharedWriterTool at this point
239 StatusCode mySc = (*sharedWriterTool)->exec();
240 if(!mySc.isSuccess()) {
241 ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
242 return StatusCode::FAILURE;
243 }
244
245 // Make the mother process a client
246 if(!m_dataShare->makeClient(m_nWorkers+1).isSuccess()) {
247 ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
248 return StatusCode::FAILURE;
249 }
250 }
251
252 //
253 // Try processing requested number of events here
254 if(m_nEventsBeforeFork>0) {
255 // Take into account a corner case: m_nEventsBeforeFork > maxevt
256 int nEventsToProcess = (maxevt>-1 && m_nEventsBeforeFork>maxevt)
257 ? maxevt
258 : m_nEventsBeforeFork;
259 StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
260 if(!scEvtProc.isSuccess()) {
261 if(nEventsToProcess) {
262 ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
263 }
264 else {
265 ATH_MSG_FATAL("Unable to process first event in the master");
266 }
267 return scEvtProc;
268 }
269 }
270
271 // Finalize I/O (close input files) by IoComponents
272 ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
273 ATH_CHECK(ioMgr.retrieve());
274 ATH_CHECK(ioMgr->io_finalize());
275 ATH_MSG_DEBUG("Successfully finalized I/O before forking");
276
277 // Flush stream buffers
278 fflush(NULL);
279
280 // Make the mother process not client
281 if(sharedWriterWithFAFE && !m_dataShare->makeClient(0).isSuccess()) {
282 ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
283 return StatusCode::FAILURE;
284 }
285
286 int maxEvents(maxevt); // This can be modified after restart
287
288 // Re-extract process file descriptors
289 registry = extractFds();
290
291 // Make worker pools
292 it = m_tools.begin();
293 for(; it!=itLast; ++it) {
294 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
295 (*it)->useFdsRegistry(registry);
296 (*it)->setRandString(randStream.str());
297 (*it)->setMaxEvt(maxevt);
298 (*it)->setMPRunStop(this);
299 if(it==m_tools.begin()) {
300 incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
301 }
302 int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
303 if(nChildren==-1) {
304 ATH_MSG_FATAL("makePool failed for " << (*it)->name());
305 return StatusCode::FAILURE;
306 }
307 else {
308 m_nChildProcesses+=nChildren;
309 }
310 }
311
312 if(m_nChildProcesses==0) {
313 ATH_MSG_ERROR("No child processes were created");
314 return StatusCode::FAILURE;
315 }
316
317 // Assign work to child processes
318 for(it=m_tools.begin(); it!=itLast; ++it) {
319 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
320 if((*it)->exec().isFailure()) {
321 ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
322 return StatusCode::FAILURE;
323 }
324 }
325
326 StatusCode sc = wait();
327
328 if(m_nMemSamplingInterval>0) {
329 ATH_MSG_INFO("*** *** Memory Usage *** ***");
330 ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
331 ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
332 ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
333 ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
334 ATH_MSG_INFO("*** *** Memory Usage *** ***");
335 }
336
337 if(m_collectSubprocessLogs) {
338 ATH_MSG_INFO("BEGIN collecting sub-process logs");
339 std::vector<std::string> logs;
340 for(it=m_tools.begin(); it!=itLast; ++it) {
341 (*it)->subProcessLogs(logs);
342 for(size_t i=0;i<logs.size();++i) {
343 std::cout << "\n File: " << logs[i] << "\n" << std::endl;
344 std::ifstream log;
345 log.open(logs[i].c_str(),std::ifstream::in);
346 std::string line;
347 while(!log.eof()) {
348 std::getline(log,line);
349 std::cout << line << std::endl;
350 }
351 log.close();
352 }
353 }
354 ATH_MSG_INFO("END collecting sub-process logs");
355 }
356
357 if(sc.isSuccess())
358 return generateOutputReport();
359 else
360 return sc;
361}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
int32_t pid_t
static Double_t sc
std::vector< unsigned long > m_samplesSize
SmartIF< IDataShare > m_dataShare
ToolHandleArray< IAthenaMPTool > m_tools
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
Gaudi::Property< bool > m_collectSubprocessLogs
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_isPileup
std::vector< unsigned long > m_samplesRss
Gaudi::Property< std::string > m_strategy
StatusCode generateOutputReport()
std::vector< unsigned long > m_samplesSwap
std::vector< unsigned long > m_samplesPss
Gaudi::Property< int > m_nMemSamplingInterval
time(flags, cells_name, *args, **kw)
::StatusCode StatusCode
StatusCode definition for legacy code.
mkdir(path, recursive=True)

◆ extractFds()

std::shared_ptr< AthenaInterprocess::FdsRegistry > AthMpEvtLoopMgr::extractFds ( )
private

Definition at line 507 of file AthMpEvtLoopMgr.cxx.

508{
509 ATH_MSG_DEBUG("Extracting file descriptors");
510 using namespace std::filesystem;
511 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
512
513 // Extract file descriptors associated with the current process
514 // 1. Store only those regular files in the registry, which
515 // don't contain substrings from the "exclusion pattern" set
516 // 2. Skip also stdout and stderr
517
518 std::vector<std::string> excludePatterns {
519 "/root/etc/plugins/"
520 ,"/root/cint/cint/"
521 ,"/root/include/"
522 ,"/var/tmp/"
523 ,"/var/lock/"
524 ,"/var/lib/"
525 ,"/bin/python/"
526 ,"/include/c++/"
527 ,".confdb2"
528 };
529
530 path fdPath("/proc/self/fd");
531 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
532 if(is_symlink(fdIt->path())) {
533 path realpath = read_symlink(fdIt->path());
534 int fd = atoi(fdIt->path().filename().string().c_str());
535
536 if (fd==1 || fd==2) // Skip stdout and stderr
537 continue;
538
539 if(exists(realpath)) {
540 if(is_regular_file(realpath)) {
541 // Check against the exclusion criteria
542 bool exclude(false);
543 for(size_t i=0;i<excludePatterns.size(); ++i) {
544 if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
545 exclude = true;
546 break;
547 }
548 }
549 if(exclude) {
550 ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
551 }
552 else {
553 registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
554 }
555 }
556 else {
557 ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
558 }
559 } // File exists
560 }
561 else
562 ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
563 } // Directory iteration
564
565 ATH_MSG_DEBUG("Fds Reistry created. Contents:");
566 for(size_t ii(0); ii<registry->size(); ++ii)
567 ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
568
569 return registry;
570}
bool exists(const std::string &filename)
does a file exist
std::set< std::string > exclude
list of directories to be excluded
Definition hcg.cxx:98
std::vector< FdsRegistryEntry > FdsRegistry
Definition FdsRegistry.h:22
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
path
python interpreter configuration --------------------------------------—
Definition athena.py:128

◆ finalize()

StatusCode AthMpEvtLoopMgr::finalize ( )
override, virtual

Definition at line 106 of file AthMpEvtLoopMgr.cxx.

107{
108 return StatusCode::SUCCESS;
109}

◆ generateOutputReport()

StatusCode AthMpEvtLoopMgr::generateOutputReport ( )
private

Definition at line 436 of file AthMpEvtLoopMgr.cxx.

437{
438 // Loop over tools, collect their output reports and put them all together into a single file.
439 // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
440
441 std::ofstream ofs;
442 ofs.open(m_outputReportName.value().c_str());
443 if(!ofs) {
444 ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
445 return StatusCode::FAILURE;
446 }
447 else {
448 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
449
450 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
451 itLast = m_tools.end();
452 for(it=m_tools.begin(); it!=itLast; ++it)
453 allptrs.push_back((*it)->generateOutputReport());
454
455 // First collect keys=file_names from all tools
456 std::set<std::string> allkeys;
457 for(size_t i=0; i<allptrs.size(); ++i) {
458 AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
459 it_wosLast = allptrs[i]->end();
460 for(;it_wos!=it_wosLast;++it_wos)
461 allkeys.insert(it_wos->first);
462 }
463
464 // Generate XML
465 ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
466 ofs << "<athenaFileReport>" << std::endl;
467 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
468 keys_itLast = allkeys.end();
469 for(;keys_it!=keys_itLast;++keys_it) {
470 ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
471 for(size_t i=0; i<allptrs.size(); ++i) {
472 AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
473 if(it_wos!=(allptrs[i])->end()) {
474 for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
475 AthenaMP::WorkerOutput& outp = it_wos->second[ii];
476 if(ii==0 && m_nEventsBeforeFork>0) {
477 std::filesystem::path masterFile(std::filesystem::current_path());
478 masterFile /= std::filesystem::path(*keys_it);
479 if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
480 ofs << " <File "
481 << "description=\"" << outp.description
482 << "\" mode=\"" << outp.access_mode
483 << "\" name=\"" << masterFile.string()
484 << "\" shared=\"" << (outp.shared?"True":"False")
485 << "\" technology=\"" << outp.technology
486 << "\"/>" << std::endl;
487 }
488 ofs << " <File "
489 << "description=\"" << outp.description
490 << "\" mode=\"" << outp.access_mode
491 << "\" name=\"" << outp.filename
492 << "\" shared=\"" << (outp.shared?"True":"False")
493 << "\" technology=\"" << outp.technology
494 << "\"/>" << std::endl;
495 }
496 }
497 }
498 ofs << " </Files>" << std::endl;
499 }
500 ofs << "</athenaFileReport>" << std::endl;
501 ofs.close();
502 }
503
504 return StatusCode::SUCCESS;
505}
Gaudi::Property< std::string > m_outputReportName
std::ostream * outp
send output to here ...
Definition hcg.cxx:76
std::string find(const std::string &s)
return a remapped string
Definition hcg.cxx:138
AllWorkerOutputs::iterator AllWorkerOutputsIterator

◆ initialize()

StatusCode AthMpEvtLoopMgr::initialize ( )
override, virtual

Definition at line 46 of file AthMpEvtLoopMgr.cxx.

47{
48 ATH_MSG_DEBUG("in initialize() ... ");
49
50 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
51
52 SmartIF<IProperty> prpMgr(serviceLocator());
53 if(!prpMgr.isValid()) {
54 ATH_MSG_ERROR("Failed to get hold of the Property Manager");
55 return StatusCode::FAILURE;
56 }
57
58 {
59 std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
60 m_evtSelector = serviceLocator()->service(std::move(evtSelName));
61 }
62 ATH_CHECK(m_evtSelector.isValid());
63
64 if(m_strategy=="EventService") {
65 // ES with non-zero events before forking makes no sense
66 if(m_nEventsBeforeFork!=0) {
67 ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
68 return StatusCode::FAILURE;
69 }
70
71 // We need to ignore SkipEvents in ES
72 if(updateSkipEvents(0).isFailure()) {
73 ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
74 return StatusCode::FAILURE;
75 }
76 }
77
78 if(m_isPileup) {
79 m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
80 ATH_MSG_INFO("ELM: The job running in pileup mode");
81 }
82 else {
83 ATH_MSG_INFO("ELM: The job running in non-pileup mode");
84 }
85
86 ATH_CHECK(m_evtProcessor.retrieve());
87 if(!m_isPileup) {
88 SmartIF<IProperty> propertyServer(m_evtProcessor.get());
89 if(propertyServer) {
90 if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
91 ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
92 }
93 if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
94 ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
95 }
96 }
97 else {
98 ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
99 }
100 }
101 ATH_CHECK(m_tools.retrieve());
102
103 return StatusCode::SUCCESS;
104}
StringArrayProperty m_execAtPreFork
StatusCode updateSkipEvents(int skipEvents)
Gaudi::Property< unsigned int > m_eventPrintoutInterval
SmartIF< IService > m_evtSelector

◆ nextEvent()

StatusCode AthMpEvtLoopMgr::nextEvent ( int maxevt)
override, virtual

Definition at line 111 of file AthMpEvtLoopMgr.cxx.

112{
113 // Perhaps there we should return StatusCode::FAILURE as this method shoud not be called directly
114 return m_evtProcessor->nextEvent(maxevt);
115}

◆ operator=()

AthMpEvtLoopMgr & AthMpEvtLoopMgr::operator= ( const AthMpEvtLoopMgr & )
delete

◆ stopRun()

StatusCode AthMpEvtLoopMgr::stopRun ( )
override, virtual

Definition at line 363 of file AthMpEvtLoopMgr.cxx.

364{
365 m_scheduledStop = true;
366 return m_evtProcessor->stopRun();
367}

◆ stopScheduled()

virtual bool AthMpEvtLoopMgr::stopScheduled ( ) const
inline, override, virtual

Definition at line 42 of file AthMpEvtLoopMgr.h.

42{return m_scheduledStop;};

◆ updateSkipEvents()

StatusCode AthMpEvtLoopMgr::updateSkipEvents ( int skipEvents)
private

Definition at line 572 of file AthMpEvtLoopMgr.cxx.

573{
574 SmartIF<IProperty> propertyServer(m_evtSelector);
575 if(!propertyServer) {
576 ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
577 return StatusCode::FAILURE;
578 }
579
580 IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
581 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
582 ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
583 return StatusCode::FAILURE;
584 }
585 ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
586
587 return StatusCode::SUCCESS;
588}

◆ wait()

StatusCode AthMpEvtLoopMgr::wait ( )
private

Definition at line 379 of file AthMpEvtLoopMgr.cxx.

380{
381 ATH_MSG_INFO("Waiting for sub-processes");
382 ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
383 itLast = m_tools.end();
384 pid_t pid(0);
385 bool all_ok(true);
386
387 auto memMonTime = std::chrono::system_clock::now();
388
389 while(m_nChildProcesses>0) {
390 for(it = m_tools.begin(); it!=itLast; ++it) {
391 if((*it)->wait_once(pid).isFailure()) {
392 all_ok = false;
393 ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
394 break;
395 }
396 else {
397 if(pid>0) m_nChildProcesses -= 1;
398 }
399 }
400 if(!all_ok) break;
401
402 usleep(m_nPollingInterval*1000);
403
404 if(m_nMemSamplingInterval>0) {
405 auto currTime = std::chrono::system_clock::now();
406 if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
407 unsigned long size(0);
408 unsigned long rss(0);
409 unsigned long pss(0);
410 unsigned long swap(0);
411
412 if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
413 ATH_MSG_WARNING("Unable to get memory sample");
414 else {
415 m_samplesRss.push_back(rss);
416 m_samplesPss.push_back(pss);
417 m_samplesSize.push_back(size);
418 m_samplesSwap.push_back(swap);
419 }
420 memMonTime=currTime;
421 }
422 }
423 }
424
425 for(it=m_tools.begin(); it!=itLast; ++it)
426 (*it)->reportSubprocessStatuses();
427
428 if(!all_ok) {
429 for(it=m_tools.begin(); it!=itLast; ++it)
430 (*it)->killChildren();
431 }
432
433 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
434}
void swap(DataVector< T > &a, DataVector< T > &b)
See DataVector<T, BASE>::swap().
Gaudi::Property< int > m_nPollingInterval
int count(std::string s, const std::string &regx)
count how many occurrences of a regx are in a string
Definition hcg.cxx:146
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)

Member Data Documentation

◆ m_collectSubprocessLogs

Gaudi::Property<bool> AthMpEvtLoopMgr::m_collectSubprocessLogs
private
Initial value:
{this, "CollectSubprocessLogs", false,
"Copy all workers' logs into the main log file at the end of the job?"}

Definition at line 64 of file AthMpEvtLoopMgr.h.

64 {this, "CollectSubprocessLogs", false,
65 "Copy all workers' logs into the main log file at the end of the job?"};

◆ m_dataShare

SmartIF<IDataShare> AthMpEvtLoopMgr::m_dataShare
private

Definition at line 47 of file AthMpEvtLoopMgr.h.

◆ m_eventPrintoutInterval

Gaudi::Property<unsigned int> AthMpEvtLoopMgr::m_eventPrintoutInterval
private
Initial value:
{this, "EventPrintoutInterval", 1,
"The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"}

Definition at line 78 of file AthMpEvtLoopMgr.h.

78 {this, "EventPrintoutInterval", 1,
79 "The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"};

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthMpEvtLoopMgr::m_evtProcessor {this,"EventLoopManager","AthenaEventLoopMgr"}
private

Definition at line 45 of file AthMpEvtLoopMgr.h.

45{this,"EventLoopManager","AthenaEventLoopMgr"};

◆ m_evtSelector

SmartIF<IService> AthMpEvtLoopMgr::m_evtSelector {nullptr}
private

Definition at line 46 of file AthMpEvtLoopMgr.h.

46{nullptr};

◆ m_execAtPreFork

StringArrayProperty AthMpEvtLoopMgr::m_execAtPreFork
private
Initial value:
{this, "ExecAtPreFork", {},
"The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"}

Definition at line 81 of file AthMpEvtLoopMgr.h.

81 {this, "ExecAtPreFork", {},
82 "The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"};

◆ m_isPileup

Gaudi::Property<bool> AthMpEvtLoopMgr::m_isPileup
private
Initial value:
{this, "IsPileup", false,
"Is AthenaMP running a PileUp Digitization job?"}

Definition at line 61 of file AthMpEvtLoopMgr.h.

61 {this, "IsPileup", false,
62 "Is AthenaMP running a PileUp Digitization job?"};

◆ m_masterPid

pid_t AthMpEvtLoopMgr::m_masterPid {}
private

Definition at line 85 of file AthMpEvtLoopMgr.h.

85{}; // PID of the main process

◆ m_nChildProcesses

int AthMpEvtLoopMgr::m_nChildProcesses {0}
private

Definition at line 84 of file AthMpEvtLoopMgr.h.

84{0}; // Total number of child processes

◆ m_nEventsBeforeFork

Gaudi::Property<int> AthMpEvtLoopMgr::m_nEventsBeforeFork
private
Initial value:
{this, "EventsBeforeFork", 0,
"Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"}

Definition at line 75 of file AthMpEvtLoopMgr.h.

75 {this, "EventsBeforeFork", 0,
76 "Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"};

◆ m_nMemSamplingInterval

Gaudi::Property<int> AthMpEvtLoopMgr::m_nMemSamplingInterval
private
Initial value:
{this, "MemSamplingInterval", 0,
"Interval in seconds between taking memory usage samples. 0 - no sampling"}

Definition at line 72 of file AthMpEvtLoopMgr.h.

72 {this, "MemSamplingInterval", 0,
73 "Interval in seconds between taking memory usage samples. 0 - no sampling"};

◆ m_nPollingInterval

Gaudi::Property<int> AthMpEvtLoopMgr::m_nPollingInterval
private
Initial value:
{this, "PollingInterval", 100,
"Interval in milliseconds between checks of sub-processes statuses"}

Definition at line 69 of file AthMpEvtLoopMgr.h.

69 {this, "PollingInterval", 100,
70 "Interval in milliseconds between checks of sub-processes statuses"};

◆ m_nWorkers

Gaudi::Property<int> AthMpEvtLoopMgr::m_nWorkers
private
Initial value:
{this, "NWorkers", 0,
"Number of AthenaMP worker processes"}

Definition at line 49 of file AthMpEvtLoopMgr.h.

49 {this, "NWorkers", 0,
50 "Number of AthenaMP worker processes"};

◆ m_outputReportName

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_outputReportName
private
Initial value:
{this, "OutputReportFile", "AthenaMPOutputs",
"ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"}

Definition at line 55 of file AthMpEvtLoopMgr.h.

55 {this, "OutputReportFile", "AthenaMPOutputs",
56 "ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"};

◆ m_samplesPss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesPss
private

Definition at line 90 of file AthMpEvtLoopMgr.h.

◆ m_samplesRss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesRss
private

Definition at line 89 of file AthMpEvtLoopMgr.h.

◆ m_samplesSize

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSize
private

Definition at line 91 of file AthMpEvtLoopMgr.h.

◆ m_samplesSwap

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSwap
private

Definition at line 92 of file AthMpEvtLoopMgr.h.

◆ m_scheduledStop

bool AthMpEvtLoopMgr::m_scheduledStop {false}
private

Definition at line 86 of file AthMpEvtLoopMgr.h.

86{false}; // Flag for early termination of the event loop (for the generators use-case)

◆ m_strategy

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_strategy
private
Initial value:
{this, "Strategy", "",
"Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"}

Definition at line 58 of file AthMpEvtLoopMgr.h.

58 {this, "Strategy", "",
59 "Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"};

◆ m_tools

ToolHandleArray<IAthenaMPTool> AthMpEvtLoopMgr::m_tools {this,"Tools", {}}
private

Definition at line 67 of file AthMpEvtLoopMgr.h.

67{this,"Tools", {}};

◆ m_workerTopDir

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_workerTopDir
private
Initial value:
{this, "WorkerTopDir", "athenaMP_workers",
"Sub-directory of the main run directory that contains run directories of all workers"}

Definition at line 52 of file AthMpEvtLoopMgr.h.

52 {this, "WorkerTopDir", "athenaMP_workers",
53 "Sub-directory of the main run directory that contains run directories of all workers"};

The documentation for this class was generated from the following files: