Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Public Member Functions | Private Member Functions | Private Attributes | List of all members
AthMpEvtLoopMgr Class Reference

#include <AthMpEvtLoopMgr.h>

Inheritance diagram for AthMpEvtLoopMgr:
Collaboration diagram for AthMpEvtLoopMgr:

Public Member Functions

 AthMpEvtLoopMgr (const std::string &name, ISvcLocator *svcLocator)
 
virtual ~AthMpEvtLoopMgr ()
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual StatusCode nextEvent (int maxevt) override
 
virtual StatusCode executeEvent (EventContext &&ctx) override
 
virtual StatusCode executeRun (int maxevt) override
 
virtual StatusCode stopRun () override
 
virtual EventContext createEventContext () override
 
virtual bool stopScheduled () const override
 

Private Member Functions

 AthMpEvtLoopMgr ()
 
 AthMpEvtLoopMgr (const AthMpEvtLoopMgr &)
 
AthMpEvtLoopMgr & operator= (const AthMpEvtLoopMgr &)
 
StatusCode wait ()
 
StatusCode generateOutputReport ()
 
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds ()
 
StatusCode updateSkipEvents (int skipEvents)
 

Private Attributes

ServiceHandle< IEventProcessor > m_evtProcessor
 
SmartIF< IService > m_evtSelector
 
int m_nWorkers
 
std::string m_workerTopDir
 
std::string m_outputReportName
 
std::string m_strategy
 
bool m_isPileup
 
bool m_collectSubprocessLogs
 
ToolHandleArray< IAthenaMPTool > m_tools
 
int m_nChildProcesses
 
int m_nPollingInterval
 
int m_nMemSamplingInterval
 
int m_nEventsBeforeFork
 
unsigned int m_eventPrintoutInterval
 
StringArrayProperty m_execAtPreFork
 
pid_t m_masterPid
 
bool m_scheduledStop {false}
 
std::vector< unsigned long > m_samplesRss
 
std::vector< unsigned long > m_samplesPss
 
std::vector< unsigned long > m_samplesSize
 
std::vector< unsigned long > m_samplesSwap
 

Detailed Description

Definition at line 18 of file AthMpEvtLoopMgr.h.

Constructor & Destructor Documentation

◆ AthMpEvtLoopMgr() [1/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const std::string &  name,
ISvcLocator *  svcLocator 
)

Definition at line 41 of file AthMpEvtLoopMgr.cxx.

43  : base_class(name,svcLocator)
44  , m_evtProcessor("AthenaEventLoopMgr", name)
45  , m_evtSelector(nullptr)
46  , m_nWorkers(0)
47  , m_workerTopDir("athenaMP_workers")
48  , m_outputReportName("AthenaMPOutputs")
49  , m_strategy("")
50  , m_isPileup(false)
52  , m_tools(this)
54  , m_nPollingInterval(100) // 0.1 second
55  , m_nMemSamplingInterval(0) // no sampling by default
58  , m_execAtPreFork()
59  , m_masterPid(getpid())
60 {
61  declareProperty("NWorkers",m_nWorkers);
62  declareProperty("WorkerTopDir",m_workerTopDir);
63  declareProperty("OutputReportFile",m_outputReportName);
64  declareProperty("Strategy",m_strategy);
65  declareProperty("IsPileup",m_isPileup);
66  declareProperty("CollectSubprocessLogs",m_collectSubprocessLogs);
67  declareProperty("Tools",m_tools);
68  declareProperty("PollingInterval",m_nPollingInterval);
69  declareProperty("MemSamplingInterval",m_nMemSamplingInterval);
70  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
71  declareProperty("EventPrintoutInterval",m_eventPrintoutInterval);
72  declareProperty("ExecAtPreFork", m_execAtPreFork);
73 }

◆ ~AthMpEvtLoopMgr()

AthMpEvtLoopMgr::~AthMpEvtLoopMgr ( )
virtual

Definition at line 75 of file AthMpEvtLoopMgr.cxx.

76 {
77 }

