ATLAS Offline Software
Public Member Functions | Private Member Functions | Private Attributes | List of all members
AthMpEvtLoopMgr Class Reference

#include <AthMpEvtLoopMgr.h>

Inheritance diagram for AthMpEvtLoopMgr:
Collaboration diagram for AthMpEvtLoopMgr:

Public Member Functions

 AthMpEvtLoopMgr (const std::string &name, ISvcLocator *svcLocator)
 
virtual ~AthMpEvtLoopMgr ()=default
 
 AthMpEvtLoopMgr ()=delete
 
 AthMpEvtLoopMgr (const AthMpEvtLoopMgr &)=delete
 
AthMpEvtLoopMgr & operator= (const AthMpEvtLoopMgr &)=delete
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual StatusCode nextEvent (int maxevt) override
 
virtual StatusCode executeEvent (EventContext &&ctx) override
 
virtual StatusCode executeRun (int maxevt) override
 
virtual StatusCode stopRun () override
 
virtual EventContext createEventContext () override
 
virtual bool stopScheduled () const override
 

Private Member Functions

StatusCode wait ()
 
StatusCode generateOutputReport ()
 
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds ()
 
StatusCode updateSkipEvents (int skipEvents)
 

Private Attributes

ServiceHandle< IEventProcessor > m_evtProcessor {this,"EventLoopManager","AthenaEventLoopMgr"}
 
SmartIF< IService > m_evtSelector {nullptr}
 
SmartIF< IDataShare > m_dataShare
 
Gaudi::Property< int > m_nWorkers
 
Gaudi::Property< std::string > m_workerTopDir
 
Gaudi::Property< std::string > m_outputReportName
 
Gaudi::Property< std::string > m_strategy
 
Gaudi::Property< bool > m_isPileup
 
Gaudi::Property< bool > m_collectSubprocessLogs
 
ToolHandleArray< IAthenaMPTool > m_tools {this,"Tools", {}}
 
Gaudi::Property< int > m_nPollingInterval
 
Gaudi::Property< int > m_nMemSamplingInterval
 
Gaudi::Property< int > m_nEventsBeforeFork
 
Gaudi::Property< unsigned int > m_eventPrintoutInterval
 
StringArrayProperty m_execAtPreFork
 
int m_nChildProcesses {0}
 
pid_t m_masterPid {}
 
bool m_scheduledStop {false}
 
std::vector< unsigned long > m_samplesRss
 
std::vector< unsigned long > m_samplesPss
 
std::vector< unsigned long > m_samplesSize
 
std::vector< unsigned long > m_samplesSwap
 

Detailed Description

Definition at line 19 of file AthMpEvtLoopMgr.h.

Constructor & Destructor Documentation

◆ AthMpEvtLoopMgr() [1/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const std::string &  name,
ISvcLocator *  svcLocator 
)

Definition at line 39 of file AthMpEvtLoopMgr.cxx.

41  : base_class(name,svcLocator)
42  , m_masterPid(getpid())
43 {
44 }

◆ ~AthMpEvtLoopMgr()

virtual AthMpEvtLoopMgr::~AthMpEvtLoopMgr ( )
virtual default

