ATLAS Offline Software
Public Member Functions | Private Member Functions | Private Attributes | List of all members
AthMpEvtLoopMgr Class Reference

#include <AthMpEvtLoopMgr.h>

Inheritance diagram for AthMpEvtLoopMgr:
Collaboration diagram for AthMpEvtLoopMgr:

Public Member Functions

 AthMpEvtLoopMgr (const std::string &name, ISvcLocator *svcLocator)
 
virtual ~AthMpEvtLoopMgr ()
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual StatusCode nextEvent (int maxevt) override
 
virtual StatusCode executeEvent (EventContext &&ctx) override
 
virtual StatusCode executeRun (int maxevt) override
 
virtual StatusCode stopRun () override
 
virtual EventContext createEventContext () override
 

Private Member Functions

 AthMpEvtLoopMgr ()
 
 AthMpEvtLoopMgr (const AthMpEvtLoopMgr &)
 
AthMpEvtLoopMgr & operator= (const AthMpEvtLoopMgr &)
 
StatusCode wait ()
 
StatusCode generateOutputReport ()
 
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds ()
 
StatusCode updateSkipEvents (int skipEvents)
 

Private Attributes

ServiceHandle< IEventProcessor > m_evtProcessor
 
SmartIF< IService > m_evtSelector
 
int m_nWorkers
 
std::string m_workerTopDir
 
std::string m_outputReportName
 
std::string m_strategy
 
bool m_isPileup
 
bool m_collectSubprocessLogs
 
ToolHandleArray< IAthenaMPTool > m_tools
 
int m_nChildProcesses
 
int m_nPollingInterval
 
int m_nMemSamplingInterval
 
int m_nEventsBeforeFork
 
unsigned int m_eventPrintoutInterval
 
StringArrayProperty m_execAtPreFork
 
pid_t m_masterPid
 
std::vector< unsigned long > m_samplesRss
 
std::vector< unsigned long > m_samplesPss
 
std::vector< unsigned long > m_samplesSize
 
std::vector< unsigned long > m_samplesSwap
 

Detailed Description

Definition at line 17 of file AthMpEvtLoopMgr.h.

Constructor & Destructor Documentation

◆ AthMpEvtLoopMgr() [1/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const std::string &  name,
ISvcLocator *  svcLocator 
)

Definition at line 41 of file AthMpEvtLoopMgr.cxx.

43  : base_class(name,svcLocator)
44  , m_evtProcessor("AthenaEventLoopMgr", name)
45  , m_evtSelector(nullptr)
46  , m_nWorkers(0)
47  , m_workerTopDir("athenaMP_workers")
48  , m_outputReportName("AthenaMPOutputs")
49  , m_strategy("")
50  , m_isPileup(false)
52  , m_tools(this)
54  , m_nPollingInterval(100) // 0.1 second
55  , m_nMemSamplingInterval(0) // no sampling by default
58  , m_execAtPreFork()
59  , m_masterPid(getpid())
60 {
61  declareProperty("NWorkers",m_nWorkers);
62  declareProperty("WorkerTopDir",m_workerTopDir);
63  declareProperty("OutputReportFile",m_outputReportName);
64  declareProperty("Strategy",m_strategy);
65  declareProperty("IsPileup",m_isPileup);
66  declareProperty("CollectSubprocessLogs",m_collectSubprocessLogs);
67  declareProperty("Tools",m_tools);
68  declareProperty("PollingInterval",m_nPollingInterval);
69  declareProperty("MemSamplingInterval",m_nMemSamplingInterval);
70  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
71  declareProperty("EventPrintoutInterval",m_eventPrintoutInterval);
72  declareProperty("ExecAtPreFork", m_execAtPreFork);
73 }

◆ ~AthMpEvtLoopMgr()

AthMpEvtLoopMgr::~AthMpEvtLoopMgr ( )
virtual

Definition at line 75 of file AthMpEvtLoopMgr.cxx.

76 {
77 }