◆ AthMpEvtLoopMgr() [2/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( )
private

◆ AthMpEvtLoopMgr() [3/3]

AthMpEvtLoopMgr::AthMpEvtLoopMgr ( const AthMpEvtLoopMgr & )
private

Member Function Documentation

◆ createEventContext()

EventContext AthMpEvtLoopMgr::createEventContext ( )
overridevirtual

Definition at line 148 of file AthMpEvtLoopMgr.cxx.

148  {
149  // return an invalid context - method should not be called
150  return EventContext{};
151 }

◆ executeEvent()

StatusCode AthMpEvtLoopMgr::executeEvent ( EventContext &&  ctx)
overridevirtual

Definition at line 153 of file AthMpEvtLoopMgr.cxx.

154 {
155  // Perhaps here we should return StatusCode::FAILURE as this method should not be called directly
156  return m_evtProcessor->executeEvent(std::move(ctx));
157 }

◆ executeRun()

StatusCode AthMpEvtLoopMgr::executeRun ( int  maxevt)
overridevirtual

Definition at line 159 of file AthMpEvtLoopMgr.cxx.

160 {
161  ATH_MSG_DEBUG("in executeRun()");
162 
163  // Generate random component of the Shared Memory and Shared Queue names
164  srand(time(NULL));
165  std::ostringstream randStream;
166  randStream << getpid() << '_' << AthenaInterprocess::randString();
167  ATH_MSG_INFO("Using random components for IPC object names: " << randStream.str());
168 
169  ServiceHandle<StoreGateSvc> pDetStore("DetectorStore",name());
170  ATH_CHECK(pDetStore.retrieve());
171 
172  // Create Shared Event queue if necessary and make it available to the tools
173  if(m_strategy=="SharedQueue"
174  || m_strategy=="RoundRobin") {
175  AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
176  if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
177  ATH_MSG_FATAL("Unable to record the pointer to the Shared Event queue into Detector Store");
178  delete evtQueue;
179  return StatusCode::FAILURE;
180  }
181  }
182 
183  // For the Event Service: create a queue for connecting EvtRangeProcessor in the master with EvtRangeScatterer subprocess
184  // The TokenProcessor master will be sending pid-s of failed processes to Token Scatterer
185  if(m_strategy=="EventService") {
186  AthenaInterprocess::SharedQueue* failedPidQueue = new AthenaInterprocess::SharedQueue("AthenaMPFailedPidQueue_"+randStream.str(),100,sizeof(pid_t));
187  if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
188  ATH_MSG_FATAL("Unable to record the pointer to the Failed PID queue into Detector Store");
189  delete failedPidQueue;
190  return StatusCode::FAILURE;
191  }
192  }
193 
194  // Prepare work directory for sub-processes
195  if(mkdir(m_workerTopDir.c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)!=0) {
196  switch(errno) {
197  case EEXIST:
198  {
199  // Top directory already exists, maybe a leftover from previous AthenaMP job in the same rundir
200  // Rename it with m_workerTopDir+"-bak-rand"
201 
202  srand((unsigned)time(0));
203  std::ostringstream randname;
204  randname << rand();
205  std::string backupDir = (m_workerTopDir.rfind('/')==(m_workerTopDir.size()-1)?m_workerTopDir.substr(0,m_workerTopDir.size()-1):m_workerTopDir)+std::string("-bak-")+randname.str();
206 
207  ATH_MSG_WARNING("The top directory " << m_workerTopDir << " already exists");
208  ATH_MSG_WARNING("The job will attempt to save it with the name " << backupDir << " and create new top directory from scratch");
209 
210  if(rename(m_workerTopDir.c_str(),backupDir.c_str())!=0) {
211  char buf[256];
212  strerror_r(errno, buf, sizeof(buf));
213  ATH_MSG_ERROR("Unable to make backup directory! " << buf);
214  return StatusCode::FAILURE;
215  }
216 
217  if(mkdir(m_workerTopDir.c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0)
218  break;
219  }
220  /* FALLTHROUGH */
221  default:
222  {
223  char buf[256];
224  strerror_r(errno, buf, sizeof(buf));
225  ATH_MSG_ERROR("Unable to make top directory " << m_workerTopDir << " for children processes! " << buf);
226  return StatusCode::FAILURE;
227  }
228  }
229  }
230 
231  // When forking before 1st event, fire BeforeFork incident in non-pileup jobs
232  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc",name());
233  ATH_CHECK(incSvc.retrieve());
234 
235  if(m_nEventsBeforeFork==0 && !m_isPileup) {
236  incSvc->fireIncident(Incident(name(),"BeforeFork"));
237  }
238 
239  // Extract process file descriptors
240  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry = extractFds();
241 
242  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
243  itLast = m_tools.end();
244 
245  // When using SharedWriter in conjunction with fork-after-N-events
246  // we have to make sure that mother process is a conversion service
247  // client so that events before forking workers are captured...
248 
249  SmartIF<IDataShare> dataShare{serviceLocator()->service("AthenaPoolCnvSvc")};
250  ATH_CHECK(dataShare.isValid());
251 
252  auto sharedWriterTool = m_tools["SharedWriterTool"];
253  const bool sharedWriterWithFAFE = (m_nEventsBeforeFork!=0 && sharedWriterTool);
254 
255  if(sharedWriterWithFAFE) {
256  (*sharedWriterTool)->useFdsRegistry(registry);
257  (*sharedWriterTool)->setRandString(randStream.str());
258 
259  int nChildren = (*sharedWriterTool)->makePool(maxevt,m_nWorkers,m_workerTopDir);
260  if(nChildren==-1) {
261  ATH_MSG_FATAL("makePool failed for " << (*sharedWriterTool)->name());
262  return StatusCode::FAILURE;
263  }
264  else {
265  m_nChildProcesses+=nChildren;
266  }
267 
268  // Execute the SharedWriterTool at this point
269  StatusCode mySc = (*sharedWriterTool)->exec();
270  if(!mySc.isSuccess()) {
271  ATH_MSG_FATAL("Cannot Execute SharedWriter Tool");
272  return StatusCode::FAILURE;
273  }
274 
275  // Make the mother process a client
276  if(!dataShare->makeClient(m_nWorkers+1).isSuccess()) {
277  ATH_MSG_FATAL("Cannot make mother process a client for Conversion Service");
278  return StatusCode::FAILURE;
279  }
280  }
281 
282  //
283  // Try processing requested number of events here
284  if(m_nEventsBeforeFork) {
285  // Take into account a corner case: m_nEventsBeforeFork > maxevt
286  int nEventsToProcess = ((maxevt>-1 && m_nEventsBeforeFork>maxevt)?maxevt:m_nEventsBeforeFork);
287  StatusCode scEvtProc = m_evtProcessor->nextEvent(nEventsToProcess);
288  if(!scEvtProc.isSuccess()) {
289  if(nEventsToProcess)
290  ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
291  else
292  ATH_MSG_FATAL("Unable to process first event in the master");
293  return scEvtProc;
294  }
295  }
296 
297  // Finalize I/O (close input files) by IoComponents
298  ServiceHandle<IIoComponentMgr> ioMgr("IoComponentMgr",name());
299  ATH_CHECK(ioMgr.retrieve());
300  ATH_CHECK(ioMgr->io_finalize());
301  ATH_MSG_DEBUG("Successfully finalized I/O before forking");
302 
303  // Flush stream buffers
304  fflush(NULL);
305 
306  // Make the mother process not client
307  if(sharedWriterWithFAFE && !dataShare->makeClient(0).isSuccess()) {
308  ATH_MSG_FATAL("Cannot make mother process not client for Conversion Service");
309  return StatusCode::FAILURE;
310  }
311 
312  int maxEvents(maxevt); // This can be modified after restart
313 
314  // Re-extract process file descriptors
315  registry = extractFds();
316 
317  // Make worker pools
318  it = m_tools.begin();
319  for(; it!=itLast; ++it) {
320  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
321  (*it)->useFdsRegistry(registry);
322  (*it)->setRandString(randStream.str());
323  (*it)->setMaxEvt(maxevt);
324  (*it)->setMPRunStop(this);
325  if(it==m_tools.begin()) {
326  incSvc->fireIncident(Incident(name(),"PreFork")); // Do it only once
327  }
328  int nChildren = (*it)->makePool(maxEvents,m_nWorkers,m_workerTopDir);
329  if(nChildren==-1) {
330  ATH_MSG_FATAL("makePool failed for " << (*it)->name());
331  return StatusCode::FAILURE;
332  }
333  else {
334  m_nChildProcesses+=nChildren;
335  }
336  }
337 
338  if(m_nChildProcesses==0) {
339  ATH_MSG_ERROR("No child processes were created");
340  return StatusCode::FAILURE;
341  }
342 
343  // Assign work to child processes
344  for(it=m_tools.begin(); it!=itLast; ++it) {
345  if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
346  if((*it)->exec().isFailure()) {
347  ATH_MSG_FATAL("Unable to submit work to the tool " << (*it)->name());
348  return StatusCode::FAILURE;
349  }
350  }
351 
352  StatusCode sc = wait();
353 
354  if(m_nMemSamplingInterval>0) {
355  ATH_MSG_INFO("*** *** Memory Usage *** ***");
356  ATH_MSG_INFO("*** MAX PSS " << (*std::max_element(m_samplesPss.cbegin(),m_samplesPss.cend()))/1024 << "MB");
357  ATH_MSG_INFO("*** MAX RSS " << (*std::max_element(m_samplesRss.cbegin(),m_samplesRss.cend()))/1024 << "MB");
358  ATH_MSG_INFO("*** MAX SIZE " << (*std::max_element(m_samplesSize.cbegin(),m_samplesSize.cend()))/1024 << "MB");
359  ATH_MSG_INFO("*** MAX SWAP " << (*std::max_element(m_samplesSwap.cbegin(),m_samplesSwap.cend()))/1024 << "MB");
360  ATH_MSG_INFO("*** *** Memory Usage *** ***");
361  }
362 
363  if(m_collectSubprocessLogs) {
364  ATH_MSG_INFO("BEGIN collecting sub-process logs");
365  std::vector<std::string> logs;
366  for(it=m_tools.begin(); it!=itLast; ++it) {
367  (*it)->subProcessLogs(logs);
368  for(size_t i=0;i<logs.size();++i) {
369  std::cout << "\n File: " << logs[i] << "\n" << std::endl;
370  std::ifstream log;
371  log.open(logs[i].c_str(),std::ifstream::in);
372  std::string line;
373  while(!log.eof()) {
374  std::getline(log,line);
375  std::cout << line << std::endl;
376  }
377  log.close();
378  }
379  }
380  ATH_MSG_INFO("END collecting sub-process logs");
381  }
382 
383  if(sc.isSuccess())
384  return generateOutputReport();
385  else
386  return sc;
387 }