◆ AthMpEvtLoopMgr() [2/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( )
delete

◆ AthMpEvtLoopMgr() [3/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const AthMpEvtLoopMgr & )
delete

Member Function Documentation

◆ createEventContext()

EventContext AthMpEvtLoopMgr::createEventContext ( )
override virtual

Definition at line 117 of file AthMpEvtLoopMgr.cxx.

117  {
118  // return an invalid context - method should not be called
119  return EventContext{};
120 }

◆ executeEvent()

StatusCode AthMpEvtLoopMgr::executeEvent ( EventContext &&  ctx)
override virtual

Definition at line 122 of file AthMpEvtLoopMgr.cxx.

123 {
124  // Perhaps there we should return StatusCode::FAILURE as this method shoud not be called directly
125  return m_evtProcessor->executeEvent(std::move(ctx));
126 }

◆ executeRun()

StatusCode AthMpEvtLoopMgr::executeRun ( int  maxevt)
override virtual

Definition at line 128 of file AthMpEvtLoopMgr.cxx.

129 {
130  ATH_MSG_DEBUG("in executeRun()");
131 
132  // Generate random component of the Shared Memory and Shared Queue names
133  srand(time(NULL));
134  std::ostringstream randStream;
135  randStream << getpid() << '_' << AthenaInterprocess::randString();
136  ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
137 
138  ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
139  ATH_CHECK(pDetStore.retrieve());
140 
141  // Create Shared Event queue if necessary and make it available to the tools
142  if(m_strategy=="SharedQueue"
143  || m_strategy=="RoundRobin") {
144  AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
145  if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146  ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
147  delete evtQueue;
148  return StatusCode::FAILURE;
149  }
150  }
151 
152  // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
153  // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
154  if(m_strategy=="EventService") {
155  AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
156  if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157  ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
158  delete failedPidQueue;
159  return StatusCode::FAILURE;
160  }
161  }
162 
163  // Prepare work directory for sub-processes
164  if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
165  switch(errno) {
166  case EEXIST:
167  {
168  // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
169  // Rename it with m_workerTopDir+"-bak-rand"
170 
171  srand((unsigned)time(0));
172  std::ostringstream randname;
173  randname << rand();
174  std::string backupDir = (m_workerTopDir.value().rfind('/')==(m_workerTopDir.value().size()-1)
175  ? m_workerTopDir.value().substr(0,m_workerTopDir.value().size()-1)
176  : m_workerTopDir.value())+std::string("-bak-")+randname.str();
177 
178  ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
179  ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
180 
181  if(rename(m_workerTopDir.value().c_str(),backupDir.c_str())!=0) {
182  char buf[256];
183  strerror_r(errno, buf, sizeof(buf));
184  ATH_MSG_ERROR("Unable to make backup directory! " << buf);
185  return StatusCode::FAILURE;
186  }
187 
188  if(mkdir(m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0) break;
189  }
190  /* FALLTHROUGH */
191  default:
192  {
193  char buf[256];
194  strerror_r(errno, buf, sizeof(buf));
195  ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
196  return StatusCode::FAILURE;
197  }
198  }
199  }
200 
201  // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
202  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
203  ATH_CHECK(incSvc.retrieve());
204 
205  if(m_nEventsBeforeFork==0 && !m_isPileup) {
206  incSvc->fireIncident(Incident(name(),"BeforeFork"));
207  }
208 
209  // Extract process file descriptors
210  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
211 
212  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
213  itLast = m_tools.end();
214 
215  // When using SharedWriter in conjunction with fork-after-N-events
216  // we have to make sure that mother process is a conversion service
217  // client so that events before forking workers are captured...
218 
219  auto sharedWriterTool = m_tools["SharedWriterTool"];
220  const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
221 
222  if(sharedWriterWithFAFE) {
223  m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
224  ATH_CHECK(m_dataShare.isValid());
225 
226  (*sharedWriterTool)->useFdsRegistry(registry);
227  (*sharedWriterTool)->setRandString(randStream.str());
228 
229  int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
230  if(nChildren==-1) {
231  ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
232  return StatusCode::FAILURE;
233  }
234  else {
235  m_nChildProcesses+=nChildren;
236  }
237 
238  // Execute the SharedWriterTool at this point
239  StatusCode mySc = (*sharedWriterTool)->exec();
240  if(!mySc.isSuccess()) {
241  ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
242  return StatusCode::FAILURE;
243  }
244 
245  // Make the mother process a client
246  if(!m_dataShare->makeClient(m_nWorkers+1).isSuccess()) {
247  ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
248  return StatusCode::FAILURE;
249  }
250  }
251 
252  //
253  // Try processing requested number of events here
254  if(m_nEventsBeforeFork) {
255  // Take into account a corner case: m_nEventsBeforeFork > maxevt
256  int nEventsToProcess = (maxevt>-1 && m_nEventsBeforeFork>maxevt)
257  ? maxevt
258  : m_nEventsBeforeFork.value();
259  StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
260  if(!scEvtProc.isSuccess()) {
261  if(nEventsToProcess) {
262  ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
263  }
264  else {
265  ATH_MSG_FATAL("Unable to process first event in the master");
266  }
267  return scEvtProc;
268  }
269  }
270 
271  // Finalize I/O (close input files) by IoComponents
272  ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
273  ATH_CHECK(ioMgr.retrieve());
274  ATH_CHECK(ioMgr->io_finalize());
275  ATH_MSG_DEBUG("Successfully finalized I/O before forking");
276 
277  // Flush stream buffers
278  fflush(NULL);
279 
280  // Make the mother process not client
281  if(sharedWriterWithFAFE && !m_dataShare->makeClient(0).isSuccess()) {
282  ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
283  return StatusCode::FAILURE;
284  }
285 
286  int maxEvents(maxevt); // This can be modified after restart
287 
288  // Re-extract process file descriptors
289  registry = extractFds();
290 
291  // Make worker pools
292  it = m_tools.begin();
293  for(; it!=itLast; ++it) {
294  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
295  (*it)->useFdsRegistry(registry);
296  (*it)->setRandString(randStream.str());
297  (*it)->setMaxEvt(maxevt);
298  (*it)->setMPRunStop(this);
299  if(it==m_tools.begin()) {
300  incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
301  }
302  int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
303  if(nChildren==-1) {
304  ATH_MSG_FATAL("makePool failed for " << (*it)->name());
305  return StatusCode::FAILURE;
306  }
307  else {
308  m_nChildProcesses+=nChildren;
309  }
310  }
311 
312  if(m_nChildProcesses==0) {
313  ATH_MSG_ERROR("No child processes were created");
314  return StatusCode::FAILURE;
315  }
316 
317  // Assign work to child processes
318  for(it=m_tools.begin(); it!=itLast; ++it) {
319  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
320  if((*it)->exec().isFailure()) {
321  ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
322  return StatusCode::FAILURE;
323  }
324  }
325 
326  StatusCode sc = wait();
327 
328  if(m_nMemSamplingInterval>0) {
329  ATH_MSG_INFO("*** *** Memory Usage *** ***");
330  ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
331  ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
332  ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
333  ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
334  ATH_MSG_INFO("*** *** Memory Usage *** ***");
335  }
336 
337  if(m_collectSubprocessLogs) {
338  ATH_MSG_INFO("BEGIN collecting sub-process logs");
339  std::vector<std::string> logs;
340  for(it=m_tools.begin(); it!=itLast; ++it) {
341  (*it)->subProcessLogs(logs);
342  for(size_t i=0;i<logs.size();++i) {
343  std::cout << "\n File: " << logs[i] << "\n" << std::endl;
344  std::ifstream log;
345  log.open(logs[i].c_str(),std::ifstream::in);
346  std::string line;
347  while(!log.eof()) {
348  std::getline(log,line);
349  std::cout << line << std::endl;
350  }
351  log.close();
352  }
353  }
354  ATH_MSG_INFO("END collecting sub-process logs");
355  }
356 
357  if(sc.isSuccess())
358  return generateOutputReport();
359  else
360  return sc;
361 }