◆ AthMpEvtLoopMgr() [2/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( )
private

◆ AthMpEvtLoopMgr() [3/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const AthMpEvtLoopMgr & )
private

Member Function Documentation

◆ createEventContext()

EventContext AthMpEvtLoopMgr::createEventContext ( )
overridevirtual

Definition at line 148 of file AthMpEvtLoopMgr.cxx.

148  {
149  // return an invalid context - method should not be called
150  return EventContext{};
151 }

◆ executeEvent()

StatusCode AthMpEvtLoopMgr::executeEvent ( EventContext &&  ctx)
overridevirtual

Definition at line 153 of file AthMpEvtLoopMgr.cxx.

154 {
155  // Perhaps there we should return StatusCode::FAILURE as this method should not be called directly
156  return m_evtProcessor->executeEvent(std::move(ctx));
157 }

◆ executeRun()

StatusCode AthMpEvtLoopMgr::executeRun ( int  maxevt)
overridevirtual

Definition at line 159 of file AthMpEvtLoopMgr.cxx.

160 {
161  ATH_MSG_DEBUG("in executeRun()");
162 
163  // Generate random component of the Shared Memory and Shared Queue names
164  srand(time(NULL));
165  std::ostringstream randStream;
166  randStream << getpid() << '_' << AthenaInterprocess::randString();
167  ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
168 
169  ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
170  ATH_CHECK(pDetStore.retrieve());
171 
172  // Create Shared Event queue if necessary and make it available to the tools
173  if(m_strategy=="SharedQueue"
174  || m_strategy=="RoundRobin") {
175  AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
176  if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
177  ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
178  delete evtQueue;
179  return StatusCode::FAILURE;
180  }
181  }
182 
183  // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
184  // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
185  if(m_strategy=="EventService") {
186  AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
187  if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
188  ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
189  delete failedPidQueue;
190  return StatusCode::FAILURE;
191  }
192  }
193 
194  // Prepare work directory for sub-processes
195  if(mkdir(m_workerTopDir.c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
196  switch(errno) {
197  case EEXIST:
198  {
199  // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
200  // Rename it with m_workerTopDir+"-bak-rand"
201 
202  srand((unsigned)time(0));
203  std::ostringstream randname;
204  randname << rand();
205  std::string backupDir = (m_workerTopDir.rfind('/')==(m_workerTopDir.size()-1)?m_workerTopDir.substr(0,m_workerTopDir.size()-1):m_workerTopDir)+std::string("-bak-")+randname.str();
206 
207  ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
208  ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
209 
210  if(rename(m_workerTopDir.c_str(),backupDir.c_str())!=0) {
211  char buf[256];
212  strerror_r(errno, buf, sizeof(buf));
213  ATH_MSG_ERROR("Unable to make backup directory! " << buf);
214  return StatusCode::FAILURE;
215  }
216 
217  if(mkdir(m_workerTopDir.c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0)
218  break;
219  }
220  /* FALLTHROUGH */
221  default:
222  {
223  char buf[256];
224  strerror_r(errno, buf, sizeof(buf));
225  ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
226  return StatusCode::FAILURE;
227  }
228  }
229  }
230 
231  // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
232  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
233  ATH_CHECK(incSvc.retrieve());
234 
235  if(m_nEventsBeforeFork==0 && !m_isPileup) {
236  incSvc->fireIncident(Incident(name(),"BeforeFork"));
237  }
238 
239  // Extract process file descriptors
240  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
241 
242  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
243  itLast = m_tools.end();
244 
245  // When using SharedWriter in conjunction with fork-after-N-events
246  // we have to make sure that mother process is a conversion service
247  // client so that events before forking workers are captured...
248 
249  SmartIF<IDataShare> dataShare{serviceLocator()->service("AthenaPoolCnvSvc")};
250  ATH_CHECK(dataShare.isValid());
251 
252  auto sharedWriterTool = m_tools["SharedWriterTool"];
253  const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
254 
255  if(sharedWriterWithFAFE) {
256  (*sharedWriterTool)->useFdsRegistry(registry);
257  (*sharedWriterTool)->setRandString(randStream.str());
258 
259  int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
260  if(nChildren==-1) {
261  ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
262  return StatusCode::FAILURE;
263  }
264  else {
265  m_nChildProcesses+=nChildren;
266  }
267 
268  // Execute the SharedWriterTool at this point
269  StatusCode mySc = (*sharedWriterTool)->exec();
270  if(!mySc.isSuccess()) {
271  ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
272  return StatusCode::FAILURE;
273  }
274 
275  // Make the mother process a client
276  if(!dataShare->makeClient(m_nWorkers+1).isSuccess()) {
277  ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
278  return StatusCode::FAILURE;
279  }
280  }
281 
282  //
283  // Try processing requested number of events here
284  if(m_nEventsBeforeFork) {
285  // Take into account a corner case: m_nEventsBeforeFork > maxevt
286  int nEventsToProcess = ((maxevt>-1 && m_nEventsBeforeFork>maxevt)?maxevt:m_nEventsBeforeFork);
287  StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
288  if(!scEvtProc.isSuccess()) {
289  if(nEventsToProcess)
290  ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
291  else
292  ATH_MSG_FATAL("Unable to process first event in the master");
293  return scEvtProc;
294  }
295  }
296 
297  // Finalize I/O (close input files) by IoComponents
298  ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
299  ATH_CHECK(ioMgr.retrieve());
300  ATH_CHECK(ioMgr->io_finalize());
301  ATH_MSG_DEBUG("Successfully finalized I/O before forking");
302 
303  // Flush stream buffers
304  fflush(NULL);
305 
306  // Make the mother process not client
307  if(sharedWriterWithFAFE && !dataShare->makeClient(0).isSuccess()) {
308  ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
309  return StatusCode::FAILURE;
310  }
311 
312  int maxEvents(maxevt); // This can be modified after restart
313 
314  // Re-extract process file descriptors
315  registry = extractFds();
316 
317  // Make worker pools
318  it = m_tools.begin();
319  for(; it!=itLast; ++it) {
320  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
321  (*it)->useFdsRegistry(registry);
322  (*it)->setRandString(randStream.str());
323  if(it==m_tools.begin()) {
324  incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
325  }
326  int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
327  if(nChildren==-1) {
328  ATH_MSG_FATAL("makePool failed for " << (*it)->name());
329  return StatusCode::FAILURE;
330  }
331  else {
332  m_nChildProcesses+=nChildren;
333  }
334  }
335 
336  if(m_nChildProcesses==0) {
337  ATH_MSG_ERROR("No child processes were created");
338  return StatusCode::FAILURE;
339  }
340 
341  // Assign work to child processes
342  for(it=m_tools.begin(); it!=itLast; ++it) {
343  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
344  if((*it)->exec().isFailure()) {
345  ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
346  return StatusCode::FAILURE;
347  }
348  }
349 
350  StatusCode sc = wait();
351 
352  if(m_nMemSamplingInterval>0) {
353  ATH_MSG_INFO("*** *** Memory Usage *** ***");
354  ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
355  ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
356  ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
357  ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
358  ATH_MSG_INFO("*** *** Memory Usage *** ***");
359  }
360 
361  if(m_collectSubprocessLogs) {
362  ATH_MSG_INFO("BEGIN collecting sub-process logs");
363  std::vector<std::string> logs;
364  for(it=m_tools.begin(); it!=itLast; ++it) {
365  (*it)->subProcessLogs(logs);
366  for(size_t i=0;i<logs.size();++i) {
367  std::cout << "\n File: " << logs[i] << "\n" << std::endl;
368  std::ifstream log;
369  log.open(logs[i].c_str(),std::ifstream::in);
370  std::string line;
371  while(!log.eof()) {
372  std::getline(log,line);
373  std::cout << line << std::endl;
374  }
375  log.close();
376  }
377  }
378  ATH_MSG_INFO("END collecting sub-process logs");
379  }
380 
381  if(sc.isSuccess())
382  return generateOutputReport();
383  else
384  return sc;
385 }

◆ extractFds()

std::shared_ptr< AthenaInterprocess::FdsRegistry > AthMpEvtLoopMgr::extractFds ( )
private

Definition at line 530 of file AthMpEvtLoopMgr.cxx.

531 {
532  ATH_MSG_DEBUG("Extracting file descriptors");
533  using namespace std::filesystem;
534  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
535 
536  // Extract file descriptors associated with the current process
537  // 1. Store only those regular files in the registry, which
538  // don't contain substrings from the "exclusion pattern" set
539  // 2. Skip also stdout and stderr
540 
541  std::vector<std::string> excludePatterns {
542  "/root/etc/plugins/"
543  ,"/root/cint/cint/"
544  ,"/root/include/"
545  ,"/var/tmp/"
546  ,"/var/lock/"
547  ,"/var/lib/"
548  ,"/bin/python/"
549  ,"/include/c++/"
550  ,".confdb2"
551  };
552 
553  path fdPath("/proc/self/fd");
554  for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
555  if(is_symlink(fdIt->path())) {
556  path realpath = read_symlink(fdIt->path());
557  int fd = atoi(fdIt->path().filename().string().c_str());
558 
559  if (fd==1 || fd==2) // Skip stdout and stderr
560  continue;
561 
562  if(exists(realpath)) {
563  if(is_regular_file(realpath)) {
564  // Check against the exclusion criteria
565  bool exclude(false);
566  for(size_t i=0;i<excludePatterns.size(); ++i) {
567  if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
568  exclude = true;
569  break;
570  }
571  }
572  if(exclude) {
573  ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
574  }
575  else {
576  registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
577  }
578  }
579  else {
580  ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
581  }
582  } // File exists
583  }
584  else
585  ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
586  } // Directory iteration
587 
588  ATH_MSG_DEBUG("Fds Reistry created. Contents:");
589  for(size_t ii(0); ii<registry->size(); ++ii)
590  ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
591 
592  return registry;
593 }