◆ extractFds()

std::shared_ptr< AthenaInterprocess::FdsRegistry > AthMpEvtLoopMgr::extractFds ( )
private

Definition at line 533 of file AthMpEvtLoopMgr.cxx.

534 {
535  ATH_MSG_DEBUG("Extracting file descriptors");
536  using namespace std::filesystem;
537  std::shared_ptr<AthenaInterprocess::FdsRegistry> registry(new AthenaInterprocess::FdsRegistry());
538 
539  // Extract file descriptors associated with the current process
540  // 1. Store only those regular files in the registry, which
541  // don't contain substrings from the "exclusion pattern" set
542  // 2. Skip also stdout and stderr
543 
544  std::vector<std::string> excludePatterns {
545  "/root/etc/plugins/"
546  ,"/root/cint/cint/"
547  ,"/root/include/"
548  ,"/var/tmp/"
549  ,"/var/lock/"
550  ,"/var/lib/"
551  ,"/bin/python/"
552  ,"/include/c++/"
553  ,".confdb2"
554  };
555 
556  path fdPath("/proc/self/fd");
557  for(directory_iterator fdIt(fdPath); fdIt!=directory_iterator(); fdIt++) {
558  if(is_symlink(fdIt->path())) {
559  path realpath = read_symlink(fdIt->path());
560  int fd = atoi(fdIt->path().filename().string().c_str());
561 
562  if (fd==1 || fd==2) // Skip stdout and stderr
563  continue;
564 
565  if(exists(realpath)) {
566  if(is_regular_file(realpath)) {
567  // Check against the exclusion criteria
568  bool exclude(false);
569  for(size_t i=0;i<excludePatterns.size(); ++i) {
570  if(realpath.string().find(excludePatterns[i])!=std::string::npos) {
571  exclude = true;
572  break;
573  }
574  }
575  if(exclude) {
576  ATH_MSG_DEBUG(realpath.string() << " Excluded from the registry by the pattern");
577  }
578  else {
579  registry->push_back(AthenaInterprocess::FdsRegistryEntry(fd,realpath.string()));
580  }
581  }
582  else {
583  ATH_MSG_DEBUG(realpath.string() << " is not a regular file"); // TODO: deal with these?
584  }
585  } // File exists
586  }
587  else
588  ATH_MSG_WARNING("UNEXPECTED. " << fdIt->path().string() << " Not a symlink");
589  } // Directory iteration
590 
591  ATH_MSG_DEBUG("Fds Reistry created. Contents:");
592  for(size_t ii(0); ii<registry->size(); ++ii)
593  ATH_MSG_DEBUG((*registry)[ii].fd << " " << (*registry)[ii].name);
594 
595  return registry;
596 }