◆ extractFds()

std::shared_ptr< AthenaInterprocess::FdsRegistry > AthMpEvtLoopMgr::extractFds ( )
private

Definition at line 507 of file AthMpEvtLoopMgr.cxx.

508 {
509  ATH_MSG_DEBUG("Extracting file descriptors");
510  using namespace std::filesystem;
511  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
512 
513  // Extract file descriptors associated with the current process
514  // 1. Store only those regular files in the registry, which
515  // don't contain substrings from the "exclusion pattern" set
516  // 2. Skip also stdout and stderr
517 
518  std::vector<std::string> excludePatterns {
519  "/root/etc/plugins/"
520  ,"/root/cint/cint/"
521  ,"/root/include/"
522  ,"/var/tmp/"
523  ,"/var/lock/"
524  ,"/var/lib/"
525  ,"/bin/python/"
526  ,"/include/c++/"
527  ,".confdb2"
528  };
529 
530  path fdPath("/proc/self/fd");
531  for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
532  if(is_symlink(fdIt->path())) {
533  path realpath = read_symlink(fdIt->path());
534  int fd = atoi(fdIt->path().filename().string().c_str());
535 
536  if (fd==1 || fd==2) // Skip stdout and stderr
537  continue;
538 
539  if(exists(realpath)) {
540  if(is_regular_file(realpath)) {
541  // Check against the exclusion criteria
542  bool exclude(false);
543  for(size_t i=0;i<excludePatterns.size(); ++i) {
544  if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
545  exclude = true;
546  break;
547  }
548  }
549  if(exclude) {
550  ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
551  }
552  else {
553  registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
554  }
555  }
556  else {
557  ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
558  }
559  } // File exists
560  }
561  else
562  ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
563  } // Directory iteration
564 
565  ATH_MSG_DEBUG("Fds Reistry created. Contents:");
566  for(size_t ii(0); ii<registry->size(); ++ii)
567  ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
568 
569  return registry;
570 }