◆ finalize()

StatusCode AthMpEvtLoopMgr::finalize ( )
overridevirtual

Definition at line 137 of file AthMpEvtLoopMgr.cxx.

138 {
139  return StatusCode::SUCCESS;
140 }

◆ generateOutputReport()

StatusCode AthMpEvtLoopMgr::generateOutputReport ( )
private

Definition at line 459 of file AthMpEvtLoopMgr.cxx.

460 {
461  // Loop over tools, collect their output reports and put them all together into a single file.
462  // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
463 
464  std::ofstream ofs;
465  ofs.open(m_outputReportName.c_str());
466  if(!ofs) {
467  ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
468  return StatusCode::FAILURE;
469  }
470  else {
471  std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
472 
473  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
474  itLast = m_tools.end();
475  for(it=m_tools.begin(); it!=itLast; ++it)
476  allptrs.push_back((*it)->generateOutputReport());
477 
478  // First collect keys=file_names from all tools
479  std::set<std::string> allkeys;
480  for(size_t i=0; i<allptrs.size(); ++i) {
481  AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
482  it_wosLast = allptrs[i]->end();
483  for(;it_wos!=it_wosLast;++it_wos)
484  allkeys.insert(it_wos->first);
485  }
486 
487  // Generate XML
488  ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
489  ofs << "<athenaFileReport>" << std::endl;
490  std::set<std::string>::const_iterator keys_it = allkeys.begin(),
491  keys_itLast = allkeys.end();
492  for(;keys_it!=keys_itLast;++keys_it) {
493  ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
494  for(size_t i=0; i<allptrs.size(); ++i) {
495  AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
496  if(it_wos!=(allptrs[i])->end()) {
497  for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
498  AthenaMP::WorkerOutput& outp = it_wos->second[ii];
499  if(ii==0 && m_nEventsBeforeFork>0) {
500  std::filesystem::path masterFile(std::filesystem::current_path());
501  masterFile /= std::filesystem::path(*keys_it);
502  if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
503  ofs << " <File "
504  << "description=\"" << outp.description
505  << "\" mode=\"" << outp.access_mode
506  << "\" name=\"" << masterFile.string()
507  << "\" shared=\"" << (outp.shared?"True":"False")
508  << "\" technology=\"" << outp.technology
509  << "\"/>" << std::endl;
510  }
511  ofs << " <File "
512  << "description=\"" << outp.description
513  << "\" mode=\"" << outp.access_mode
514  << "\" name=\"" << outp.filename
515  << "\" shared=\"" << (outp.shared?"True":"False")
516  << "\" technology=\"" << outp.technology
517  << "\"/>" << std::endl;
518  }
519  }
520  }
521  ofs << " </Files>" << std::endl;
522  }
523  ofs << "</athenaFileReport>" << std::endl;
524  ofs.close();
525  }
526 
527  return StatusCode::SUCCESS;
528 }