◆ finalize()

StatusCode AthMpEvtLoopMgr::finalize ( )
overridevirtual

Definition at line 137 of file AthMpEvtLoopMgr.cxx.

138 {
139  return StatusCode::SUCCESS;
140 }

◆ generateOutputReport()

StatusCode AthMpEvtLoopMgr::generateOutputReport ( )
private

Definition at line 462 of file AthMpEvtLoopMgr.cxx.

463 {
464  // Loop over tools, collect their output reports and put them all together into a single file.
465  // If m_nEventsBeforeFork!=0 then take into account the outputs made by the master process too
466 
467  std::ofstream ofs;
468  ofs.open(m_outputReportName.c_str());
469  if(!ofs) {
470  ATH_MSG_ERROR("Unable to open AthenaMPOutputs for writing!");
471  return StatusCode::FAILURE;
472  }
473  else {
474  std::vector<AthenaMP::AllWorkerOutputs_ptr> allptrs;
475 
476  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
477  itLast = m_tools.end();
478  for(it=m_tools.begin(); it!=itLast; ++it)
479  allptrs.push_back((*it)->generateOutputReport());
480 
481  // First collect keys=file_names from all tools
482  std::set<std::string> allkeys;
483  for(size_t i=0; i<allptrs.size(); ++i) {
484  AthenaMP::AllWorkerOutputsIterator it_wos = allptrs[i]->begin(),
485  it_wosLast = allptrs[i]->end();
486  for(;it_wos!=it_wosLast;++it_wos)
487  allkeys.insert(it_wos->first);
488  }
489 
490  // Generate XML
491  ofs << "<?xml version=\"1.0\" encoding=\"utf-8\"?>" << std::endl;
492  ofs << "<athenaFileReport>" << std::endl;
493  std::set<std::string>::const_iterator keys_it = allkeys.begin(),
494  keys_itLast = allkeys.end();
495  for(;keys_it!=keys_itLast;++keys_it) {
496  ofs << " <Files OriginalName=\"" << (*keys_it) << "\">" << std::endl;
497  for(size_t i=0; i<allptrs.size(); ++i) {
498  AthenaMP::AllWorkerOutputsIterator it_wos = (allptrs[i])->find(*keys_it);
499  if(it_wos!=(allptrs[i])->end()) {
500  for(size_t ii=0; ii<it_wos->second.size(); ++ii) {
501  AthenaMP::WorkerOutput& outp = it_wos->second[ii];
502  if(ii==0 && m_nEventsBeforeFork>0) {
503  std::filesystem::path masterFile(std::filesystem::current_path());
504  masterFile /= std::filesystem::path(*keys_it);
505  if(std::filesystem::exists(masterFile) && std::filesystem::is_regular_file(masterFile))
506  ofs << " <File "
507  << "description=\"" << outp.description
508  << "\" mode=\"" << outp.access_mode
509  << "\" name=\"" << masterFile.string()
510  << "\" shared=\"" << (outp.shared?"True":"False")
511  << "\" technology=\"" << outp.technology
512  << "\"/>" << std::endl;
513  }
514  ofs << " <File "
515  << "description=\"" << outp.description
516  << "\" mode=\"" << outp.access_mode
517  << "\" name=\"" << outp.filename
518  << "\" shared=\"" << (outp.shared?"True":"False")
519  << "\" technology=\"" << outp.technology
520  << "\"/>" << std::endl;
521  }
522  }
523  }
524  ofs << " </Files>" << std::endl;
525  }
526  ofs << "</athenaFileReport>" << std::endl;
527  ofs.close();
528  }
529 
530  return StatusCode::SUCCESS;
531 }

