9 #include "GaudiKernel/IIncidentSvc.h"
10 #include "GaudiKernel/IConversionSvc.h"
11 #include "GaudiKernel/Incident.h"
12 #include "GaudiKernel/ServiceHandle.h"
13 #include "GaudiKernel/IIoComponentMgr.h"
14 #include "GaudiKernel/IIoComponent.h"
15 #include "GaudiKernel/ConcurrencyFlags.h"
// Forward declaration only; the definition is not part of this view.
// Takes a process id, four unsigned long output references and an optional
// verbosity flag (defaults to false), and returns an int.
// NOTE(review): further down this file declares size/rss/pss/swap counters;
// presumably the four references correspond to those, in that order —
// confirm against the definition.
36 int getPss(
pid_t,
unsigned long&,
unsigned long&,
unsigned long&,
unsigned long&,
bool verbose=
false);
40 , ISvcLocator* svcLocator)
// Member-initializer list: forward the name and service locator to base_class,
// and remember the pid of the process that constructs this object.
41 : base_class(
name,svcLocator)
42 , m_masterPid(getpid())
// Publish the configured number of workers to the Gaudi concurrency flags.
50 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(
m_nWorkers);
// View the service locator through its IProperty interface; give up if that
// interface is not available.
52 SmartIF<IProperty> prpMgr(serviceLocator());
53 if(!prpMgr.isValid()) {
55 return StatusCode::FAILURE;
// Resolve the event selector service from the name held in the "EvtSel" property.
59 std::string evtSelName = prpMgr->getProperty(
"EvtSel").toString();
60 m_evtSelector = serviceLocator()->service(std::move(evtSelName));
// NOTE(review): the guarding condition is not visible in this view; going by
// the message, this rejects a non-zero EventsBeforeFork for the EventService strategy.
67 ATH_MSG_ERROR(
"The EventService strategy cannot run with non-zero value for EventsBeforeFork");
68 return StatusCode::FAILURE;
74 return StatusCode::FAILURE;
// Push m_execAtPreFork into the "ExecAtPreFork" property; a failure is reported
// with a warning.
93 if(propertyServer->setProperty(
"ExecAtPreFork",
m_execAtPreFork).isFailure()) {
94 ATH_MSG_WARNING(
"Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
103 return StatusCode::SUCCESS;
108 return StatusCode::SUCCESS;
119 return EventContext{};
// Stream holding the random component that is appended to IPC object names below.
// NOTE(review): the statement that fills it is not visible in this view.
134 std::ostringstream randStream;
136 ATH_MSG_INFO(
"Using random components for IPC object names: " << randStream.str());
// Record the shared event queue in the detector store under a name that carries
// the random suffix; failing to do so is fatal.
145 if(pDetStore->record(evtQueue,
"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146 ATH_MSG_FATAL(
"Unable to record the pointer to the Shared Event queue into Detector Store");
148 return StatusCode::FAILURE;
// Same for the failed-PID queue; on failure the queue object is deleted here
// before returning.
156 if(pDetStore->record(failedPidQueue,
"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157 ATH_MSG_FATAL(
"Unable to record the pointer to the Failed PID queue into Detector Store");
158 delete failedPidQueue;
159 return StatusCode::FAILURE;
// Seed the C random number generator from the current time.
171 srand((
unsigned)
time(0));
172 std::ostringstream randname;
// NOTE(review): per the message, an already existing top directory is saved
// under backupDir; the condition and the rename itself are not visible here.
179 ATH_MSG_WARNING(
"The job will attempt to save it with the name " << backupDir <<
" and create new top directory from scratch");
// Translate errno into a message in buf.
183 strerror_r(errno, buf,
sizeof(buf));
185 return StatusCode::FAILURE;
// Create the worker top directory with mode rwxr-xr-x; on success break out of
// the enclosing construct (not visible in this view).
188 if(
mkdir(
m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0)
break;
// Translate errno into a message in buf.
194 strerror_r(errno, buf,
sizeof(buf));
196 return StatusCode::FAILURE;
// Fire a "BeforeFork" incident with this component's name as the source.
206 incSvc->fireIncident(Incident(
name(),
"BeforeFork"));
// Look up the entry registered as "SharedWriterTool" in m_tools.
219 auto sharedWriterTool =
m_tools[
"SharedWriterTool"];
// When sharedWriterWithFAFE is set, obtain the IDataShare interface of the
// "AthenaPoolSharedIOCnvSvc" service.
222 if(sharedWriterWithFAFE) {
223 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
// Hand the fd registry and the random string to the shared writer tool.
226 (*sharedWriterTool)->useFdsRegistry(
registry);
227 (*sharedWriterTool)->setRandString(randStream.str());
231 ATH_MSG_FATAL(
"makePool failed for " << (*sharedWriterTool)->name());
232 return StatusCode::FAILURE;
// Run the shared writer tool; anything other than success aborts.
239 StatusCode mySc = (*sharedWriterTool)->exec();
240 if(!mySc.isSuccess()) {
242 return StatusCode::FAILURE;
247 ATH_MSG_FATAL(
"Cannot make mother process a client for Conversion Service");
248 return StatusCode::FAILURE;
// Event processing in the master failed; the wording of the message depends on
// whether nEventsToProcess is non-zero.
260 if(!scEvtProc.isSuccess()) {
261 if(nEventsToProcess) {
262 ATH_MSG_FATAL(
"Unable to process first " << nEventsToProcess <<
" events in the master");
265 ATH_MSG_FATAL(
"Unable to process first event in the master");
// With sharedWriterWithFAFE, call makeClient(0) on the data-share service and
// treat a failure as fatal.
// NOTE(review): presumably 0 removes the client role from the mother process — confirm.
281 if(sharedWriterWithFAFE && !
m_dataShare->makeClient(0).isSuccess()) {
282 ATH_MSG_FATAL(
"Cannot make mother process not client for Conversion Service");
283 return StatusCode::FAILURE;
// Walk the tools from it to itLast. The tool named
// "AthMpEvtLoopMgr.SharedWriterTool" is skipped when sharedWriterWithFAFE is set.
// Each remaining tool receives the random string, the event limit and a pointer
// to this object for run-stop handling.
293 for(;
it!=itLast; ++
it) {
294 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
296 (*it)->setRandString(randStream.str());
297 (*it)->setMaxEvt(maxevt);
298 (*it)->setMPRunStop(
this);
// Fire a "PreFork" incident with this component's name as the source.
300 incSvc->fireIncident(Incident(
name(),
"PreFork"));
305 return StatusCode::FAILURE;
314 return StatusCode::FAILURE;
// Submit work to every tool except the shared writer tool skipped above;
// the first exec() failure is fatal.
319 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
320 if((*it)->exec().isFailure()) {
321 ATH_MSG_FATAL(
"Unable to submit work to the tool " << (*it)->name());
322 return StatusCode::FAILURE;
// Ask the tool for its sub-process log file names, then echo each file to
// std::cout: a header line with the file name followed by the file's lines.
339 std::vector<std::string> logs;
341 (*it)->subProcessLogs(logs);
342 for(
size_t i=0;
i<logs.size();++
i) {
343 std::cout <<
"\n File: " << logs[
i] <<
"\n" << std::endl;
// Open the log file for reading.
345 log.open(logs[
i].c_str(),std::ifstream::in);
349 std::cout <<
line << std::endl;
// Let the tool perform one wait step; pid is passed so the tool can report it back.
// NOTE(review): assumes wait_once fills pid with the finished child — confirm.
391 if((*it)->wait_once(
pid).isFailure()) {
393 ATH_MSG_ERROR(
"Failure in waiting or sub-process finished abnormally");
// Memory counters, zero-initialised.
// NOTE(review): presumably filled by getPss (declared near the top of the file
// with four unsigned long& parameters); the call is not visible in this view.
407 unsigned long size(0);
408 unsigned long rss(0);
409 unsigned long pss(0);
410 unsigned long swap(0);
// Have the tool report the statuses of its sub-processes.
426 (*it)->reportSubprocessStatuses();
// Ask the tool to kill its child processes.
430 (*it)->killChildren();
// The overall status reflects the all_ok flag.
433 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
// The report file could not be opened for writing.
444 ATH_MSG_ERROR(
"Unable to open AthenaMPOutputs for writing!");
445 return StatusCode::FAILURE;
// Gather one output report per tool.
448 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
453 allptrs.push_back((*it)->generateOutputReport());
// Build the union of all keys that occur in any of the collected reports.
456 std::set<std::string> allkeys;
457 for(
size_t i=0;
i<allptrs.size(); ++
i) {
459 it_wosLast = allptrs[
i]->end();
460 for(;it_wos!=it_wosLast;++it_wos)
461 allkeys.insert(it_wos->first);
// Emit the XML prologue and open the root element.
465 ofs <<
"<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
466 ofs <<
"<athenaFileReport>" << std::endl;
// One <Files> element per key; inside it, every report that has an entry for
// the key contributes its outputs.
467 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
468 keys_itLast = allkeys.end();
469 for(;keys_it!=keys_itLast;++keys_it) {
470 ofs <<
" <Files OriginalName=\"" << (*keys_it) <<
"\">" << std::endl;
471 for(
size_t i=0;
i<allptrs.size(); ++
i) {
473 if(it_wos!=(allptrs[
i])->end()) {
474 for(
size_t ii=0; ii<it_wos->second.size(); ++ii) {
// Variant that writes masterFile.string() as the name attribute.
// NOTE(review): the condition choosing between this and the variant below is
// not visible in this view.
481 <<
"description=\"" <<
outp.description
482 <<
"\" mode=\"" <<
outp.access_mode
483 <<
"\" name=\"" << masterFile.string()
484 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
485 <<
"\" technology=\"" <<
outp.technology
486 <<
"\"/>" << std::endl;
// Variant that writes outp.filename as the name attribute; all other attributes
// are the same as above.
489 <<
"description=\"" <<
outp.description
490 <<
"\" mode=\"" <<
outp.access_mode
491 <<
"\" name=\"" <<
outp.filename
492 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
493 <<
"\" technology=\"" <<
outp.technology
494 <<
"\"/>" << std::endl;
// Close the per-key element, then the root element.
498 ofs <<
" </Files>" << std::endl;
500 ofs <<
"</athenaFileReport>" << std::endl;
504 return StatusCode::SUCCESS;
// Brings std::filesystem names (path, directory_iterator, is_symlink, ...) into
// scope for the code below.
510 using namespace std::filesystem;
// Substrings that exclude a file from the registry when found in its path
// (the list contents are not visible in this view).
518 std::vector<std::string> excludePatterns {
// Walk the entries of /proc/self/fd. For an entry that is a symlink, the link
// target is the file path and the entry's own file name is parsed as the
// descriptor number.
530 path fdPath(
"/proc/self/fd");
531 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
532 if(is_symlink(fdIt->path())) {
533 path realpath = read_symlink(fdIt->path());
534 int fd =
atoi(fdIt->path().filename().string().c_str());
// Only regular files are considered; each target path is checked against the
// exclusion substrings.
540 if(is_regular_file(realpath)) {
543 for(
size_t i=0;
i<excludePatterns.size(); ++
i) {
544 if(realpath.string().find(excludePatterns[
i])!=std::string::npos) {
550 ATH_MSG_DEBUG(realpath.string() <<
" Excluded from the registry by the pattern");
557 ATH_MSG_DEBUG(realpath.string() <<
" is not a regular file");
// An entry that is not a symlink is reported with a warning.
562 ATH_MSG_WARNING(
"UNEXPECTED. " << fdIt->path().string() <<
" Not a symlink");
// Iterate over the collected registry entries (loop body not visible in this view).
566 for(
size_t ii(0); ii<
registry->size(); ++ii)
// A null propertyServer means the event selector could not be viewed as
// IProperty (per the error message); nothing can be updated in that case.
575 if(!propertyServer) {
576 ATH_MSG_ERROR(
"Unable to dyn-cast the event selector to IProperty");
577 return StatusCode::FAILURE;
// Wrap skipEvents in an IntegerProperty named "SkipEvents" and apply it through
// the property interface; a rejected update is an error.
580 IntegerProperty skipEventsProperty(
"SkipEvents",
skipEvents);
581 if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
582 ATH_MSG_ERROR(
"Unable to update " << skipEventsProperty.name() <<
" property on the Event Selector");
583 return StatusCode::FAILURE;
587 return StatusCode::SUCCESS;