◆ finalize()

StatusCode AthMpEvtLoopMgr::finalize ( )
override virtual

Definition at line 106 of file AthMpEvtLoopMgr.cxx.

107 {
108  return StatusCode::SUCCESS;
109 }

◆ generateOutputReport()

StatusCode AthMpEvtLoopMgr::generateOutputReport ( )
private

Definition at line 436 of file AthMpEvtLoopMgr.cxx.

437 {
438  // Loop over tools, collect their output reports and put them all together into a single file.
439  // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
440 
441  std::ofstream ofs;
442  ofs.open(m_outputReportName.value().c_str());
443  if(!ofs) {
444  ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
445  return StatusCode::FAILURE;
446  }
447  else {
448  std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
449 
450  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
451  itLast = m_tools.end();
452  for(it=m_tools.begin(); it!=itLast; ++it)
453  allptrs.push_back((*it)->generateOutputReport());
454 
455  // First collect keys=file_names from all tools
456  std::set<std::string> allkeys;
457  for(size_t i=0; i<allptrs.size(); ++i) {
458  AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
459  it_wosLast = allptrs[i]->end();
460  for(;it_wos!=it_wosLast;++it_wos)
461  allkeys.insert(it_wos->first);
462  }
463 
464  // Generate XML
465  ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
466  ofs << "<athenaFileReport>" << std::endl;
467  std::set<std::string>::const_iterator keys_it = allkeys.begin(),
468  keys_itLast = allkeys.end();
469  for(;keys_it!=keys_itLast;++keys_it) {
470  ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
471  for(size_t i=0; i<allptrs.size(); ++i) {
472  AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
473  if(it_wos!=(allptrs[i])->end()) {
474  for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
475  AthenaMP::WorkerOutput& outp = it_wos->second[ii];
476  if(ii==0 && m_nEventsBeforeFork>0) {
477  std::filesystem::path masterFile(std::filesystem::current_path());
478  masterFile /= std::filesystem::path(*keys_it);
479  if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
480  ofs << " <File "
481  << "description=\"" << outp.description
482  << "\" mode=\"" << outp.access_mode
483  << "\" name=\"" << masterFile.string()
484  << "\" shared=\"" << (outp.shared?"True":"False")
485  << "\" technology=\"" << outp.technology
486  << "\"/>" << std::endl;
487  }
488  ofs << " <File "
489  << "description=\"" << outp.description
490  << "\" mode=\"" << outp.access_mode
491  << "\" name=\"" << outp.filename
492  << "\" shared=\"" << (outp.shared?"True":"False")
493  << "\" technology=\"" << outp.technology
494  << "\"/>" << std::endl;
495  }
496  }
497  }
498  ofs << " </Files>" << std::endl;
499  }
500  ofs << "</athenaFileReport>" << std::endl;
501  ofs.close();
502  }
503 
504  return StatusCode::SUCCESS;
505 }