◆ initialize()

StatusCode AthMpEvtLoopMgr::initialize ( )
overridevirtual

Definition at line 79 of file AthMpEvtLoopMgr.cxx.

80 {
81  ATH_MSG_DEBUG("in initialize() ... ");
82 
83  Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
84 
85  SmartIF<IProperty> prpMgr(serviceLocator());
86  if(!prpMgr.isValid()) {
87  ATH_MSG_ERROR("Failed to get hold of the Property Manager");
88  return StatusCode::FAILURE;
89  }
90 
91  std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
92  m_evtSelector = serviceLocator()->service(evtSelName);
93  ATH_CHECK(m_evtSelector.isValid());
94 
95  if(m_strategy=="EventService") {
96  // ES with non-zero events before forking makes no sense
97  if(m_nEventsBeforeFork!=0) {
98  ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
99  return StatusCode::FAILURE;
100  }
101 
102  // We need to ignore SkipEvents in ES
103  if(updateSkipEvents(0).isFailure()) {
104  ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
105  return StatusCode::FAILURE;
106  }
107  }
108 
109  if(m_isPileup) {
110  m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
111  ATH_MSG_INFO("ELM: The job running in pileup mode");
112  }
113  else {
114  ATH_MSG_INFO("ELM: The job running in non-pileup mode");
115  }
116 
117  ATH_CHECK(m_evtProcessor.retrieve());
118  if(!m_isPileup) {
119  SmartIF<IProperty> propertyServer(m_evtProcessor.get());
120  if(propertyServer) {
121  if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
122  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
123  }
124  if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
125  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
126  }
127  }
128  else {
129  ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
130  }
131  }
132  ATH_CHECK(m_tools.retrieve());
133 
134  return StatusCode::SUCCESS;
135 }

