ATLAS Offline Software
Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
SharedEvtQueueConsumer Class Referencefinalabstract

#include <SharedEvtQueueConsumer.h>

Inheritance diagram for SharedEvtQueueConsumer:
Collaboration diagram for SharedEvtQueueConsumer:

Public Member Functions

 SharedEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~SharedEvtQueueConsumer () override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkexec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkfin_func () override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func ()=0
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process. More...
 
int m_maxEvt {-1}
 Maximum number of events assigned to the job. More...
 
std::string m_subprocTopDir
 Top run directory for subprocesses. More...
 
std::string m_subprocDirPrefix
 For ex. "worker__". More...
 
std::string m_evtSelName
 Name of the event selector. More...
 
AthenaInterprocess::ProcessGroupm_processGroup {nullptr}
 
const AthenaInterprocess::IMPRunStopm_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistrym_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Types

typedef System::ProcessTime::TimeValueType TimeValType
 

Private Member Functions

int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
 
int reopenFd (int fd, const std::string &name)
 

Private Attributes

Gaudi::Property< bool > m_useSharedReader {this, "UseSharedReader", false, "Work in pair with a SharedReader"}
 
Gaudi::Property< bool > m_useSharedWriter {this, "UseSharedWriter", false, "Work in pair with a SharedWriter"}
 
Gaudi::Property< bool > m_isRoundRobin {this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"}
 
Gaudi::Property< bool > m_debug {this, "Debug", false}
 
Gaudi::Property< bool > m_readEventOrders {this, "ReadEventOrders", false}
 
Gaudi::Property< int > m_nEventsBeforeFork {this, "EventsBeforeFork", 0}
 
Gaudi::Property< std::string > m_eventOrdersFile {this, "EventOrdersFile", "athenamp_eventorders.txt"}
 
int m_rankId {-1}
 
int m_nSkipEvents {0}
 
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
 
SmartIF< IEventSeekm_evtSeek
 
SmartIF< IEvtSelectorSeekm_evtSelSeek
 
IEvtSelector::Context * m_evtContext {nullptr}
 
SmartIF< IEventSharem_evtShare
 
SmartIF< IDataSharem_dataShare
 
AthenaInterprocess::SharedQueuem_sharedEventQueue {nullptr}
 
std::unique_ptr< AthenaInterprocess::SharedQueuem_sharedRankQueue
 
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
 
std::queue< pid_tm_finQueue
 
std::vector< int > m_eventOrders
 
pid_t m_masterPid
 

Detailed Description

Definition at line 23 of file SharedEvtQueueConsumer.h.

Member Typedef Documentation

◆ TimeValType

typedef System::ProcessTime::TimeValueType SharedEvtQueueConsumer::TimeValType
private

Definition at line 76 of file SharedEvtQueueConsumer.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protectedinherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protectedinherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

67  {
69  , FUNC_EXEC
70  , FUNC_FIN
71  };

Constructor & Destructor Documentation

◆ SharedEvtQueueConsumer()

SharedEvtQueueConsumer::SharedEvtQueueConsumer ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 34 of file SharedEvtQueueConsumer.cxx.

38  , m_chronoStatSvc("ChronoStatSvc", name)
39  , m_masterPid(getpid())
40 {
41  m_subprocDirPrefix = "worker_";
42 }

◆ ~SharedEvtQueueConsumer()

SharedEvtQueueConsumer::~SharedEvtQueueConsumer ( )
overridevirtual

Definition at line 44 of file SharedEvtQueueConsumer.cxx.

45 {
46 }

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/5]

virtual StatusCode exec SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( )
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [2/5]

int decodeProcessResult SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess::ProcessResult presult,
bool  doFinalize 
)
private

◆ ATLAS_NOT_THREAD_SAFE() [3/5]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protectedinherited

◆ ATLAS_NOT_THREAD_SAFE() [4/5]

virtual int makePool SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [5/5]

virtual StatusCode wait_once SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( pid_t pid)
overridevirtual

Reimplemented from AthenaMPToolBase.

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::bootstrap_func ( )
overridevirtual

Definition at line 254 of file SharedEvtQueueConsumer.cxx.

