10 #include "GaudiKernel/IIncidentSvc.h"
11 #include "GaudiKernel/IConversionSvc.h"
13 #include "GaudiKernel/Incident.h"
14 #include "GaudiKernel/ServiceHandle.h"
15 #include "GaudiKernel/IIoComponentMgr.h"
16 #include "GaudiKernel/IIoComponent.h"
17 #include "GaudiKernel/ConcurrencyFlags.h"
// Forward declaration of a helper whose definition is not visible in this chunk.
// Parameters, as far as the signature shows:
//   pid_t            - a process id to query.
//   unsigned long& x4 - non-const references, so the callee can write results back.
//                      NOTE(review): presumably size / rss / pss / swap in that order,
//                      matching the zero-initialised locals of those names declared
//                      in the memory-sampling code; confirm against the definition.
//   verbose          - optional flag, defaults to false.
// Returns an int. NOTE(review): whether this is a status code or a value is not
// visible here; confirm against the definition.
38 int getPss(
pid_t,
unsigned long&,
unsigned long&,
unsigned long&,
unsigned long&,
bool verbose=
false);
42 , ISvcLocator* svcLocator)
43 : base_class(
name,svcLocator)
44 , m_evtProcessor(
"AthenaEventLoopMgr",
name)
45 , m_evtSelector(nullptr)
47 , m_workerTopDir(
"athenaMP_workers")
48 , m_outputReportName(
"AthenaMPOutputs")
51 , m_collectSubprocessLogs(false)
53 , m_nChildProcesses(0)
54 , m_nPollingInterval(100)
55 , m_nMemSamplingInterval(0)
56 , m_nEventsBeforeFork(0)
57 , m_eventPrintoutInterval(1)
59 , m_masterPid(getpid())
67 declareProperty(
"Tools",
m_tools);
83 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(
m_nWorkers);
85 SmartIF<IProperty> prpMgr(serviceLocator());
86 if(!prpMgr.isValid()) {
88 return StatusCode::FAILURE;
91 std::string evtSelName = prpMgr->getProperty(
"EvtSel").toString();
98 ATH_MSG_ERROR(
"The EventService strategy cannot run with non-zero value for EventsBeforeFork");
99 return StatusCode::FAILURE;
104 ATH_MSG_ERROR(
"Failed to set skipEvents=0 in Event Service");
105 return StatusCode::FAILURE;
114 ATH_MSG_INFO(
"ELM: The job running in non-pileup mode");
124 if(propertyServer->setProperty(
"ExecAtPreFork",
m_execAtPreFork).isFailure()) {
125 ATH_MSG_WARNING(
"Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
134 return StatusCode::SUCCESS;
139 return StatusCode::SUCCESS;
150 return EventContext{};
165 std::ostringstream randStream;
167 ATH_MSG_INFO(
"Using random components for IPC object names: " << randStream.str());
176 if(pDetStore->record(evtQueue,
"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
177 ATH_MSG_FATAL(
"Unable to record the pointer to the Shared Event queue into Detector Store");
179 return StatusCode::FAILURE;
187 if(pDetStore->record(failedPidQueue,
"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
188 ATH_MSG_FATAL(
"Unable to record the pointer to the Failed PID queue into Detector Store");
189 delete failedPidQueue;
190 return StatusCode::FAILURE;
202 srand((
unsigned)
time(0));
203 std::ostringstream randname;
208 ATH_MSG_WARNING(
"The job will attempt to save it with the name " << backupDir <<
" and create new top directory from scratch");
212 strerror_r(errno, buf,
sizeof(buf));
214 return StatusCode::FAILURE;
224 strerror_r(errno, buf,
sizeof(buf));
226 return StatusCode::FAILURE;
236 incSvc->fireIncident(Incident(
name(),
"BeforeFork"));
249 SmartIF<IDataShare> dataShare{serviceLocator()->service(
"AthenaPoolCnvSvc")};
252 auto sharedWriterTool =
m_tools[
"SharedWriterTool"];
255 if(sharedWriterWithFAFE) {
256 (*sharedWriterTool)->useFdsRegistry(
registry);
257 (*sharedWriterTool)->setRandString(randStream.str());
261 ATH_MSG_FATAL(
"makePool failed for " << (*sharedWriterTool)->name());
262 return StatusCode::FAILURE;
269 StatusCode mySc = (*sharedWriterTool)->exec();
270 if(!mySc.isSuccess()) {
272 return StatusCode::FAILURE;
276 if(!dataShare->makeClient(
m_nWorkers+1).isSuccess()) {
277 ATH_MSG_FATAL(
"Cannot make mother process a client for Conversion Service");
278 return StatusCode::FAILURE;
288 if(!scEvtProc.isSuccess()) {
290 ATH_MSG_FATAL(
"Unable to process first " << nEventsToProcess <<
" events in the master");
292 ATH_MSG_FATAL(
"Unable to process first event in the master");
307 if(sharedWriterWithFAFE && !dataShare->makeClient(0).isSuccess()) {
308 ATH_MSG_FATAL(
"Cannot make mother process not client for Conversion Service");
309 return StatusCode::FAILURE;
319 for(;
it!=itLast; ++
it) {
320 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
322 (*it)->setRandString(randStream.str());
324 incSvc->fireIncident(Incident(
name(),
"PreFork"));
329 return StatusCode::FAILURE;
338 return StatusCode::FAILURE;
343 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
344 if((*it)->exec().isFailure()) {
345 ATH_MSG_FATAL(
"Unable to submit work to the tool " << (*it)->name());
346 return StatusCode::FAILURE;
363 std::vector<std::string> logs;
365 (*it)->subProcessLogs(logs);
366 for(
size_t i=0;
i<logs.size();++
i) {
367 std::cout <<
"\n File: " << logs[
i] <<
"\n" << std::endl;
369 log.open(logs[
i].c_str(),std::ifstream::in);
373 std::cout <<
line << std::endl;
414 if((*it)->wait_once(
pid).isFailure()) {
416 ATH_MSG_ERROR(
"Failure in waiting or sub-process finished abnormally");
430 unsigned long size(0);
431 unsigned long rss(0);
432 unsigned long pss(0);
433 unsigned long swap(0);
449 (*it)->reportSubprocessStatuses();
453 (*it)->killChildren();
456 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
467 ATH_MSG_ERROR(
"Unable to open AthenaMPOutputs for writing!");
468 return StatusCode::FAILURE;
471 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
476 allptrs.push_back((*it)->generateOutputReport());
479 std::set<std::string> allkeys;
480 for(
size_t i=0;
i<allptrs.size(); ++
i) {
482 it_wosLast = allptrs[
i]->end();
483 for(;it_wos!=it_wosLast;++it_wos)
484 allkeys.insert(it_wos->first);
488 ofs <<
"<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
489 ofs <<
"<athenaFileReport>" << std::endl;
490 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
491 keys_itLast = allkeys.end();
492 for(;keys_it!=keys_itLast;++keys_it) {
493 ofs <<
" <Files OriginalName=\"" << (*keys_it) <<
"\">" << std::endl;
494 for(
size_t i=0;
i<allptrs.size(); ++
i) {
496 if(it_wos!=(allptrs[
i])->end()) {
497 for(
size_t ii=0; ii<it_wos->second.size(); ++ii) {
504 <<
"description=\"" <<
outp.description
505 <<
"\" mode=\"" <<
outp.access_mode
506 <<
"\" name=\"" << masterFile.string()
507 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
508 <<
"\" technology=\"" <<
outp.technology
509 <<
"\"/>" << std::endl;
512 <<
"description=\"" <<
outp.description
513 <<
"\" mode=\"" <<
outp.access_mode
514 <<
"\" name=\"" <<
outp.filename
515 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
516 <<
"\" technology=\"" <<
outp.technology
517 <<
"\"/>" << std::endl;
521 ofs <<
" </Files>" << std::endl;
523 ofs <<
"</athenaFileReport>" << std::endl;
527 return StatusCode::SUCCESS;
533 using namespace std::filesystem;
541 std::vector<std::string> excludePatterns {
553 path fdPath(
"/proc/self/fd");
554 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
555 if(is_symlink(fdIt->path())) {
556 path realpath = read_symlink(fdIt->path());
557 int fd =
atoi(fdIt->path().filename().string().c_str());
563 if(is_regular_file(realpath)) {
566 for(
size_t i=0;
i<excludePatterns.size(); ++
i) {
567 if(realpath.string().find(excludePatterns[
i])!=std::string::npos) {
573 ATH_MSG_DEBUG(realpath.string() <<
" Excluded from the registry by the pattern");
580 ATH_MSG_DEBUG(realpath.string() <<
" is not a regular file");
585 ATH_MSG_WARNING(
"UNEXPECTED. " << fdIt->path().string() <<
" Not a symlink");
589 for(
size_t ii(0); ii<
registry->size(); ++ii)
598 if(!propertyServer) {
599 ATH_MSG_ERROR(
"Unable to dyn-cast the event selector to IProperty");
600 return StatusCode::FAILURE;
603 IntegerProperty skipEventsProperty(
"SkipEvents",
skipEvents);
604 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
605 ATH_MSG_ERROR(
"Unable to update " << skipEventsProperty.name() <<
" property on the Event Selector");
606 return StatusCode::FAILURE;
610 return StatusCode::SUCCESS;