◆ nextEvent()

StatusCode AthMpEvtLoopMgr::nextEvent ( int  maxevt)
overridevirtual

Definition at line 142 of file AthMpEvtLoopMgr.cxx.

143 {
144  // Perhaps there we should return StatusCode::FAILURE as this method should not be called directly
145  return m_evtProcessor->nextEvent(maxevt);
146 }

◆ operator=()

AthMpEvtLoopMgr& AthMpEvtLoopMgr::operator= ( const AthMpEvtLoopMgr & )
private

◆ stopRun()

StatusCode AthMpEvtLoopMgr::stopRun ( )
overridevirtual

Definition at line 387 of file AthMpEvtLoopMgr.cxx.

388 {
389  return m_evtProcessor->stopRun();
390 }

◆ updateSkipEvents()

StatusCode AthMpEvtLoopMgr::updateSkipEvents ( int  skipEvents)
private

Definition at line 595 of file AthMpEvtLoopMgr.cxx.

596 {
597  SmartIF<IProperty> propertyServer(m_evtSelector);
598  if(!propertyServer) {
599  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
600  return StatusCode::FAILURE;
601  }
602 
603  IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
604  if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
605  ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
606  return StatusCode::FAILURE;
607  }
608  ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
609 
610  return StatusCode::SUCCESS;
611 }

◆ wait()

StatusCode AthMpEvtLoopMgr::wait ( )
private

Definition at line 402 of file AthMpEvtLoopMgr.cxx.

403 {
404  ATH_MSG_INFO("Waiting for sub-processes");
405  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
406  itLast = m_tools.end();
407  pid_t pid(0);
408  bool all_ok(true);
409 
410  auto memMonTime = std::chrono::system_clock::now();
411 
412  while(m_nChildProcesses>0) {
413  for(it = m_tools.begin(); it!=itLast; ++it) {
414  if((*it)->wait_once(pid).isFailure()) {
415  all_ok = false;
416  ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
417  break;
418  }
419  else {
420  if(pid>0) m_nChildProcesses -= 1;
421  }
422  }
423  if(!all_ok) break;
424 
425  usleep(m_nPollingInterval*1000);
426 
427  if(m_nMemSamplingInterval>0) {
428  auto currTime = std::chrono::system_clock::now();
429  if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
430  unsigned long size(0);
431  unsigned long rss(0);
432  unsigned long pss(0);
433  unsigned long swap(0);
434 
435  if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
436  ATH_MSG_WARNING("Unable to get memory sample");
437  else {
438  m_samplesRss.push_back(rss);
439  m_samplesPss.push_back(pss);
440  m_samplesSize.push_back(size);
441  m_samplesSwap.push_back(swap);
442  }
443  memMonTime=currTime;
444  }
445  }
446  }
447 
448  for(it=m_tools.begin(); it!=itLast; ++it)
449  (*it)->reportSubprocessStatuses();
450 
451  if(!all_ok) {
452  for(it=m_tools.begin(); it!=itLast; ++it)
453  (*it)->killChildren();
454  }
455 
456  return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
457 }