◆ initialize()

StatusCode AthMpEvtLoopMgr::initialize ( )
overridevirtual

Definition at line 79 of file AthMpEvtLoopMgr.cxx.

80 {
81  ATH_MSG_DEBUG("in initialize() ... ");
82 
83  Gaudi::Concurrency::ConcurrencyFlags::setNumProcs(m_nWorkers);
84 
85  SmartIF<IProperty> prpMgr(serviceLocator());
86  if(!prpMgr.isValid()) {
87  ATH_MSG_ERROR("Failed to get hold of the Property Manager");
88  return StatusCode::FAILURE;
89  }
90 
91  std::string evtSelName = prpMgr->getProperty("EvtSel").toString();
92  m_evtSelector = serviceLocator()->service(evtSelName);
93  ATH_CHECK(m_evtSelector.isValid());
94 
95  if(m_strategy=="EventService") {
96  // ES with non-zero events before forking makes no sense
97  if(m_nEventsBeforeFork!=0) {
98  ATH_MSG_ERROR("The EventService strategy cannot run with non-zero value for EventsBeforeFork");
99  return StatusCode::FAILURE;
100  }
101 
102  // We need to ignore SkipEvents in ES
103  if(updateSkipEvents(0).isFailure()) {
104  ATH_MSG_ERROR("Failed to set skipEvents=0 in Event Service");
105  return StatusCode::FAILURE;
106  }
107  }
108 
109  if(m_isPileup) {
110  m_evtProcessor = ServiceHandle<IEventProcessor>("PileUpEventLoopMgr",name());
111  ATH_MSG_INFO("ELM: The job running in pileup mode");
112  }
113  else {
114  ATH_MSG_INFO("ELM: The job running in non-pileup mode");
115  }
116 
117  ATH_CHECK(m_evtProcessor.retrieve());
118  if(!m_isPileup) {
119  SmartIF<IProperty> propertyServer(m_evtProcessor.get());
120  if(propertyServer) {
121  if(propertyServer->setProperty("EventPrintoutInterval",m_eventPrintoutInterval).isFailure()) {
122  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr EventPrintoutInterval to " << m_eventPrintoutInterval);
123  }
124  if(propertyServer->setProperty("ExecAtPreFork",m_execAtPreFork).isFailure()) {
125  ATH_MSG_WARNING("Could not set AthenaEventLoopMgr ExecAtPreFork property, memory usage might get affected!");
126  }
127  }
128  else {
129  ATH_MSG_WARNING("Could not cast AthenaEventLoopMgr to IProperty");
130  }
131  }
132  ATH_CHECK(m_tools.retrieve());
133 
134  return StatusCode::SUCCESS;
135 }