255 {
256  if(m_debug) waitForSignal();
257 
258  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
259  outwork->data = CxxUtils::xmalloc(sizeof(int));
260  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
261  outwork->size = sizeof(int);
262 
263  // For PileUp Digi Fork-After-N-Events >>>>
264  // Retrieve cuEvent-s for all background event selectors, if we forked after N events
265  std::map<IService*,int> bkgEvtSelectors;
266 
267  if(m_isPileup) {
268  for(IService* ptrSvc : serviceLocator()->getServices()) {
269  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
270  if(evtsel && (evtsel != m_evtSelector)) {
271  if(m_nEventsBeforeFork>0) {
272  IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
273  if(evtselseek) {
274  bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
275  }
276  else {
277  ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
278  return outwork;
279  }
280  }
281  else {
282  bkgEvtSelectors.emplace(ptrSvc,0);
283  }
284  }
285  }
286  }
287  // <<<< For PileUp Digi Fork-After-N-Events
288 
289  // ...
290  // (possible) TODO: extend outwork with some error message, which will be eventually
291  // reported in the master proces
292  // ...
293 
294  // ________________________ Get IncidentSvc and fire PostFork ________________________
295  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
296  if(!p_incidentSvc) {
297  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
298  return outwork;
299  }
300  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
301 
302 
303  // ________________________ Get RankID ________________________
304  //
305  if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
306  ATH_MSG_ERROR("Unable to get rank ID!");
307  return outwork;
308  }
309  std::ostringstream workindex;
310  workindex<<m_rankId;
311 
312  // ________________________ Worker dir: mkdir ________________________
313  std::filesystem::path worker_rundir(m_subprocTopDir);
314  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
315  // TODO: this "worker_" can be made configurable too
316 
317  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
318  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
319  return outwork;
320  }
321 
322  // __________ Redirect logs unless we want to attach debugger ____________
323  if(!m_debug) {
324  if(redirectLog(worker_rundir.string()))
325  return outwork;
326 
327  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
328  }
329 
330  // ________________________ Update Io Registry ____________________________
331  if(updateIoReg(worker_rundir.string()))
332  return outwork;
333 
334  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
335 
336  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
337  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
338  if(std::filesystem::is_regular_file("SimParams.db"))
339  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
340  if(std::filesystem::is_regular_file("DigitParams.db"))
341  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
342  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
343  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
344 
345  // _______________________ Handle saved PFC (if any) ______________________
346  if(handleSavedPfc(abs_worker_rundir))
347  return outwork;
348 
349  // ________________________ reopen descriptors ____________________________
350  if(reopenFds())
351  return outwork;
352 
353  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
354 
355 
356  // ________________________ Make Shared Reader/Writer Client ________________________
358  ATH_CHECK( m_evtShare->makeClient(m_rankId), outwork);
359  }
360 
362  SmartIF<IProperty> propertyServer(m_dataShare);
363  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
364  ATH_MSG_ERROR("Could not change AthenaPoolSharedIOCnvSvc MakeClient Property");
365  return outwork;
366  }
367  else {
368  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
369  }
370  }
371 
372  // ________________________ I/O reinit ________________________
373  ATH_CHECK( m_ioMgr->io_reinitialize(), outwork );
374 
375  // _______________ Get the value of SkipEvent ________________________
376  if(m_evtSelector) {
377  SmartIF<IProperty> propertyServer(m_evtSelector);
378  ATH_CHECK( propertyServer.isValid(), outwork);
379 
380  IntegerProperty skipEventsProp("SkipEvents", m_nSkipEvents);
381  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
382  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
383  }
384  else {
385  m_nSkipEvents = skipEventsProp.value();
386  }
387 
388  // ________________________ Event selector restart ________________________
389  SmartIF<IService> evtSelSvc(m_evtSelector);
390  ATH_CHECK( evtSelSvc.isValid(), outwork);
391  ATH_CHECK( evtSelSvc->start(), outwork);
392  }
393  // For PileUp jobs >>>>
394  // Main event selector: advance it if we either forked after N events, or skipEvents!=0
395  // Background event selectors: restart, and advance if we forked after N events
396  if(m_isPileup) {
397  // Deal with the main event selector first
398  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
399  if(!m_evtSelSeek) {
400  ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
401  return outwork;
402  }
403 
406  ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
407  return outwork;
408  }
409 
410  // Deal with background event selectors
411  for(auto [evtsel,curEvt] : bkgEvtSelectors) {
412  if(evtsel->start().isSuccess()) {
413  if (m_nEventsBeforeFork>0) {
414  SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
415  if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
416  ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
417  return outwork;
418  }
419  }
420  }
421  else {
422  ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
423  return outwork;
424  }
425  }
426  }
427  // <<<< For PileUp jobs
428 
429  // _______________________ Event orders for debugging ________________________________
430  if(m_readEventOrders) {
431  std::fstream fs(m_eventOrdersFile,std::fstream::in);
432  if(fs.good()) {
433  ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
434  while(fs.good()){
435  std::string line;
436  std::getline(fs,line);
437  if(line.empty())continue;
438 
439  // Parse the string
440  size_t idx(0);
441  int rank = std::stoi(line,&idx);
442  if(rank==m_rankId) {
443  msg(MSG::INFO) << "This worker will proces the following events #";
444  while(idx<line.size()-1) {
445  line = line.substr(idx+1);
446  int evtnum = std::stoi(line,&idx);
447  m_eventOrders.push_back(evtnum);
448  msg(MSG::INFO) << " " << evtnum;
449  }
450  msg(MSG::INFO) << endmsg;
451  }
452  }
453  if(m_eventOrders.empty()) {
454  ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
455  return outwork;
456  }
457  fs.close();
458  }
459  else {
460  ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
461  return outwork;
462  }
463  }
464 
465  // ________________________ Worker dir: chdir ________________________
466  if(chdir(worker_rundir.string().c_str())==-1) {
467  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
468  return outwork;
469  }
470 
471  // ___________________ Fire UpdateAfterFork incident _________________
472  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
473 
474  // Declare success and return
475  *(int*)(outwork->data) = 0;
476  return outwork;
477 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> virtual operator () ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtualinherited

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inlineprotectedinherited