Member Data Documentation

◆ m_collectSubprocessLogs

bool AthMpEvtLoopMgr::m_collectSubprocessLogs
private

Definition at line 42 of file AthMpEvtLoopMgr.h.

◆ m_eventPrintoutInterval

unsigned int AthMpEvtLoopMgr::m_eventPrintoutInterval
private

Definition at line 48 of file AthMpEvtLoopMgr.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthMpEvtLoopMgr::m_evtProcessor
private

Definition at line 35 of file AthMpEvtLoopMgr.h.

◆ m_evtSelector

SmartIF<IService> AthMpEvtLoopMgr::m_evtSelector
private

Definition at line 36 of file AthMpEvtLoopMgr.h.

◆ m_execAtPreFork

StringArrayProperty AthMpEvtLoopMgr::m_execAtPreFork
private

Definition at line 49 of file AthMpEvtLoopMgr.h.

◆ m_isPileup

bool AthMpEvtLoopMgr::m_isPileup
private

Definition at line 41 of file AthMpEvtLoopMgr.h.

◆ m_masterPid

pid_t AthMpEvtLoopMgr::m_masterPid
private

Definition at line 50 of file AthMpEvtLoopMgr.h.

◆ m_nChildProcesses

int AthMpEvtLoopMgr::m_nChildProcesses
private

Definition at line 44 of file AthMpEvtLoopMgr.h.

◆ m_nEventsBeforeFork

int AthMpEvtLoopMgr::m_nEventsBeforeFork
private

Definition at line 47 of file AthMpEvtLoopMgr.h.

◆ m_nMemSamplingInterval

int AthMpEvtLoopMgr::m_nMemSamplingInterval
private

Definition at line 46 of file AthMpEvtLoopMgr.h.

◆ m_nPollingInterval

int AthMpEvtLoopMgr::m_nPollingInterval
private

Definition at line 45 of file AthMpEvtLoopMgr.h.

◆ m_nWorkers

int AthMpEvtLoopMgr::m_nWorkers
private

Definition at line 37 of file AthMpEvtLoopMgr.h.

◆ m_outputReportName

std::string AthMpEvtLoopMgr::m_outputReportName
private

Definition at line 39 of file AthMpEvtLoopMgr.h.

◆ m_samplesPss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesPss
private

Definition at line 54 of file AthMpEvtLoopMgr.h.

◆ m_samplesRss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesRss
private

Definition at line 53 of file AthMpEvtLoopMgr.h.

◆ m_samplesSize

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSize
private

Definition at line 55 of file AthMpEvtLoopMgr.h.

◆ m_samplesSwap

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSwap
private

Definition at line 56 of file AthMpEvtLoopMgr.h.

◆ m_strategy

std::string AthMpEvtLoopMgr::m_strategy
private

Definition at line 40 of file AthMpEvtLoopMgr.h.

◆ m_tools

ToolHandleArray<IAthenaMPTool> AthMpEvtLoopMgr::m_tools
private

Definition at line 43 of file AthMpEvtLoopMgr.h.

◆ m_workerTopDir

std::string AthMpEvtLoopMgr::m_workerTopDir
private

Definition at line 38 of file AthMpEvtLoopMgr.h.


