10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/IConversionSvc.h"
13 #include "GaudiKernel/Incident.h"
14 #include "GaudiKernel/ServiceHandle.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IIoComponent.h"
17 #include "GaudiKernel/ConcurrencyFlags.h"
// Forward declaration only: no body for getPss is visible in this section of the file.
// Signature: takes a pid_t, four unsigned long references, and a bool `verbose`
// flag that defaults to false; returns int.
// NOTE(review): the four references presumably receive size / rss / pss / swap
// values (locals with those names are declared further down in this file) —
// confirm the order and units against the actual definition.
38 int getPss(
pid_t,
unsigned long&,
unsigned long&,
unsigned long&,
unsigned long&,
bool verbose=
false);
42 , ISvcLocator* svcLocator)
44 , m_evtProcessor(
"AthenaEventLoopMgr",
name)
45 , m_evtSelector(nullptr)
47 , m_workerTopDir(
"athenaMP_workers")
48 , m_outputReportName(
"AthenaMPOutputs")
51 , m_collectSubprocessLogs(false)
53 , m_nChildProcesses(0)
54 , m_nPollingInterval(100)
55 , m_nMemSamplingInterval(0)
56 , m_nEventsBeforeFork(0)
57 , m_eventPrintoutInterval(1)
59 , m_masterPid(getpid())
67 declareProperty(
"Tools",
m_tools);
83 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(
m_nWorkers);
85 SmartIF<IProperty> prpMgr(serviceLocator());
86 if(!prpMgr.isValid()) {
88 return StatusCode::FAILURE;
91 std::string evtSelName = prpMgr->getProperty(
"EvtSel").toString();
98 ATH_MSG_ERROR(
"The EventService strategy cannot run with non-zero value for EventsBeforeFork");
99 return StatusCode::FAILURE;
104 ATH_MSG_ERROR(
"Failed to set skipEvents=0 in Event Service");
105 return StatusCode::FAILURE;
114 ATH_MSG_INFO(
"ELM: The job running in non-pileup mode");
124 if(propertyServer->setProperty(
"ExecAtPreFork",
m_execAtPreFork).isFailure()) {
125 ATH_MSG_WARNING(
"Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
134 return StatusCode::SUCCESS;
139 return StatusCode::SUCCESS;
145 if(IEventProcessor::interfaceID().versionMatch(riid)) {
146 *ppvInterface = (IEventProcessor*)
this;
148 return StatusCode::SUCCESS;
152 return AthService::queryInterface(riid, ppvInterface);
164 return EventContext{};
179 std::ostringstream randStream;
181 ATH_MSG_INFO(
"Using random components for IPC object names: " << randStream.str());
190 if(pDetStore->record(evtQueue,
"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
191 ATH_MSG_FATAL(
"Unable to record the pointer to the Shared Event queue into Detector Store");
193 return StatusCode::FAILURE;
201 if(pDetStore->record(failedPidQueue,
"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
202 ATH_MSG_FATAL(
"Unable to record the pointer to the Failed PID queue into Detector Store");
203 delete failedPidQueue;
204 return StatusCode::FAILURE;
216 srand((
unsigned)
time(0));
217 std::ostringstream randname;
222 ATH_MSG_WARNING(
"The job will attempt to save it with the name " << backupDir <<
" and create new top directory from scratch");
226 strerror_r(errno, buf,
sizeof(buf));
228 return StatusCode::FAILURE;
238 strerror_r(errno, buf,
sizeof(buf));
240 return StatusCode::FAILURE;
250 incSvc->fireIncident(Incident(
name(),
"BeforeFork"));
263 SmartIF<IDataShare> dataShare{serviceLocator()->service(
"AthenaPoolCnvSvc")};
266 auto sharedWriterTool =
m_tools[
"SharedWriterTool"];
269 if(sharedWriterWithFAFE) {
270 (*sharedWriterTool)->useFdsRegistry(
registry);
271 (*sharedWriterTool)->setRandString(randStream.str());
275 ATH_MSG_FATAL(
"makePool failed for " << (*sharedWriterTool)->name());
276 return StatusCode::FAILURE;
283 StatusCode mySc = (*sharedWriterTool)->exec();
284 if(!mySc.isSuccess()) {
286 return StatusCode::FAILURE;
290 if(!dataShare->makeClient(
m_nWorkers+1).isSuccess()) {
291 ATH_MSG_FATAL(
"Cannot make mother process a client for Conversion Service");
292 return StatusCode::FAILURE;
302 if(!scEvtProc.isSuccess()) {
304 ATH_MSG_FATAL(
"Unable to process first " << nEventsToProcess <<
" events in the master");
306 ATH_MSG_FATAL(
"Unable to process first event in the master");
321 if(sharedWriterWithFAFE && !dataShare->makeClient(0).isSuccess()) {
322 ATH_MSG_FATAL(
"Cannot make mother process not client for Conversion Service");
323 return StatusCode::FAILURE;
333 for(;
it!=itLast; ++
it) {
334 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
336 (*it)->setRandString(randStream.str());
338 incSvc->fireIncident(Incident(
name(),
"PreFork"));
343 return StatusCode::FAILURE;
352 return StatusCode::FAILURE;
357 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
358 if((*it)->exec().isFailure()) {
359 ATH_MSG_FATAL(
"Unable to submit work to the tool " << (*it)->name());
360 return StatusCode::FAILURE;
377 std::vector<std::string> logs;
379 (*it)->subProcessLogs(logs);
380 for(
size_t i=0;
i<logs.size();++
i) {
381 std::cout <<
"\n File: " << logs[
i] <<
"\n" << std::endl;
383 log.open(logs[
i].c_str(),std::ifstream::in);
387 std::cout <<
line << std::endl;
428 if((*it)->wait_once(
pid).isFailure()) {
430 ATH_MSG_ERROR(
"Failure in waiting or sub-process finished abnormally");
444 unsigned long size(0);
445 unsigned long rss(0);
446 unsigned long pss(0);
447 unsigned long swap(0);
463 (*it)->reportSubprocessStatuses();
467 (*it)->killChildren();
470 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
481 ATH_MSG_ERROR(
"Unable to open AthenaMPOutputs for writing!");
482 return StatusCode::FAILURE;
485 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
490 allptrs.push_back((*it)->generateOutputReport());
493 std::set<std::string> allkeys;
494 for(
size_t i=0;
i<allptrs.size(); ++
i) {
496 it_wosLast = allptrs[
i]->end();
497 for(;it_wos!=it_wosLast;++it_wos)
498 allkeys.insert(it_wos->first);
502 ofs <<
"<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
503 ofs <<
"<athenaFileReport>" << std::endl;
504 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
505 keys_itLast = allkeys.end();
506 for(;keys_it!=keys_itLast;++keys_it) {
507 ofs <<
" <Files OriginalName=\"" << (*keys_it) <<
"\">" << std::endl;
508 for(
size_t i=0;
i<allptrs.size(); ++
i) {
510 if(it_wos!=(allptrs[
i])->end()) {
511 for(
size_t ii=0; ii<it_wos->second.size(); ++ii) {
518 <<
"description=\"" <<
outp.description
519 <<
"\" mode=\"" <<
outp.access_mode
520 <<
"\" name=\"" << masterFile.string()
521 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
522 <<
"\" technology=\"" <<
outp.technology
523 <<
"\"/>" << std::endl;
526 <<
"description=\"" <<
outp.description
527 <<
"\" mode=\"" <<
outp.access_mode
528 <<
"\" name=\"" <<
outp.filename
529 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
530 <<
"\" technology=\"" <<
outp.technology
531 <<
"\"/>" << std::endl;
535 ofs <<
" </Files>" << std::endl;
537 ofs <<
"</athenaFileReport>" << std::endl;
541 return StatusCode::SUCCESS;
547 using namespace std::filesystem;
555 std::vector<std::string> excludePatterns {
567 path fdPath(
"/proc/self/fd");
568 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
569 if(is_symlink(fdIt->path())) {
570 path realpath = read_symlink(fdIt->path());
571 int fd =
atoi(fdIt->path().filename().string().c_str());
577 if(is_regular_file(realpath)) {
580 for(
size_t i=0;
i<excludePatterns.size(); ++
i) {
581 if(realpath.string().find(excludePatterns[
i])!=std::string::npos) {
587 ATH_MSG_DEBUG(realpath.string() <<
" Excluded from the registry by the pattern");
594 ATH_MSG_DEBUG(realpath.string() <<
" is not a regular file");
599 ATH_MSG_WARNING(
"UNEXPECTED. " << fdIt->path().string() <<
" Not a symlink");
603 for(
size_t ii(0); ii<
registry->size(); ++ii)
612 if(!propertyServer) {
613 ATH_MSG_ERROR(
"Unable to dyn-cast the event selector to IProperty");
614 return StatusCode::FAILURE;
617 IntegerProperty skipEventsProperty(
"SkipEvents",
skipEvents);
618 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
619 ATH_MSG_ERROR(
"Unable to update " << skipEventsProperty.name() <<
" property on the Event Selector");
620 return StatusCode::FAILURE;
624 return StatusCode::SUCCESS;