◆ initialize()

StatusCode AthMpEvtLoopMgr::initialize ( )
override virtual

Definition at line 46 of file AthMpEvtLoopMgr.cxx.

47 {
48  ATH_MSG_DEBUG("in initialize() ... ");
49 
50  Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
51 
52  SmartIF<IProperty> prpMgr(serviceLocator());
53  if(!prpMgr.isValid()) {
54  ATH_MSG_ERROR("Failed to get hold of the Property Manager");
55  return StatusCode::FAILURE;
56  }
57 
58  {
59  std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
60  m_evtSelector = serviceLocator()->service(std::move(evtSelName));
61  }
62  ATH_CHECK(m_evtSelector.isValid());
63 
64  if(m_strategy=="EventService") {
65  // ES with non-zero events before forking makes no sense
66  if(m_nEventsBeforeFork!=0) {
67  ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
68  return StatusCode::FAILURE;
69  }
70 
71  // We need to ignore SkipEvents in ES
72  if(updateSkipEvents(0).isFailure()) {
73  ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
74  return StatusCode::FAILURE;
75  }
76  }
77 
78  if(m_isPileup) {
79  m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
80  ATH_MSG_INFO("ELM: The job running in pileup mode");
81  }
82  else {
83  ATH_MSG_INFO("ELM: The job running in non-pileup mode");
84  }
85 
86  ATH_CHECK(m_evtProcessor.retrieve());
87  if(!m_isPileup) {
88  SmartIF<IProperty> propertyServer(m_evtProcessor.get());
89  if(propertyServer) {
90  if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
91  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
92  }
93  if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
94  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
95  }
96  }
97  else {
98  ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
99  }
100  }
101  ATH_CHECK(m_tools.retrieve());
102 
103  return StatusCode::SUCCESS;
104 }

◆ nextEvent()

StatusCode AthMpEvtLoopMgr::nextEvent ( int  maxevt)
override virtual

Definition at line 111 of file AthMpEvtLoopMgr.cxx.

112 {
113  // Perhaps there we should return StatusCode::FAILURE as this method shoud not be called directly
114  return m_evtProcessor->nextEvent(maxevt);
115 }

◆ operator=()

AthMpEvtLoopMgr& AthMpEvtLoopMgr::operator= ( const AthMpEvtLoopMgr & )
delete

◆ stopRun()

StatusCode AthMpEvtLoopMgr::stopRun ( )
override virtual

Definition at line 363 of file AthMpEvtLoopMgr.cxx.

364 {
365  m_scheduledStop = true;
366  return m_evtProcessor->stopRun();
367 }

◆ stopScheduled()

virtual bool AthMpEvtLoopMgr::stopScheduled ( ) const
inline override virtual

Definition at line 42 of file AthMpEvtLoopMgr.h.

42 {return m_scheduledStop;};

◆ updateSkipEvents()

StatusCode AthMpEvtLoopMgr::updateSkipEvents ( int  skipEvents)
private

Definition at line 572 of file AthMpEvtLoopMgr.cxx.

573 {
574  SmartIF<IProperty> propertyServer(m_evtSelector);
575  if(!propertyServer) {
576  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
577  return StatusCode::FAILURE;
578  }
579 
580  IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
581  if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
582  ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
583  return StatusCode::FAILURE;
584  }
585  ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
586 
587  return StatusCode::SUCCESS;
588 }

◆ wait()

StatusCode AthMpEvtLoopMgr::wait ( )
private

Definition at line 379 of file AthMpEvtLoopMgr.cxx.