◆ nextEvent()

StatusCode AthMpEvtLoopMgr::nextEvent ( int  maxevt)
overridevirtual

Definition at line 142 of file AthMpEvtLoopMgr.cxx.

143 {
144  // Perhaps here we should return StatusCode::FAILURE as this method should not be called directly
145  return m_evtProcessor->nextEvent(maxevt);
146 }

◆ operator=()

AthMpEvtLoopMgr& AthMpEvtLoopMgr::operator= ( const AthMpEvtLoopMgr & )
private

◆ stopRun()

StatusCode AthMpEvtLoopMgr::stopRun ( )
overridevirtual

Definition at line 389 of file AthMpEvtLoopMgr.cxx.

390 {
391  m_scheduledStop = true;
392  return m_evtProcessor->stopRun();
393 }

◆ stopScheduled()

virtual bool AthMpEvtLoopMgr::stopScheduled ( ) const
inlineoverridevirtual

Definition at line 36 of file AthMpEvtLoopMgr.h.

36 {return m_scheduledStop;};

◆ updateSkipEvents()

StatusCode AthMpEvtLoopMgr::updateSkipEvents ( int  skipEvents)
private

Definition at line 598 of file AthMpEvtLoopMgr.cxx.

599 {
600  SmartIF<IProperty> propertyServer(m_evtSelector);
601  if(!propertyServer) {
602  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
603  return StatusCode::FAILURE;
604  }
605 
606  IntegerProperty skipEventsProperty("SkipEvents", skipEvents);
607  if(propertyServer->setProperty(skipEventsProperty).isFailure()) {
608  ATH_MSG_ERROR("Unable to update " << skipEventsProperty.name() << " property on the Event Selector");
609  return StatusCode::FAILURE;
610  }
611  ATH_MSG_INFO("Updated the SkipEvents property of the event selector. New value: " << skipEvents);
612 
613  return StatusCode::SUCCESS;
614 }

◆ wait()

StatusCode AthMpEvtLoopMgr::wait ( )
private

Definition at line 405 of file AthMpEvtLoopMgr.cxx.

406 {
407  ATH_MSG_INFO("Waiting for sub-processes");
408  ToolHandleArray<IAthenaMPTool>::iterator it = m_tools.begin(),
409  itLast = m_tools.end();
410  pid_t pid(0);
411  bool all_ok(true);
412 
413  auto memMonTime = std::chrono::system_clock::now();
414 
415  while(m_nChildProcesses>0) {
416  for(it = m_tools.begin(); it!=itLast; ++it) {
417  if((*it)->wait_once(pid).isFailure()) {
418  all_ok = false;
419  ATH_MSG_ERROR("Failure in waiting or sub-process finished abnormally");
420  break;
421  }
422  else {
423  if(pid>0) m_nChildProcesses -= 1;
424  }
425  }
426  if(!all_ok) break;
427 
428  usleep(m_nPollingInterval*1000);
429 
430  if(m_nMemSamplingInterval>0) {
431  auto currTime = std::chrono::system_clock::now();
432  if(std::chrono::duration<double,std::ratio<1,1>>(currTime-memMonTime).count()>m_nMemSamplingInterval) {
433  unsigned long size(0);
434  unsigned long rss(0);
435  unsigned long pss(0);
436  unsigned long swap(0);
437 
438  if(athenaMP_MemHelper::getPss(getpid(), pss, swap, rss, size, msgLvl(MSG::DEBUG)))
439  ATH_MSG_WARNING("Unable to get memory sample");
440  else {
441  m_samplesRss.push_back(rss);
442  m_samplesPss.push_back(pss);
443  m_samplesSize.push_back(size);
444  m_samplesSwap.push_back(swap);
445  }
446  memMonTime=currTime;
447  }
448  }
449  }
450 
451  for(it=m_tools.begin(); it!=itLast; ++it)
452  (*it)->reportSubprocessStatuses();
453 
454  if(!all_ok) {
455  for(it=m_tools.begin(); it!=itLast; ++it)
456  (*it)->killChildren();
457  }
458 
459  return (all_ok?StatusCode::SUCCESS:StatusCode::FAILURE);
460 }