Definition at line 83 of file AthenaMPToolBase.h.

83 { return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::exec_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 479 of file SharedEvtQueueConsumer.cxx.

480 {
481  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
482 
483  bool all_ok(true);
484 
485  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
486  int nEvt(m_nEventsBeforeFork);
487  int nEventsProcessed(0);
488  long evtnumAndChunk(0);
489 
490  unsigned evtCounter(0);
491  int evtnum(0), chunkSize(1);
492  auto predefinedEvt = m_eventOrders.cbegin();
493 
494  // If the event orders file already exists in worker's run directory, then it's an unexpected error!
495  std::filesystem::path ordersFile(m_eventOrdersFile.value());
496  if(std::filesystem::exists(ordersFile)) {
497  ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
498  all_ok = false;
499  }
500 
501  System::ProcessTime time_start = System::getProcessTime();
502  if(all_ok) {
503  std::fstream fs(m_eventOrdersFile,std::fstream::out);
504  fs << m_rankId;
505  bool firstOrder(true);
506  while(true) {
507  if(m_isRoundRobin) {
508  evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
509  if(m_maxEvt!=-1 && evtnum>=m_maxEvt+m_nSkipEvents) {
510  break;
511  }
512  evtCounter++;
513  }
514  else {
515  if(m_readEventOrders) {
516  if(predefinedEvt==m_eventOrders.cend()) break;
517  evtnum = *predefinedEvt;
518  predefinedEvt++;
519  fs << (firstOrder?":":",") << evtnum;
520  fs.flush();
521  firstOrder=false;
522  ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
523  }
524  else {
525  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
526  // The event queue is empty, but we should check whether there are more events to come or not
527  ATH_MSG_DEBUG("Event queue is empty");
528  usleep(1000);
529  continue;
530  }
531  if(evtnumAndChunk<=0) {
532  evtnumAndChunk *= -1;
533  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
534  break;
535  }
536  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
537  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
538  evtnum = evtnumAndChunk & intmask;
539  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
540 
541  // Save event order
542  for(int i(0);i<chunkSize;++i) {
543  fs << (firstOrder?":":",") << evtnum+i;
544  firstOrder=false;
545  }
546  fs.flush();
547  } // Get event numbers from the shared queue
548  } // Not RoundRobin
549  nEvt+=chunkSize;
550  StatusCode sc;
551  if(m_useSharedReader) {
552  sc = m_evtShare->share(evtnum);
553  if(sc.isFailure()){
554  ATH_MSG_ERROR("Unable to share " << evtnum);
555  all_ok=false;
556  break;
557  }
558  else {
559  ATH_MSG_INFO("Share of " << evtnum << " succeeded");
560  }
561  }
562  else if(m_evtSelector) {
563  m_chronoStatSvc->chronoStart("AthenaMP_seek");
564  if (m_evtSeek) {
565  sc=m_evtSeek->seek(evtnum);
566  }
567  else {
568  sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
569  }
570  if(sc.isFailure()){
571  ATH_MSG_ERROR("Unable to seek to " << evtnum);
572  all_ok=false;
573  break;
574  }
575  else {
576  ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
577  }
578  m_chronoStatSvc->chronoStop("AthenaMP_seek");
579  }
580  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
581  sc = m_evtProcessor->nextEvent(nEvt);
582  nEventsProcessed += chunkSize;
583  if(sc.isFailure()){
584  if(chunkSize==1) {
585  ATH_MSG_ERROR("Unable to process event " << evtnum);
586  }
587  else {
588  ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
589  }
590  all_ok=false;
591  break;
592  }
593  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
594  if(m_mpRunStop->stopScheduled()) {
595  ATH_MSG_INFO("Scheduled stop");
596  break;
597  }
598  }
599  fs.close();
600  }
601  System::ProcessTime time_delta = System::getProcessTime() - time_start;
602  TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
603 
604  if(all_ok) {
605  if(m_evtProcessor->executeRun(0).isFailure()) {
606  ATH_MSG_ERROR("Could not finalize the Run");
607  all_ok=false;
608  }
609  else if(!m_useSharedReader && m_evtSelector) {
610  StatusCode sc;
611  if (m_evtSeek) {
612  sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
613  }
614  else {
615  sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
616  }
617  if(sc.isFailure()) {
618  ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
619  }
620  }
621  }
622 
623  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
624 
625  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
626  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
627  void* outdata = CxxUtils::xmalloc(outsize);
628  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
630  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
631  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
632  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
633  outwork->data = outdata;
634  outwork->size = outsize;
635  // ...
636  // (possible) TODO: extend outwork with some error message, which will be eventually
637  // reported in the master proces
638  // ...
639  return outwork;
640 }

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::fin_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 642 of file SharedEvtQueueConsumer.cxx.