380 {
381  ATH_MSG_INFO("Waiting for sub-processes");
382  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
383  itLast = m_tools.end();
384  pid_t pid(0);
385  bool all_ok(true);
386 
387  auto memMonTime = std::chrono::system_clock::now();
388 
389  while(m_nChildProcesses>0) {
390  for(it = m_tools.begin(); it!=itLast; ++it) {
391  if((*it)->wait_once(pid).isFailure()) {
392  all_ok = false;
393  ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
394  break;
395  }
396  else {
397  if(pid>0) m_nChildProcesses -= 1;
398  }
399  }
400  if(!all_ok) break;
401 
402  usleep(m_nPollingInterval*1000);
403 
404  if(m_nMemSamplingInterval>0) {
405  auto currTime = std::chrono::system_clock::now();
406  if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
407  unsigned long size(0);
408  unsigned long rss(0);
409  unsigned long pss(0);
410  unsigned long swap(0);
411 
412  if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
413  ATH_MSG_WARNING("Unable to get memory sample");
414  else {
415  m_samplesRss.push_back(rss);
416  m_samplesPss.push_back(pss);
417  m_samplesSize.push_back(size);
418  m_samplesSwap.push_back(swap);
419  }
420  memMonTime=currTime;
421  }
422  }
423  }
424 
425  for(it=m_tools.begin(); it!=itLast; ++it)
426  (*it)->reportSubprocessStatuses();
427 
428  if(!all_ok) {
429  for(it=m_tools.begin(); it!=itLast; ++it)
430  (*it)->killChildren();
431  }
432 
433  return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
434 }

Member Data Documentation

◆ m_collectSubprocessLogs

Gaudi::Property<bool> AthMpEvtLoopMgr::m_collectSubprocessLogs
private
Initial value:
{this, "CollectSubprocessLogs", false,
"Copy all workers' logs into the main log file at the end of the job?"}

Definition at line 64 of file AthMpEvtLoopMgr.h.

◆ m_dataShare

SmartIF<IDataShare> AthMpEvtLoopMgr::m_dataShare
private

Definition at line 47 of file AthMpEvtLoopMgr.h.

◆ m_eventPrintoutInterval

Gaudi::Property<unsigned int> AthMpEvtLoopMgr::m_eventPrintoutInterval
private
Initial value:
{this, "EventPrintoutInterval", 1,
"The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"}

Definition at line 78 of file AthMpEvtLoopMgr.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthMpEvtLoopMgr::m_evtProcessor {this,"EventLoopManager","AthenaEventLoopMgr"}
private

Definition at line 45 of file AthMpEvtLoopMgr.h.

◆ m_evtSelector

SmartIF<IService> AthMpEvtLoopMgr::m_evtSelector {nullptr}
private

Definition at line 46 of file AthMpEvtLoopMgr.h.

◆ m_execAtPreFork

StringArrayProperty AthMpEvtLoopMgr::m_execAtPreFork
private
Initial value:
{this, "ExecAtPreFork", {},
"The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"}

Definition at line 81 of file AthMpEvtLoopMgr.h.

◆ m_isPileup

Gaudi::Property<bool> AthMpEvtLoopMgr::m_isPileup
private
Initial value:
{this, "IsPileup", false,
"Is AthenaMP running a PileUp Digitization job?"}

Definition at line 61 of file AthMpEvtLoopMgr.h.

◆ m_masterPid

pid_t AthMpEvtLoopMgr::m_masterPid {}
private

Definition at line 85 of file AthMpEvtLoopMgr.h.

◆ m_nChildProcesses

int AthMpEvtLoopMgr::m_nChildProcesses {0}
private

Definition at line 84 of file AthMpEvtLoopMgr.h.

◆ m_nEventsBeforeFork

Gaudi::Property<int> AthMpEvtLoopMgr::m_nEventsBeforeFork
private
Initial value:
{this, "EventsBeforeFork", 0,
"Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"}

Definition at line 75 of file AthMpEvtLoopMgr.h.