The documentation for this class was generated from the following files:
AthMpEvtLoopMgr.h
AthMpEvtLoopMgr.cxx
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
outp
std::ostream * outp
send output to here ...
Definition: hcg.cxx:73
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthMpEvtLoopMgr::m_eventPrintoutInterval
unsigned int m_eventPrintoutInterval
Definition: AthMpEvtLoopMgr.h:48
AthMpEvtLoopMgr::m_isPileup
bool m_isPileup
Definition: AthMpEvtLoopMgr.h:41
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
checkFileSG.line
line
Definition: checkFileSG.py:75
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
AthMpEvtLoopMgr::generateOutputReport
StatusCode generateOutputReport()
Definition: AthMpEvtLoopMgr.cxx:459
AthMpEvtLoopMgr::m_nPollingInterval
int m_nPollingInterval
Definition: AthMpEvtLoopMgr.h:45
AthMpEvtLoopMgr::m_samplesRss
std::vector< unsigned long > m_samplesRss
Definition: AthMpEvtLoopMgr.h:53
skel.it
it
Definition: skel.GENtoEVGEN.py:396
AthMpEvtLoopMgr::m_nChildProcesses
int m_nChildProcesses
Definition: AthMpEvtLoopMgr.h:44
AthMpEvtLoopMgr::updateSkipEvents
StatusCode updateSkipEvents(int skipEvents)
Definition: AthMpEvtLoopMgr.cxx:595
exclude
std::set< std::string > exclude
list of directories to be excluded
Definition: hcg.cxx:95
AthMpEvtLoopMgr::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: AthMpEvtLoopMgr.h:47
AthMpEvtLoopMgr::m_tools
ToolHandleArray< IAthenaMPTool > m_tools
Definition: AthMpEvtLoopMgr.h:43
XMLtoHeader.count
count
Definition: XMLtoHeader.py:85
AthMpEvtLoopMgr::m_samplesSize
std::vector< unsigned long > m_samplesSize
Definition: AthMpEvtLoopMgr.h:55
AthMpEvtLoopMgr::m_outputReportName
std::string m_outputReportName
Definition: AthMpEvtLoopMgr.h:39
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:26
AthMpEvtLoopMgr::m_nWorkers
int m_nWorkers
Definition: AthMpEvtLoopMgr.h:37
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
python.SCT_ByteStreamErrorsTestAlgConfig.maxEvents
maxEvents
Definition: SCT_ByteStreamErrorsTestAlgConfig.py:43
AthMpEvtLoopMgr::m_workerTopDir
std::string m_workerTopDir
Definition: AthMpEvtLoopMgr.h:38
AthMpEvtLoopMgr::m_samplesPss
std::vector< unsigned long > m_samplesPss
Definition: AthMpEvtLoopMgr.h:54
athenaMP_MemHelper::getPss
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)
AthMpEvtLoopMgr::wait
StatusCode wait()
Definition: AthMpEvtLoopMgr.cxx:402
AthMpEvtLoopMgr::m_evtSelector
SmartIF< IService > m_evtSelector
Definition: AthMpEvtLoopMgr.h:36
python.setupRTTAlg.size
int size
Definition: setupRTTAlg.py:39
AthMpEvtLoopMgr::m_nMemSamplingInterval
int m_nMemSamplingInterval
Definition: AthMpEvtLoopMgr.h:46
python.handimod.now
now
Definition: handimod.py:675
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthMpEvtLoopMgr::m_samplesSwap
std::vector< unsigned long > m_samplesSwap
Definition: AthMpEvtLoopMgr.h:56
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthMpEvtLoopMgr::m_strategy
std::string m_strategy
Definition: AthMpEvtLoopMgr.h:40
AthenaInterprocess::randString
std::string randString()
Definition: Control/AthenaInterprocess/AthenaInterprocess/Utilities.h:13
AthMpEvtLoopMgr::extractFds
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Definition: AthMpEvtLoopMgr.cxx:530
AthenaInterprocess::FdsRegistry
std::vector< FdsRegistryEntry > FdsRegistry
Definition: FdsRegistry.h:22
WriteCalibToCool.swap
swap
Definition: WriteCalibToCool.py:94
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
AthMpEvtLoopMgr::m_execAtPreFork
StringArrayProperty m_execAtPreFork
Definition: AthMpEvtLoopMgr.h:49
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
DEBUG
#define DEBUG
Definition: page_access.h:11
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:17
python.CaloCondTools.log
log
Definition: CaloCondTools.py:20
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
AthMpEvtLoopMgr::m_masterPid
pid_t m_masterPid
Definition: AthMpEvtLoopMgr.h:50
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthMpEvtLoopMgr::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthMpEvtLoopMgr.h:35
AthMpEvtLoopMgr::m_collectSubprocessLogs
bool m_collectSubprocessLogs
Definition: AthMpEvtLoopMgr.h:42
ServiceHandle< StoreGateSvc >