643 {
644  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
645 
646  bool all_ok(true);
647 
648  if(m_appMgr->stop().isFailure()) {
649  ATH_MSG_ERROR("Unable to stop AppMgr");
650  all_ok=false;
651  }
652  else {
653  if(m_appMgr->finalize().isFailure()) {
654  std::cerr << "Unable to finalize AppMgr" << std::endl;
655  all_ok=false;
656  }
657  }
658 
659  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
660 
661  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
662  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
663  void* outdata = CxxUtils::xmalloc(outsize);
664  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
666  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
667  int nEvt = -1;
668  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
669  TimeValType elapsed = -1;
670  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
671 
672  outwork->data = outdata;
673  outwork->size = outsize;
674 
675  return outwork;
676 }

◆ finalize()

StatusCode SharedEvtQueueConsumer::finalize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 96 of file SharedEvtQueueConsumer.cxx.

97 {
98  if(getpid()==m_masterPid) {
99  ATH_MSG_INFO("finalize() in the master process");
100  // Merge saved event orders into one in the master run directory
101 
102  // 1. Check if master run directory already contains a file with saved orders
103  // If so, then rename it with random suffix
104  std::filesystem::path ordersFile(m_eventOrdersFile.value());
105  if(std::filesystem::exists(ordersFile)) {
106  srand((unsigned)time(0));
107  std::ostringstream randname;
108  randname << rand();
109  std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
110  ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
111  ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
112 
113  std::filesystem::path ordersFileBakpath(ordersFileBak);
114  std::filesystem::rename(ordersFile,ordersFileBakpath);
115  }
116 
117  // 2. Merge workers event orders into the master file
118  std::fstream fs(m_eventOrdersFile,std::fstream::out);
119  for(int i=0; i<m_nprocs; ++i) {
120  std::ostringstream workerIndex;
121  workerIndex << i;
122  std::filesystem::path worker_rundir(m_subprocTopDir);
123  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
124  std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
125  ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
126  std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
127  std::string line;
128  while(fs_worker.good()) {
129  std::getline(fs_worker,line);
130  fs << line << std::endl;
131  }
132  fs_worker.close();
133  }
134  fs.close();
135  } // if(getpid()==m_masterPid)
136 
137  if (m_evtContext) {
138  ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
139  m_evtContext = nullptr;
140  }
141 
142  return StatusCode::SUCCESS;
143 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protectedinherited

Definition at line 333 of file AthenaMPToolBase.cxx.

334 {
335  char buf[256];
336  strerror_r(errnum, buf, sizeof(buf));
337  return std::string(buf);
338 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr AthenaMPToolBase::generateOutputReport ( )
overridevirtualinherited

Reimplemented in EvtRangeProcessor, EvtRangeScatterer, and SharedWriterTool.

Definition at line 119 of file AthenaMPToolBase.cxx.

120 {
122 
123  if(m_fileMgrLog.empty()) {
124  ATH_MSG_WARNING(name() << " cannot make output report because FileMgr has not been configured to write log file!");
125  }
126  else {
127  // Collect output files made by the workers
128  std::string line;
129  for(int i=0;i<m_nprocs;++i) {
130  // Get the name of FileMgr log
131  std::ostringstream workindex;
132  workindex<<i;
134  logFilePath /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
135  std::filesystem::path logFile(logFilePath);
137  if(!(std::filesystem::exists(logFile)&&std::filesystem::is_regular_file(logFile))) {
138  ATH_MSG_WARNING(logFile.string() << " either does not exist or is not a regular file. Skipping");
139  continue;
140  }
141 
142  ATH_MSG_DEBUG("FileMgr log file (" << i << ") " << logFile);
143 
144  std::ifstream inpStream(logFile.string().c_str());
145  std::set<std::string> reportedFiles; // Don't report the same file twice
146  while(!inpStream.eof()) {
147  std::getline(inpStream,line);
148  if(line.find("WRITE")!=std::string::npos) {
149  // Parse the entry
150  size_t startpos(0);
151  std::vector<std::string> entries;
152  while(startpos<line.size()) {
153  while(line[startpos]==' ')
154  startpos++;
155 
156  size_t endpos = line.find(' ',startpos);
157  if(endpos==std::string::npos) endpos = line.size();
158  entries.push_back(line.substr(startpos,endpos-startpos));
159  startpos=endpos+1;
160  }
161 
162  // enties[0] is filename
163  std::filesystem::path filenamePath(entries[0]);
164  std::filesystem::path basename = filenamePath.filename();
165  if(reportedFiles.find(basename.string())==reportedFiles.end())
166  reportedFiles.insert(basename.string());
167  else
168  continue;
169  std::filesystem::path absolutename = basename.is_absolute() ? basename : std::filesystem::absolute(std::filesystem::path(logFilePath)/=basename);
170  AthenaMP::AllWorkerOutputsIterator it1 = jobOutputs->find(basename.string());
171  if(it1==jobOutputs->end()) {
172  (*jobOutputs)[basename.string()] = AthenaMP::SingleWorkerOutputs();
173  (*jobOutputs)[basename.string()].reserve(m_nprocs);
174  }
175 
176  AthenaMP::WorkerOutput newOutput;
177  newOutput.filename = absolutename.string();
178  newOutput.technology = entries[1];
179  newOutput.description = entries[2];
180  newOutput.access_mode = entries[3];
181  newOutput.shared = (line.find("SHARED")!=std::string::npos);
182 
183  (*jobOutputs)[basename.string()].push_back(newOutput);
184  }
185  }
186  }
187  }
188  return jobOutputs;
189 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protectedinherited

Definition at line 396 of file AthenaMPToolBase.cxx.

397 {
398  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400  return 0;
401 }

◆ initialize()

StatusCode SharedEvtQueueConsumer::initialize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 48 of file SharedEvtQueueConsumer.cxx.

49 {
50  ATH_MSG_DEBUG("In initialize");
51 
53 
54  // For pile-up jobs use event loop manager for seeking
55  // otherwise use event selector
56  if(m_isPileup) {
57  m_evtSeek = SmartIF<IEventSeek>(m_evtProcessor.get());
58  if(!m_evtSeek) {
59  ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
60  return StatusCode::FAILURE;
61  }
62  }
63  else if(m_evtSelector) {
64  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
65  ATH_CHECK(m_evtSelSeek.isValid());
66  }
67 
68  if(m_evtSelector) {
69  ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
70 
71  m_evtShare = serviceLocator()->service(m_evtSelName);
72  if(!m_evtShare) {
73  if(m_useSharedReader) {
74  ATH_MSG_ERROR("Error retrieving IEventShare");
75  return StatusCode::FAILURE;
76  }
77  ATH_MSG_INFO("Could not retrieve IEventShare");
78  }
79 
80  //FIXME: AthenaPool dependent for now
81 
82  if(m_useSharedWriter) {
83  m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolSharedIOCnvSvc"));
84  if(!m_dataShare) {
85  ATH_MSG_ERROR("Error retrieving AthenaPoolSharedIOCnvSvc");
86  return StatusCode::FAILURE;
87  }
88  }
89  }
90 
91  ATH_CHECK(m_chronoStatSvc.retrieve());
92 
93  return StatusCode::SUCCESS;
94 }

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
overridevirtualinherited

Definition at line 201 of file AthenaMPToolBase.cxx.

202 {
204  kill(child.getProcessID(),SIGKILL);
205  }
206 }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator ( ) const &
pure virtualinherited

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protectedinherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270 {
271  // Redirect both stdout and stderr to the same file AthenaMP.log
272  int dup2result1(0), dup2result2(0);
273 
274  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275  if(newout==-1) {
276  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277  return -1;
278  }
279  dup2result1 = dup2(newout, STDOUT_FILENO);
280  dup2result2 = dup2(newout, STDERR_FILENO);
281  TEMP_FAILURE_RETRY(close(newout));
282  if(dup2result1==-1) {
283  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284  return -1;
285  }
286  if(dup2result2==-1) {
287  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288  return -1;
289  }
290 
291  if(addTimeStamp) {
292  SmartIF<IProperty> propertyServer(msgSvc());
293  if(propertyServer==0) {
294  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295  return -1;
296  }
297 
298  std::string propertyName("Format");
299  std::string oldFormat("");
300  StringProperty formatProp(propertyName,oldFormat);
301  StatusCode sc = propertyServer->getProperty(&formatProp);
302  if(sc.isFailure()) {
303  ATH_MSG_WARNING("Message Service does not have Format property");
304  }
305  else {
306  oldFormat = formatProp.value();
307  if(oldFormat.find("%t")==std::string::npos) {
308  // Add time stamps
309  std::string newFormat("%t " + oldFormat);
310  StringProperty newFormatProp(std::move(propertyName),newFormat);
311  ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312  }
313  else {
314  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315  }
316  }
317  }
318 
319  return 0;
320 }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
privateinherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420 {
421  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422  int old_openflags = fcntl(fd,F_GETFL,0);
423  switch(old_openflags & O_ACCMODE) {
424  case O_RDONLY: {
425  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426  break;
427  }
428  case O_WRONLY: {
429  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430  break;
431  }
432  case O_RDWR: {
433  ATH_MSG_DEBUG("The File Access Mode is RDWR");
434  break;
435  }
436  }
437 
438  int old_descflags = fcntl(fd,F_GETFD,0);
439  off_t oldpos = lseek(fd,0,SEEK_CUR);
440  if(oldpos==-1) {
441  if(errno==ESPIPE) {
442  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443  }
444  else {
445  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446  return -1;
447  }
448  }
449  else {
450  Io::Fd newfd = open(name.c_str(),old_openflags);
451  if(newfd==-1) {
452  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453  return -1;
454  }
455  if(lseek(newfd,oldpos,SEEK_SET)==-1){
456  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457  TEMP_FAILURE_RETRY(close(newfd));
458  return -1;
459  }
460  TEMP_FAILURE_RETRY(close(fd));
461  if(dup2(newfd,fd)==-1) {
462  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463  TEMP_FAILURE_RETRY(close(newfd));
464  return -1;
465  }
466  if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468  TEMP_FAILURE_RETRY(close(newfd));
469  return -1;
470  }
471  TEMP_FAILURE_RETRY(close(newfd));
472  }
473  return 0;
474 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341 {
342  // Reopen file descriptors.
343  // First go over all open files, which have been registered with the FileMgr
344  // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345  std::set<int> fdLog;
346 
347  // Query the FileMgr contents
348  std::vector<const Io::FileAttr*> filemgrFiles;
349  std::vector<const Io::FileAttr*>::const_iterator itFile;
350  unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351  if(filenum!=filemgrFiles.size())
352  ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353 
354  for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355  ATH_MSG_DEBUG("* " << **itFile);
356  const std::string& filename = (**itFile).name();
357  Io::Fd fd = (**itFile).fd();
358 
359  if(fd==-1) {
360  // It is legal to have fd=-1 for remote inputs
361  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362  // So, this hopefully is a temporary patch
363  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364  continue;
365  }
366 
367  if(reopenFd(fd,filename))
368  return -1;
369 
370  fdLog.insert(fd);
371  }
372 
373  // Check the FdsRegistry
374  for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375  if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376  ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377  }
378  else {
379  ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380 
381  if(regEntry.fd==-1) {
382  // Same protection as the one above
383  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384  continue;
385  }
386 
387  if(reopenFd(regEntry.fd,regEntry.name))
388  return -1;
389 
390  fdLog.insert(regEntry.fd);
391  }
392  }
393  return 0;
394 }