◆ m_nMemSamplingInterval

Gaudi::Property<int> AthMpEvtLoopMgr::m_nMemSamplingInterval
private
Initial value:
{this, "MemSamplingInterval", 0,
"Interval in seconds between taking memory usage samples. 0 - no sampling"}

Definition at line 72 of file AthMpEvtLoopMgr.h.

◆ m_nPollingInterval

Gaudi::Property<int> AthMpEvtLoopMgr::m_nPollingInterval
private
Initial value:
{this, "PollingInterval", 100,
"Interval in milliseconds between checks of sub-processes statuses"}

Definition at line 69 of file AthMpEvtLoopMgr.h.

◆ m_nWorkers

Gaudi::Property<int> AthMpEvtLoopMgr::m_nWorkers
private
Initial value:
{this, "NWorkers", 0,
"Number of AthenaMP worker processes"}

Definition at line 49 of file AthMpEvtLoopMgr.h.

◆ m_outputReportName

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_outputReportName
private
Initial value:
{this, "OutputReportFile", "AthenaMPOutputs",
"ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"}

Definition at line 55 of file AthMpEvtLoopMgr.h.

◆ m_samplesPss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesPss
private

Definition at line 90 of file AthMpEvtLoopMgr.h.

◆ m_samplesRss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesRss
private

Definition at line 89 of file AthMpEvtLoopMgr.h.

◆ m_samplesSize

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSize
private

Definition at line 91 of file AthMpEvtLoopMgr.h.

◆ m_samplesSwap

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSwap
private

Definition at line 92 of file AthMpEvtLoopMgr.h.

◆ m_scheduledStop

bool AthMpEvtLoopMgr::m_scheduledStop {false}
private

Definition at line 86 of file AthMpEvtLoopMgr.h.

◆ m_strategy

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_strategy
private
Initial value:
{this, "Strategy", "",
"Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"}

Definition at line 58 of file AthMpEvtLoopMgr.h.

◆ m_tools

ToolHandleArray<IAthenaMPTool> AthMpEvtLoopMgr::m_tools {this,"Tools", {}}
private

Definition at line 67 of file AthMpEvtLoopMgr.h.

◆ m_workerTopDir

Gaudi::Property<std::string> AthMpEvtLoopMgr::m_workerTopDir
private
Initial value:
{this, "WorkerTopDir", "athenaMP_workers",
"Sub-directory of the main run directory that contains run directories of all workers"}

Definition at line 52 of file AthMpEvtLoopMgr.h.