Member Data Documentation

◆ m_collectSubprocessLogs

bool AthMpEvtLoopMgr::m_collectSubprocessLogs
private

Definition at line 46 of file AthMpEvtLoopMgr.h.

◆ m_eventPrintoutInterval

unsigned int AthMpEvtLoopMgr::m_eventPrintoutInterval
private

Definition at line 52 of file AthMpEvtLoopMgr.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthMpEvtLoopMgr::m_evtProcessor
private

Definition at line 39 of file AthMpEvtLoopMgr.h.

◆ m_evtSelector

SmartIF<IService> AthMpEvtLoopMgr::m_evtSelector
private

Definition at line 40 of file AthMpEvtLoopMgr.h.

◆ m_execAtPreFork

StringArrayProperty AthMpEvtLoopMgr::m_execAtPreFork
private

Definition at line 53 of file AthMpEvtLoopMgr.h.

◆ m_isPileup

bool AthMpEvtLoopMgr::m_isPileup
private

Definition at line 45 of file AthMpEvtLoopMgr.h.

◆ m_masterPid

pid_t AthMpEvtLoopMgr::m_masterPid
private

Definition at line 54 of file AthMpEvtLoopMgr.h.

◆ m_nChildProcesses

int AthMpEvtLoopMgr::m_nChildProcesses
private

Definition at line 48 of file AthMpEvtLoopMgr.h.

◆ m_nEventsBeforeFork

int AthMpEvtLoopMgr::m_nEventsBeforeFork
private

Definition at line 51 of file AthMpEvtLoopMgr.h.

◆ m_nMemSamplingInterval

int AthMpEvtLoopMgr::m_nMemSamplingInterval
private

Definition at line 50 of file AthMpEvtLoopMgr.h.

◆ m_nPollingInterval

int AthMpEvtLoopMgr::m_nPollingInterval
private

Definition at line 49 of file AthMpEvtLoopMgr.h.

◆ m_nWorkers

int AthMpEvtLoopMgr::m_nWorkers
private

Definition at line 41 of file AthMpEvtLoopMgr.h.

◆ m_outputReportName

std::string AthMpEvtLoopMgr::m_outputReportName
private

Definition at line 43 of file AthMpEvtLoopMgr.h.

◆ m_samplesPss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesPss
private

Definition at line 59 of file AthMpEvtLoopMgr.h.

◆ m_samplesRss

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesRss
private

Definition at line 58 of file AthMpEvtLoopMgr.h.

◆ m_samplesSize

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSize
private

Definition at line 60 of file AthMpEvtLoopMgr.h.

◆ m_samplesSwap

std::vector<unsigned long> AthMpEvtLoopMgr::m_samplesSwap
private

Definition at line 61 of file AthMpEvtLoopMgr.h.

◆ m_scheduledStop

bool AthMpEvtLoopMgr::m_scheduledStop {false}
private

Definition at line 55 of file AthMpEvtLoopMgr.h.

◆ m_strategy

std::string AthMpEvtLoopMgr::m_strategy
private

Definition at line 44 of file AthMpEvtLoopMgr.h.

◆ m_tools

ToolHandleArray<IAthenaMPTool> AthMpEvtLoopMgr::m_tools
private

Definition at line 47 of file AthMpEvtLoopMgr.h.

◆ m_workerTopDir

std::string AthMpEvtLoopMgr::m_workerTopDir
private

Definition at line 42 of file AthMpEvtLoopMgr.h.