◆ reportSubprocessStatuses()

void SharedEvtQueueConsumer::reportSubprocessStatuses ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 223 of file SharedEvtQueueConsumer.cxx.

224 {
225  ATH_MSG_INFO("Statuses of event processors");
226  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
227  for(size_t i=0; i<statuses.size(); ++i) {
228  // Get the number of events processed by this worker
229  auto it = m_eventStat.find(statuses[i].pid);
230  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
231  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
232  << ". Number of events processed: ";
233  if(it==m_eventStat.end())
234  msg(MSG::INFO) << "N/A" << endmsg;
235  else
236  msg(MSG::INFO) << it->second.first
237  << ", Event Loop Time: " << it->second.second << "sec."
238  << endmsg;
239  }
240 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Definition at line 44 of file AthenaMPToolBase.h.

44 {m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop runStop)
inlineoverridevirtualinherited

Definition at line 45 of file AthenaMPToolBase.h.

45 {m_mpRunStop=runStop;}

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197 {
198  m_randStr = randStr;
199 }

◆ subProcessLogs()

void SharedEvtQueueConsumer::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Definition at line 242 of file SharedEvtQueueConsumer.cxx.

243 {
244  filenames.clear();
245  for(int i=0; i<m_nprocs; ++i) {
246  std::ostringstream workerIndex;
247  workerIndex << i;
248  std::filesystem::path worker_rundir(m_subprocTopDir);
249  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
250  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
251  }
252 }

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323 {
324  ATH_CHECK(m_ioMgr.retrieve(), -1);
325 
326  // update the IoRegistry for the new workdir - make sure we use absolute path
328  ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329 
330  return 0;
331 }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry registry)
overridevirtualinherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192 {
193  m_fdsRegistry = std::move(registry);
194 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404 {
405  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406  sigset_t mask, oldmask;
407 
409 
410  sigemptyset (&mask);
411  sigaddset (&mask, SIGUSR1);
412 
413  sigprocmask (SIG_BLOCK, &mask, &oldmask);
415  sigsuspend (&oldmask);
416  sigprocmask (SIG_UNBLOCK, &mask, NULL);
417 }

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_chronoStatSvc

ServiceHandle<IChronoStatSvc> SharedEvtQueueConsumer::m_chronoStatSvc
private

Definition at line 66 of file SharedEvtQueueConsumer.h.

◆ m_dataShare

SmartIF<IDataShare> SharedEvtQueueConsumer::m_dataShare
private

Definition at line 71 of file SharedEvtQueueConsumer.h.

◆ m_debug

Gaudi::Property<bool> SharedEvtQueueConsumer::m_debug {this, "Debug", false}
private

Definition at line 58 of file SharedEvtQueueConsumer.h.

◆ m_eventOrders

std::vector<int> SharedEvtQueueConsumer::m_eventOrders
private

Definition at line 81 of file SharedEvtQueueConsumer.h.

◆ m_eventOrdersFile

Gaudi::Property<std::string> SharedEvtQueueConsumer::m_eventOrdersFile {this, "EventOrdersFile", "athenamp_eventorders.txt"}
private

Definition at line 61 of file SharedEvtQueueConsumer.h.

◆ m_eventStat

std::map<pid_t,std::pair<int,TimeValType> > SharedEvtQueueConsumer::m_eventStat
private

Definition at line 77 of file SharedEvtQueueConsumer.h.

◆ m_evtContext

IEvtSelector::Context* SharedEvtQueueConsumer::m_evtContext {nullptr}
private

Definition at line 69 of file SharedEvtQueueConsumer.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSeek

SmartIF<IEventSeek> SharedEvtQueueConsumer::m_evtSeek
private

Definition at line 67 of file SharedEvtQueueConsumer.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_evtSelSeek

SmartIF<IEvtSelectorSeek> SharedEvtQueueConsumer::m_evtSelSeek
private

Definition at line 68 of file SharedEvtQueueConsumer.h.

◆ m_evtShare

SmartIF<IEventShare> SharedEvtQueueConsumer::m_evtShare
private

Definition at line 70 of file SharedEvtQueueConsumer.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_finQueue

std::queue<pid_t> SharedEvtQueueConsumer::m_finQueue
private

Definition at line 78 of file SharedEvtQueueConsumer.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 103 of file AthenaMPToolBase.h.

◆ m_isRoundRobin

Gaudi::Property<bool> SharedEvtQueueConsumer::m_isRoundRobin {this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"}
private

Definition at line 57 of file SharedEvtQueueConsumer.h.

◆ m_masterPid

pid_t SharedEvtQueueConsumer::m_masterPid
private

Definition at line 82 of file SharedEvtQueueConsumer.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protectedinherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_nEventsBeforeFork

Gaudi::Property<int> SharedEvtQueueConsumer::m_nEventsBeforeFork {this, "EventsBeforeFork", 0}
private

Definition at line 60 of file SharedEvtQueueConsumer.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protectedinherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

◆ m_nSkipEvents

int SharedEvtQueueConsumer::m_nSkipEvents {0}
private

Definition at line 64 of file SharedEvtQueueConsumer.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protectedinherited

Definition at line 91 of file AthenaMPToolBase.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_rankId

int SharedEvtQueueConsumer::m_rankId {-1}
private

Definition at line 63 of file SharedEvtQueueConsumer.h.

◆ m_readEventOrders

Gaudi::Property<bool> SharedEvtQueueConsumer::m_readEventOrders {this, "ReadEventOrders", false}
private

Definition at line 59 of file SharedEvtQueueConsumer.h.

◆ m_sharedEventQueue

AthenaInterprocess::SharedQueue* SharedEvtQueueConsumer::m_sharedEventQueue {nullptr}
private

Definition at line 73 of file SharedEvtQueueConsumer.h.

◆ m_sharedRankQueue

std::unique_ptr<AthenaInterprocess::SharedQueue> SharedEvtQueueConsumer::m_sharedRankQueue
private

Definition at line 74 of file SharedEvtQueueConsumer.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

For ex. "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.

◆ m_useSharedReader

Gaudi::Property<bool> SharedEvtQueueConsumer::m_useSharedReader {this, "UseSharedReader", false, "Work in pair with a SharedReader"}
private

Definition at line 55 of file SharedEvtQueueConsumer.h.

◆ m_useSharedWriter

Gaudi::Property<bool> SharedEvtQueueConsumer::m_useSharedWriter {this, "UseSharedWriter", false, "Work in pair with a SharedWriter"}
private

Definition at line 56 of file SharedEvtQueueConsumer.h.


The documentation for this class was generated from the following files:
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:64
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:158
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:115
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
SharedEvtQueueConsumer::m_isRoundRobin
Gaudi::Property< bool > m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:57
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:403
SharedEvtQueueConsumer::m_useSharedReader
Gaudi::Property< bool > m_useSharedReader
Definition: SharedEvtQueueConsumer.h:55
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:64
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
AthenaMP::WorkerOutput::description
std::string description
Definition: IAthenaMPTool.h:24
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
SharedEvtQueueConsumer::m_readEventOrders
Gaudi::Property< bool > m_readEventOrders
Definition: SharedEvtQueueConsumer.h:59
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
SharedEvtQueueConsumer::m_evtSeek
SmartIF< IEventSeek > m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
SharedEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:70
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Name of the event selector.
Definition: AthenaMPToolBase.h:89
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:61
dq_defect_bulk_create_defects.line
line
Definition: dq_defect_bulk_create_defects.py:27
AthenaMPToolBase::m_fileMgrLog
std::string m_fileMgrLog
Definition: AthenaMPToolBase.h:99
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:459
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:30
AthenaMP::SingleWorkerOutputs
std::vector< WorkerOutput > SingleWorkerOutputs
Definition: IAthenaMPTool.h:28
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:63
sigset_t
int sigset_t
Definition: SealSignal.h:80
IEvtSelectorSeek::seek
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
IEvtSelectorSeek::curEvent
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:103
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
SharedEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedEvtQueueConsumer.h:71
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
DetDescrDictionaryDict::it1
std::vector< HWIdentifier >::iterator it1
Definition: DetDescrDictionaryDict.h:17
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:67
SharedEvtQueueConsumer::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:419
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
AthenaMP::WorkerOutput::shared
bool shared
Definition: IAthenaMPTool.h:26
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:81
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:28
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:59
SharedEvtQueueConsumer::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMP::WorkerOutput::access_mode
std::string access_mode
Definition: IAthenaMPTool.h:25
SharedEvtQueueConsumer::m_eventOrdersFile
Gaudi::Property< std::string > m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:61
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:92
SharedEvtQueueConsumer::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueConsumer.h:70
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
AthenaInterprocess::IMPRunStop::stopScheduled
virtual bool stopScheduled() const =0
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:62
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
AthenaInterprocess::Process
Definition: Process.h:17
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
python.plotting.G4DebuggerUtils.rename
def rename(label)
Definition: G4DebuggerUtils.py:11
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:82
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Maximum number of events assigned to the job.
Definition: AthenaMPToolBase.h:86
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.Constants.INFO
int INFO
Definition: Control/AthenaCommon/python/Constants.py:15
AthenaMP::WorkerOutput::technology
std::string technology
Definition: IAthenaMPTool.h:23
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:21
Herwig7_QED_EvtGen_ll.fs
dictionary fs
Definition: Herwig7_QED_EvtGen_ll.py:17
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:23
entries
double entries
Definition: listroot.cxx:49
SharedEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:56
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:60
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
jetMakeRefSamples.logFile
string logFile
Definition: jetMakeRefSamples.py:56
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:96
SharedEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedEvtQueueConsumer.h:58
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:63
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:100
AthenaMP::WorkerOutput::filename
std::string filename
Definition: IAthenaMPTool.h:22
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:27
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:94
beamspotman.basename
basename
Definition: beamspotman.py:638