The documentation for this class was generated from the following files:
AthMpEvtLoopMgr.h
AthMpEvtLoopMgr.cxx
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
outp
std::ostream * outp
send output to here ...
Definition: hcg.cxx:73
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:158
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthMpEvtLoopMgr::m_collectSubprocessLogs
Gaudi::Property< bool > m_collectSubprocessLogs
Definition: AthMpEvtLoopMgr.h:64
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
AthMpEvtLoopMgr::m_nWorkers
Gaudi::Property< int > m_nWorkers
Definition: AthMpEvtLoopMgr.h:49
AthMpEvtLoopMgr::generateOutputReport
StatusCode generateOutputReport()
Definition: AthMpEvtLoopMgr.cxx:436
AthMpEvtLoopMgr::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: AthMpEvtLoopMgr.h:75
AthMpEvtLoopMgr::m_samplesRss
std::vector< unsigned long > m_samplesRss
Definition: AthMpEvtLoopMgr.h:89
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthMpEvtLoopMgr::m_nChildProcesses
int m_nChildProcesses
Definition: AthMpEvtLoopMgr.h:84
AthMpEvtLoopMgr::updateSkipEvents
StatusCode updateSkipEvents(int skipEvents)
Definition: AthMpEvtLoopMgr.cxx:572
exclude
std::set< std::string > exclude
list of directories to be excluded
Definition: hcg.cxx:95
AthMpEvtLoopMgr::m_tools
ToolHandleArray< IAthenaMPTool > m_tools
Definition: AthMpEvtLoopMgr.h:67
dq_defect_bulk_create_defects.line
line
Definition: dq_defect_bulk_create_defects.py:27
XMLtoHeader.count
count
Definition: XMLtoHeader.py:84
AthMpEvtLoopMgr::m_samplesSize
std::vector< unsigned long > m_samplesSize
Definition: AthMpEvtLoopMgr.h:91
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:30
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthMpEvtLoopMgr::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthMpEvtLoopMgr.h:61
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:55
python.SCT_ByteStreamErrorsTestAlgConfig.maxEvents
maxEvents
Definition: SCT_ByteStreamErrorsTestAlgConfig.py:43
AthMpEvtLoopMgr::m_samplesPss
std::vector< unsigned long > m_samplesPss
Definition: AthMpEvtLoopMgr.h:90
athenaMP_MemHelper::getPss
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)
AthMpEvtLoopMgr::wait
StatusCode wait()
Definition: AthMpEvtLoopMgr.cxx:379
AthMpEvtLoopMgr::m_evtSelector
SmartIF< IService > m_evtSelector
Definition: AthMpEvtLoopMgr.h:46
python.setupRTTAlg.size
int size
Definition: setupRTTAlg.py:39
AthMpEvtLoopMgr::m_strategy
Gaudi::Property< std::string > m_strategy
Definition: AthMpEvtLoopMgr.h:58
python.handimod.now
now
Definition: handimod.py:674
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthMpEvtLoopMgr::m_samplesSwap
std::vector< unsigned long > m_samplesSwap
Definition: AthMpEvtLoopMgr.h:92
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaInterprocess::randString
std::string randString()
Definition: Control/AthenaInterprocess/AthenaInterprocess/Utilities.h:13
AthMpEvtLoopMgr::extractFds
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Definition: AthMpEvtLoopMgr.cxx:507
AthenaInterprocess::FdsRegistry
std::vector< FdsRegistryEntry > FdsRegistry
Definition: FdsRegistry.h:22
WriteCalibToCool.swap
swap
Definition: WriteCalibToCool.py:94
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
python.plotting.G4DebuggerUtils.rename
def rename(label)
Definition: G4DebuggerUtils.py:11
AthMpEvtLoopMgr::m_execAtPreFork
StringArrayProperty m_execAtPreFork
Definition: AthMpEvtLoopMgr.h:81
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
AthMpEvtLoopMgr::m_workerTopDir
Gaudi::Property< std::string > m_workerTopDir
Definition: AthMpEvtLoopMgr.h:52
AthMpEvtLoopMgr::m_nMemSamplingInterval
Gaudi::Property< int > m_nMemSamplingInterval
Definition: AthMpEvtLoopMgr.h:72
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
DEBUG
#define DEBUG
Definition: page_access.h:11
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:21
python.CaloCondTools.log
log
Definition: CaloCondTools.py:20
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
AthMpEvtLoopMgr::m_masterPid
pid_t m_masterPid
Definition: AthMpEvtLoopMgr.h:85
AthMpEvtLoopMgr::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: AthMpEvtLoopMgr.h:47
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthMpEvtLoopMgr::m_outputReportName
Gaudi::Property< std::string > m_outputReportName
Definition: AthMpEvtLoopMgr.h:55
AthMpEvtLoopMgr::m_scheduledStop
bool m_scheduledStop
Definition: AthMpEvtLoopMgr.h:86
AthMpEvtLoopMgr::m_eventPrintoutInterval
Gaudi::Property< unsigned int > m_eventPrintoutInterval
Definition: AthMpEvtLoopMgr.h:78
AthMpEvtLoopMgr::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthMpEvtLoopMgr.h:45
AthMpEvtLoopMgr::m_nPollingInterval
Gaudi::Property< int > m_nPollingInterval
Definition: AthMpEvtLoopMgr.h:69
ServiceHandle< StoreGateSvc >