The documentation for this class was generated from the following files:
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
outp
std::ostream * outp
send output to here ...
Definition: hcg.cxx:73
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthMpEvtLoopMgr::m_eventPrintoutInterval
unsigned int m_eventPrintoutInterval
Definition: AthMpEvtLoopMgr.h:52
AthMpEvtLoopMgr::m_isPileup
bool m_isPileup
Definition: AthMpEvtLoopMgr.h:45
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
collListGuids.line
string line
Definition: collListGuids.py:77
AthMpEvtLoopMgr::generateOutputReport
StatusCode generateOutputReport()
Definition: AthMpEvtLoopMgr.cxx:462
AthMpEvtLoopMgr::m_nPollingInterval
int m_nPollingInterval
Definition: AthMpEvtLoopMgr.h:49
AthMpEvtLoopMgr::m_samplesRss
std::vector< unsigned long > m_samplesRss
Definition: AthMpEvtLoopMgr.h:58
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthMpEvtLoopMgr::m_nChildProcesses
int m_nChildProcesses
Definition: AthMpEvtLoopMgr.h:48
AthMpEvtLoopMgr::updateSkipEvents
StatusCode updateSkipEvents(int skipEvents)
Definition: AthMpEvtLoopMgr.cxx:598
exclude
std::set< std::string > exclude
list of directories to be excluded
Definition: hcg.cxx:95
AthMpEvtLoopMgr::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: AthMpEvtLoopMgr.h:51
AthMpEvtLoopMgr::m_tools
ToolHandleArray< IAthenaMPTool > m_tools
Definition: AthMpEvtLoopMgr.h:47
XMLtoHeader.count
count
Definition: XMLtoHeader.py:85
AthMpEvtLoopMgr::m_samplesSize
std::vector< unsigned long > m_samplesSize
Definition: AthMpEvtLoopMgr.h:60
AthMpEvtLoopMgr::m_outputReportName
std::string m_outputReportName
Definition: AthMpEvtLoopMgr.h:43
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:30
AthMpEvtLoopMgr::m_nWorkers
int m_nWorkers
Definition: AthMpEvtLoopMgr.h:41
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:56
python.SCT_ByteStreamErrorsTestAlgConfig.maxEvents
maxEvents
Definition: SCT_ByteStreamErrorsTestAlgConfig.py:43
AthMpEvtLoopMgr::m_workerTopDir
std::string m_workerTopDir
Definition: AthMpEvtLoopMgr.h:42
AthMpEvtLoopMgr::m_samplesPss
std::vector< unsigned long > m_samplesPss
Definition: AthMpEvtLoopMgr.h:59
athenaMP_MemHelper::getPss
int getPss(pid_t, unsigned long &, unsigned long &, unsigned long &, unsigned long &, bool verbose=false)
AthMpEvtLoopMgr::wait
StatusCode wait()
Definition: AthMpEvtLoopMgr.cxx:405
AthMpEvtLoopMgr::m_evtSelector
SmartIF< IService > m_evtSelector
Definition: AthMpEvtLoopMgr.h:40
python.setupRTTAlg.size
int size
Definition: setupRTTAlg.py:39
AthMpEvtLoopMgr::m_nMemSamplingInterval
int m_nMemSamplingInterval
Definition: AthMpEvtLoopMgr.h:50
python.handimod.now
now
Definition: handimod.py:675
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthMpEvtLoopMgr::m_samplesSwap
std::vector< unsigned long > m_samplesSwap
Definition: AthMpEvtLoopMgr.h:61
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthMpEvtLoopMgr::m_strategy
std::string m_strategy
Definition: AthMpEvtLoopMgr.h:44
AthenaInterprocess::randString
std::string randString()
Definition: Control/AthenaInterprocess/AthenaInterprocess/Utilities.h:13
AthMpEvtLoopMgr::extractFds
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Definition: AthMpEvtLoopMgr.cxx:533
AthenaInterprocess::FdsRegistry
std::vector< FdsRegistryEntry > FdsRegistry
Definition: FdsRegistry.h:22
WriteCalibToCool.swap
swap
Definition: WriteCalibToCool.py:94
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthMpEvtLoopMgr::m_execAtPreFork
StringArrayProperty m_execAtPreFork
Definition: AthMpEvtLoopMgr.h:53
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
DEBUG
#define DEBUG
Definition: page_access.h:11
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:21
python.CaloCondTools.log
log
Definition: CaloCondTools.py:20
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
AthMpEvtLoopMgr::m_masterPid
pid_t m_masterPid
Definition: AthMpEvtLoopMgr.h:54
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthMpEvtLoopMgr::m_scheduledStop
bool m_scheduledStop
Definition: AthMpEvtLoopMgr.h:55
AthMpEvtLoopMgr::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthMpEvtLoopMgr.h:39
AthMpEvtLoopMgr::m_collectSubprocessLogs
bool m_collectSubprocessLogs
Definition: AthMpEvtLoopMgr.h:46
ServiceHandle< StoreGateSvc >