10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/IConversionSvc.h"
13 #include "GaudiKernel/Incident.h"
14 #include "GaudiKernel/ServiceHandle.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IIoComponent.h"
17 #include "GaudiKernel/ConcurrencyFlags.h"
// Forward declaration of getPss. Its definition is not visible in this chunk.
// Parameters (unnamed in this declaration):
//   pid_t              - a process id.
//   unsigned long& x4  - four output parameters, passed by non-const reference.
//                        Presumably these are the size, RSS, PSS and swap figures
//                        for that process: later code in this chunk declares locals
//                        named size/rss/pss/swap, each initialised to 0.
//                        TODO confirm the order and units against the definition.
//   bool verbose       - defaults to false. Its effect is not visible here.
// Returns an int. Whether it is a status code or a value is not shown here
// (TODO confirm).
// NOTE(review): the leading "38 " on the next line, and similar numeric prefixes
// throughout this file, look like line numbers fused in from a source viewer.
// They are not valid C++ and should be stripped when the file is restored.
38 int getPss(
pid_t,
unsigned long&,
unsigned long&,
unsigned long&,
unsigned long&,
bool verbose=
false);
42 , ISvcLocator* svcLocator)
43 : base_class(
name,svcLocator)
44 , m_evtProcessor(
"AthenaEventLoopMgr",
name)
45 , m_evtSelector(nullptr)
47 , m_workerTopDir(
"athenaMP_workers")
48 , m_outputReportName(
"AthenaMPOutputs")
51 , m_collectSubprocessLogs(false)
53 , m_nChildProcesses(0)
54 , m_nPollingInterval(100)
55 , m_nMemSamplingInterval(0)
56 , m_nEventsBeforeFork(0)
57 , m_eventPrintoutInterval(1)
59 , m_masterPid(getpid())
67 declareProperty(
"Tools",
m_tools);
83 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(
m_nWorkers);
85 SmartIF<IProperty> prpMgr(serviceLocator());
86 if(!prpMgr.isValid()) {
88 return StatusCode::FAILURE;
91 std::string evtSelName = prpMgr->getProperty(
"EvtSel").toString();
98 ATH_MSG_ERROR(
"The EventService strategy cannot run with non-zero value for EventsBeforeFork");
99 return StatusCode::FAILURE;
104 ATH_MSG_ERROR(
"Failed to set skipEvents=0 in Event Service");
105 return StatusCode::FAILURE;
114 ATH_MSG_INFO(
"ELM: The job running in non-pileup mode");
124 if(propertyServer->setProperty(
"ExecAtPreFork",
m_execAtPreFork).isFailure()) {
125 ATH_MSG_WARNING(
"Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
134 return StatusCode::SUCCESS;
139 return StatusCode::SUCCESS;
150 return EventContext{};
165 std::ostringstream randStream;
167 ATH_MSG_INFO(
"Using random components for IPC object names: " << randStream.str());
176 if(pDetStore->record(evtQueue,
"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
177 ATH_MSG_FATAL(
"Unable to record the pointer to the Shared Event queue into Detector Store");
179 return StatusCode::FAILURE;
187 if(pDetStore->record(failedPidQueue,
"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
188 ATH_MSG_FATAL(
"Unable to record the pointer to the Failed PID queue into Detector Store");
189 delete failedPidQueue;
190 return StatusCode::FAILURE;
202 srand((
unsigned)
time(0));
203 std::ostringstream randname;
208 ATH_MSG_WARNING(
"The job will attempt to save it with the name " << backupDir <<
" and create new top directory from scratch");
212 strerror_r(errno, buf,
sizeof(buf));
214 return StatusCode::FAILURE;
224 strerror_r(errno, buf,
sizeof(buf));
226 return StatusCode::FAILURE;
236 incSvc->fireIncident(Incident(
name(),
"BeforeFork"));
249 SmartIF<IDataShare> dataShare{serviceLocator()->service(
"AthenaPoolCnvSvc")};
252 auto sharedWriterTool =
m_tools[
"SharedWriterTool"];
255 if(sharedWriterWithFAFE) {
256 (*sharedWriterTool)->useFdsRegistry(
registry);
257 (*sharedWriterTool)->setRandString(randStream.str());
261 ATH_MSG_FATAL(
"makePool failed for " << (*sharedWriterTool)->name());
262 return StatusCode::FAILURE;
269 StatusCode mySc = (*sharedWriterTool)->exec();
270 if(!mySc.isSuccess()) {
272 return StatusCode::FAILURE;
276 if(!dataShare->makeClient(
m_nWorkers+1).isSuccess()) {
277 ATH_MSG_FATAL(
"Cannot make mother process a client for Conversion Service");
278 return StatusCode::FAILURE;
288 if(!scEvtProc.isSuccess()) {
290 ATH_MSG_FATAL(
"Unable to process first " << nEventsToProcess <<
" events in the master");
292 ATH_MSG_FATAL(
"Unable to process first event in the master");
307 if(sharedWriterWithFAFE && !dataShare->makeClient(0).isSuccess()) {
308 ATH_MSG_FATAL(
"Cannot make mother process not client for Conversion Service");
309 return StatusCode::FAILURE;
319 for(;
it!=itLast; ++
it) {
320 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
322 (*it)->setRandString(randStream.str());
323 (*it)->setMaxEvt(maxevt);
324 (*it)->setMPRunStop(
this);
326 incSvc->fireIncident(Incident(
name(),
"PreFork"));
331 return StatusCode::FAILURE;
340 return StatusCode::FAILURE;
345 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
346 if((*it)->exec().isFailure()) {
347 ATH_MSG_FATAL(
"Unable to submit work to the tool " << (*it)->name());
348 return StatusCode::FAILURE;
365 std::vector<std::string> logs;
367 (*it)->subProcessLogs(logs);
368 for(
size_t i=0;
i<logs.size();++
i) {
369 std::cout <<
"\n File: " << logs[
i] <<
"\n" << std::endl;
371 log.open(logs[
i].c_str(),std::ifstream::in);
375 std::cout <<
line << std::endl;
417 if((*it)->wait_once(
pid).isFailure()) {
419 ATH_MSG_ERROR(
"Failure in waiting or sub-process finished abnormally");
433 unsigned long size(0);
434 unsigned long rss(0);
435 unsigned long pss(0);
436 unsigned long swap(0);
452 (*it)->reportSubprocessStatuses();
456 (*it)->killChildren();
459 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
470 ATH_MSG_ERROR(
"Unable to open AthenaMPOutputs for writing!");
471 return StatusCode::FAILURE;
474 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
479 allptrs.push_back((*it)->generateOutputReport());
482 std::set<std::string> allkeys;
483 for(
size_t i=0;
i<allptrs.size(); ++
i) {
485 it_wosLast = allptrs[
i]->end();
486 for(;it_wos!=it_wosLast;++it_wos)
487 allkeys.insert(it_wos->first);
491 ofs <<
"<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
492 ofs <<
"<athenaFileReport>" << std::endl;
493 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
494 keys_itLast = allkeys.end();
495 for(;keys_it!=keys_itLast;++keys_it) {
496 ofs <<
" <Files OriginalName=\"" << (*keys_it) <<
"\">" << std::endl;
497 for(
size_t i=0;
i<allptrs.size(); ++
i) {
499 if(it_wos!=(allptrs[
i])->end()) {
500 for(
size_t ii=0; ii<it_wos->second.size(); ++ii) {
507 <<
"description=\"" <<
outp.description
508 <<
"\" mode=\"" <<
outp.access_mode
509 <<
"\" name=\"" << masterFile.string()
510 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
511 <<
"\" technology=\"" <<
outp.technology
512 <<
"\"/>" << std::endl;
515 <<
"description=\"" <<
outp.description
516 <<
"\" mode=\"" <<
outp.access_mode
517 <<
"\" name=\"" <<
outp.filename
518 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
519 <<
"\" technology=\"" <<
outp.technology
520 <<
"\"/>" << std::endl;
524 ofs <<
" </Files>" << std::endl;
526 ofs <<
"</athenaFileReport>" << std::endl;
530 return StatusCode::SUCCESS;
536 using namespace std::filesystem;
544 std::vector<std::string> excludePatterns {
556 path fdPath(
"/proc/self/fd");
557 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
558 if(is_symlink(fdIt->path())) {
559 path realpath = read_symlink(fdIt->path());
560 int fd =
atoi(fdIt->path().filename().string().c_str());
566 if(is_regular_file(realpath)) {
569 for(
size_t i=0;
i<excludePatterns.size(); ++
i) {
570 if(realpath.string().find(excludePatterns[
i])!=std::string::npos) {
576 ATH_MSG_DEBUG(realpath.string() <<
" Excluded from the registry by the pattern");
583 ATH_MSG_DEBUG(realpath.string() <<
" is not a regular file");
588 ATH_MSG_WARNING(
"UNEXPECTED. " << fdIt->path().string() <<
" Not a symlink");
592 for(
size_t ii(0); ii<
registry->size(); ++ii)
601 if(!propertyServer) {
602 ATH_MSG_ERROR(
"Unable to dyn-cast the event selector to IProperty");
603 return StatusCode::FAILURE;
606 IntegerProperty skipEventsProperty(
"SkipEvents",
skipEvents);
607 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
608 ATH_MSG_ERROR(
"Unable to update " << skipEventsProperty.name() <<
" property on the Event Selector");
609 return StatusCode::FAILURE;
613 return StatusCode::SUCCESS;