50 Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(
m_nWorkers);
52 SmartIF<IProperty> prpMgr(serviceLocator());
53 if(!prpMgr.isValid()) {
55 return StatusCode::FAILURE;
59 std::string evtSelName = prpMgr->getProperty(
"EvtSel").toString();
60 m_evtSelector = serviceLocator()->service(std::move(evtSelName));
67 ATH_MSG_ERROR(
"The EventService strategy cannot run with non-zero value for EventsBeforeFork");
68 return StatusCode::FAILURE;
74 return StatusCode::FAILURE;
93 if(propertyServer->setProperty(
"ExecAtPreFork",
m_execAtPreFork).isFailure()) {
94 ATH_MSG_WARNING(
"Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
103 return StatusCode::SUCCESS;
134 std::ostringstream randStream;
136 ATH_MSG_INFO(
"Using random components for IPC object names: " << randStream.str());
145 if(pDetStore->record(evtQueue,
"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146 ATH_MSG_FATAL(
"Unable to record the pointer to the Shared Event queue into Detector Store");
148 return StatusCode::FAILURE;
156 if(pDetStore->record(failedPidQueue,
"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157 ATH_MSG_FATAL(
"Unable to record the pointer to the Failed PID queue into Detector Store");
158 delete failedPidQueue;
159 return StatusCode::FAILURE;
164 if(mkdir(
m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
171 srand((
unsigned)time(0));
172 std::ostringstream randname;
179 ATH_MSG_WARNING(
"The job will attempt to save it with the name " << backupDir <<
" and create new top directory from scratch");
183 strerror_r(errno, buf,
sizeof(buf));
185 return StatusCode::FAILURE;
188 if(mkdir(
m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0)
break;
194 strerror_r(errno, buf,
sizeof(buf));
196 return StatusCode::FAILURE;
206 incSvc->fireIncident(Incident(name(),
"BeforeFork"));
210 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry =
extractFds();
212 ToolHandleArray<IAthenaMPTool>::iterator it =
m_tools.begin(),
219 auto sharedWriterTool =
m_tools[
"SharedWriterTool"];
222 if(sharedWriterWithFAFE) {
223 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
226 (*sharedWriterTool)->useFdsRegistry(registry);
227 (*sharedWriterTool)->setRandString(randStream.str());
231 ATH_MSG_FATAL(
"makePool failed for " << (*sharedWriterTool)->name());
232 return StatusCode::FAILURE;
239 StatusCode mySc = (*sharedWriterTool)->exec();
240 if(!mySc.isSuccess()) {
242 return StatusCode::FAILURE;
247 ATH_MSG_FATAL(
"Cannot make mother process a client for Conversion Service");
248 return StatusCode::FAILURE;
259 StatusCode scEvtProc =
m_evtProcessor->nextEvent(nEventsToProcess);
260 if(!scEvtProc.isSuccess()) {
261 if(nEventsToProcess) {
262 ATH_MSG_FATAL(
"Unable to process first " << nEventsToProcess <<
" events in the master");
265 ATH_MSG_FATAL(
"Unable to process first event in the master");
281 if(sharedWriterWithFAFE && !
m_dataShare->makeClient(0).isSuccess()) {
282 ATH_MSG_FATAL(
"Cannot make mother process not client for Conversion Service");
283 return StatusCode::FAILURE;
286 int maxEvents(maxevt);
293 for(; it!=itLast; ++it) {
294 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
295 (*it)->useFdsRegistry(registry);
296 (*it)->setRandString(randStream.str());
297 (*it)->setMaxEvt(maxevt);
298 (*it)->setMPRunStop(
this);
300 incSvc->fireIncident(Incident(name(),
"PreFork"));
305 return StatusCode::FAILURE;
314 return StatusCode::FAILURE;
318 for(it=
m_tools.begin(); it!=itLast; ++it) {
319 if(sharedWriterWithFAFE && (*it)->name() ==
"AthMpEvtLoopMgr.SharedWriterTool")
continue;
320 if((*it)->exec().isFailure()) {
321 ATH_MSG_FATAL(
"Unable to submit work to the tool " << (*it)->name());
322 return StatusCode::FAILURE;
339 std::vector<std::string> logs;
340 for(it=
m_tools.begin(); it!=itLast; ++it) {
341 (*it)->subProcessLogs(logs);
342 for(
size_t i=0;i<logs.size();++i) {
343 std::cout <<
"\n File: " << logs[i] <<
"\n" << std::endl;
345 log.open(logs[i].c_str(),std::ifstream::in);
348 std::getline(log,line);
349 std::cout << line << std::endl;
382 ToolHandleArray<IAthenaMPTool>::iterator it =
m_tools.begin(),
387 auto memMonTime = std::chrono::system_clock::now();
390 for(it =
m_tools.begin(); it!=itLast; ++it) {
391 if((*it)->wait_once(pid).isFailure()) {
393 ATH_MSG_ERROR(
"Failure in waiting or sub-process finished abnormally");
405 auto currTime = std::chrono::system_clock::now();
407 unsigned long size(0);
408 unsigned long rss(0);
409 unsigned long pss(0);
410 unsigned long swap(0);
425 for(it=
m_tools.begin(); it!=itLast; ++it)
426 (*it)->reportSubprocessStatuses();
429 for(it=
m_tools.begin(); it!=itLast; ++it)
430 (*it)->killChildren();
433 return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
444 ATH_MSG_ERROR(
"Unable to open AthenaMPOutputs for writing!");
445 return StatusCode::FAILURE;
448 std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
450 ToolHandleArray<IAthenaMPTool>::iterator it =
m_tools.begin(),
452 for(it=
m_tools.begin(); it!=itLast; ++it)
453 allptrs.push_back((*it)->generateOutputReport());
456 std::set<std::string> allkeys;
457 for(
size_t i=0; i<allptrs.size(); ++i) {
459 it_wosLast = allptrs[i]->end();
460 for(;it_wos!=it_wosLast;++it_wos)
461 allkeys.insert(it_wos->first);
465 ofs <<
"<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
466 ofs <<
"<athenaFileReport>" << std::endl;
467 std::set<std::string>::const_iterator keys_it = allkeys.begin(),
468 keys_itLast = allkeys.end();
469 for(;keys_it!=keys_itLast;++keys_it) {
470 ofs <<
" <Files OriginalName=\"" << (*keys_it) <<
"\">" << std::endl;
471 for(
size_t i=0; i<allptrs.size(); ++i) {
473 if(it_wos!=(allptrs[i])->end()) {
474 for(
size_t ii=0; ii<it_wos->second.size(); ++ii) {
477 std::filesystem::path masterFile(std::filesystem::current_path());
478 masterFile /= std::filesystem::path(*keys_it);
479 if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
481 <<
"description=\"" <<
outp.description
482 <<
"\" mode=\"" <<
outp.access_mode
483 <<
"\" name=\"" << masterFile.string()
484 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
485 <<
"\" technology=\"" <<
outp.technology
486 <<
"\"/>" << std::endl;
489 <<
"description=\"" <<
outp.description
490 <<
"\" mode=\"" <<
outp.access_mode
491 <<
"\" name=\"" <<
outp.filename
492 <<
"\" shared=\"" << (
outp.shared?
"True":
"False")
493 <<
"\" technology=\"" <<
outp.technology
494 <<
"\"/>" << std::endl;
498 ofs <<
" </Files>" << std::endl;
500 ofs <<
"</athenaFileReport>" << std::endl;
504 return StatusCode::SUCCESS;
510 using namespace std::filesystem;
518 std::vector<std::string> excludePatterns {
530 path fdPath(
"/proc/self/fd");
531 for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
532 if(is_symlink(fdIt->path())) {
533 path realpath = read_symlink(fdIt->path());
534 int fd = atoi(fdIt->path().filename().string().c_str());
540 if(is_regular_file(realpath)) {
543 for(
size_t i=0;i<excludePatterns.size(); ++i) {
544 if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
550 ATH_MSG_DEBUG(realpath.string() <<
" Excluded from the registry by the pattern");
557 ATH_MSG_DEBUG(realpath.string() <<
" is not a regular file");
562 ATH_MSG_WARNING(
"UNEXPECTED. " << fdIt->path().string() <<
" Not a symlink");
566 for(
size_t ii(0); ii<registry->size(); ++ii)
567 ATH_MSG_DEBUG((*registry)[ii].fd <<
" " << (*registry)[ii].name);
void swap(DataVector< T > &a, DataVector< T > &b)
See DataVector<T, BASE>::swap().
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)