ATLAS Offline Software
Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
EvtRangeProcessor Class Referencefinalabstract

#include <EvtRangeProcessor.h>

Inheritance diagram for EvtRangeProcessor:
Collaboration diagram for EvtRangeProcessor:

Public Member Functions

 EvtRangeProcessor (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~EvtRangeProcessor () override
 
virtual StatusCode initialize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkexec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWorkfin_func () override
 
virtual StatusCode finalize () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > virtual operator() ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess std::unique_ptr< AthenaInterprocess::ScheduledWorkbootstrap_func ()=0
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 

Protected Attributes

int m_nprocs {-1}
 Number of workers spawned by the master process. More...
 
int m_maxEvt {-1}
 Maximum number of events assigned to the job. More...
 
std::string m_subprocTopDir
 Top run directory for subprocesses. More...
 
std::string m_subprocDirPrefix
 For ex. "worker__". More...
 
std::string m_evtSelName
 Name of the event selector. More...
 
AthenaInterprocess::ProcessGroupm_processGroup {nullptr}
 
const AthenaInterprocess::IMPRunStopm_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistrym_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Types

enum  ProcessState { PROC_STATE_INIT, PROC_STATE_EXEC, PROC_STATE_FIN, PROC_STATE_STOP }
 

Private Member Functions

StatusCode startProcess ATLAS_NOT_THREAD_SAFE ()
 
StatusCode setNewInputFile (const std::string &newFile)
 
void reportError (yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
 
int reopenFd (int fd, const std::string &name)
 

Private Attributes

Gaudi::Property< int > m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
 
Gaudi::Property< std::string > m_channel2Scatterer {this, "Channel2Scatterer", {}}
 
Gaudi::Property< std::string > m_channel2EvtSel {this, "Channel2EvtSel", {}}
 
Gaudi::Property< bool > m_debug {this, "Debug", false}
 
int m_rankId {-1}
 Each worker has its own unique RankID from the range (0,...,m_nprocs-1) More...
 
int m_activeWorkers {0}
 Keep track of the number of workers. More...
 
std::string m_inpFile
 Cached name of the input file. To avoid reopening. More...
 
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
 
ServiceHandle< IIncidentSvc > m_incidentSvc
 
SmartIF< IEvtSelectorSeekm_evtSeek
 
std::unique_ptr< AthenaInterprocess::SharedQueuem_sharedRankQueue
 
AthenaInterprocess::SharedQueuem_sharedFailedPidQueue {nullptr}
 
std::map< pid_t, int > m_nProcessedEvents
 
std::deque< pid_tm_finQueue
 
std::map< pid_t, ProcessStatem_procStates
 

Detailed Description

Definition at line 25 of file EvtRangeProcessor.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protectedinherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 58 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protectedinherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 67 of file AthenaMPToolBase.h.

67  {
69  , FUNC_EXEC
70  , FUNC_FIN
71  };

◆ ProcessState

Enumerator
PROC_STATE_INIT 
PROC_STATE_EXEC 
PROC_STATE_FIN 
PROC_STATE_STOP 

Definition at line 56 of file EvtRangeProcessor.h.

56  {
61  };

Constructor & Destructor Documentation

◆ EvtRangeProcessor()

EvtRangeProcessor::EvtRangeProcessor ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 34 of file EvtRangeProcessor.cxx.

38  , m_chronoStatSvc("ChronoStatSvc", name)
39  , m_incidentSvc("IncidentSvc", name)
40 {
41  m_subprocDirPrefix = "worker_";
42 }

◆ ~EvtRangeProcessor()

EvtRangeProcessor::~EvtRangeProcessor ( )
overridevirtual

Definition at line 44 of file EvtRangeProcessor.cxx.

45 {
46 }

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/5]

StatusCode startProcess EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( )
private

◆ ATLAS_NOT_THREAD_SAFE() [2/5]

virtual StatusCode exec EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( )
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [3/5]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protectedinherited

◆ ATLAS_NOT_THREAD_SAFE() [4/5]

virtual int makePool EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
overridevirtual

◆ ATLAS_NOT_THREAD_SAFE() [5/5]

virtual StatusCode wait_once EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE ( pid_t pid)
overridevirtual

Reimplemented from AthenaMPToolBase.

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeProcessor::bootstrap_func ( )
overridevirtual

Definition at line 368 of file EvtRangeProcessor.cxx.

369 {
370  if(m_debug) waitForSignal();
371 
372  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
373  outwork->data = CxxUtils::xmalloc(sizeof(int));
374  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
375  outwork->size = sizeof(int);
376  // ...
377  // (possible) TODO: extend outwork with some error message, which will be eventually
378  // reported in the master proces
379  // ...
380 
381  // ________________________ Get RankID ________________________
382  //
383  if(!m_sharedRankQueue->receive_basic<int>(m_rankId)) {
384  ATH_MSG_ERROR("Unable to get rank ID!");
385  return outwork;
386  }
387  std::ostringstream workindex;
388  workindex<<m_rankId;
389 
390  // ________________________ Worker dir: mkdir ________________________
391  std::filesystem::path worker_rundir(m_subprocTopDir);
392  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
393  // TODO: this "worker_" can be made configurable too
394 
395  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
396  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
397  return outwork;
398  }
399 
400  // ________________________ Redirect logs ________________________
401  if(!m_debug) {
402  if(redirectLog(worker_rundir.string()))
403  return outwork;
404 
405  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
406  }
407 
408  // ________________________ Update Io Registry ____________________________
409  if(updateIoReg(worker_rundir.string()))
410  return outwork;
411 
412  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
413 
414  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
415  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
416  if(std::filesystem::is_regular_file("SimParams.db"))
417  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
418  if(std::filesystem::is_regular_file("DigitParams.db"))
419  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
420  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
421  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
422 
423  // _______________________ Handle saved PFC (if any) ______________________
424  if(handleSavedPfc(abs_worker_rundir))
425  return outwork;
426 
427  // ________________________ reopen descriptors ____________________________
428  if(reopenFds())
429  return outwork;
430 
431  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
432 
433 
434  // ________________________ I/O reinit ________________________
435  if(!m_ioMgr->io_reinitialize().isSuccess()) {
436  ATH_MSG_ERROR("Failed to reinitialize I/O");
437  return outwork;
438  } else {
439  ATH_MSG_DEBUG("Successfully reinitialized I/O");
440  }
441 
442  // ________________________ Event selector restart ________________________
443  IService* evtSelSvc = dynamic_cast<IService*>(evtSelector());
444  if(!evtSelSvc) {
445  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
446  return outwork;
447  }
448  if(!evtSelSvc->start().isSuccess()) {
449  ATH_MSG_ERROR("Failed to restart the event selector");
450  return outwork;
451  } else {
452  ATH_MSG_DEBUG("Successfully restarted the event selector");
453  }
454 
455  // ________________________ Restart background event selectors in pileup jobs ________________________
456  if(m_isPileup) {
457  const std::list<IService*>& service_list = serviceLocator()->getServices();
458  std::list<IService*>::const_iterator itSvc = service_list.begin(),
459  itSvcLast = service_list.end();
460  for(;itSvc!=itSvcLast;++itSvc) {
461  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(*itSvc);
462  if(evtsel && (evtsel != evtSelector())) {
463  if((*itSvc)->start().isSuccess())
464  ATH_MSG_DEBUG("Restarted event selector " << (*itSvc)->name());
465  else {
466  ATH_MSG_ERROR("Failed to restart event selector " << (*itSvc)->name());
467  return outwork;
468  }
469  }
470  }
471  }
472 
473  // ________________________ Worker dir: chdir ________________________
474  if(chdir(worker_rundir.string().c_str())==-1) {
475  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
476  return outwork;
477  }
478 
479  // Declare success and return
480  *(int*)(outwork->data) = 0;
481  return outwork;
482 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> virtual operator () ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtualinherited

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inlineprotectedinherited

Definition at line 83 of file AthenaMPToolBase.h.

83 { return m_evtSelector; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeProcessor::exec_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 484 of file EvtRangeProcessor.cxx.

485 {
486  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
487 
488  int nEvt(1);
489  int nEventsProcessed(0);
490 
491  std::queue<std::string> queueTokens;
492 
493  // Get the yampl connection channels
494  yampl::ISocketFactory* socketFactory = new yampl::SocketFactory();
495  std::string socket2ScattererName = m_channel2Scatterer.value() + std::string("_") + m_randStr;
496  yampl::ISocket* socket2Scatterer = socketFactory->createClientSocket(yampl::Channel(socket2ScattererName,yampl::LOCAL),yampl::MOVE_DATA);
497  ATH_MSG_INFO("Created CLIENT socket to the Scatterer: " << socket2ScattererName);
498  std::ostringstream pidstr;
499  pidstr << getpid();
500 
501  // Construct a "welcome" message to be sent to the EvtRangeScatterer
502  std::string ping = pidstr.str() + std::string(" ready for event processing");
503 
504  while(true) {
505  void* message2scatterer = CxxUtils::xmalloc(ping.size());
506  memcpy(message2scatterer,ping.data(),ping.size());
507  socket2Scatterer->send(message2scatterer,ping.size());
508  ATH_MSG_INFO("Sent a welcome message to the Scatterer");
509 
510  // Get the response - list of tokens - from the scatterer.
511  // The format of the response: | ResponseSize | RangeID, | evtEvtRange[,evtToken] |
512  char *responseBuffer(0);
513  std::string strPeerId;
514  ssize_t responseSize = socket2Scatterer->recv(responseBuffer,strPeerId);
515  // If response size is 0 then break the loop
516  if(responseSize==1) {
517  ATH_MSG_INFO("Empty range received. Terminating the loop");
518  break;
519  }
520 
521  std::string responseStr(responseBuffer,responseSize);
522  ATH_MSG_INFO("Received response from the Scatterer : " << responseStr);
523 
524  // Start timing
525  System::ProcessTime time_start = System::getProcessTime();
526 
527  size_t startpos(0);
528  size_t endpos = responseStr.find(',');
529  while(endpos!=std::string::npos) {
530  queueTokens.push(responseStr.substr(startpos,endpos-startpos));
531  startpos = endpos+1;
532  endpos = responseStr.find(',',startpos);
533  }
534  queueTokens.push(responseStr.substr(startpos));
535  // Actually the first element in the tokens queue is the RangeID. Get it
536  std::string rangeID = queueTokens.front();
537  queueTokens.pop();
538  ATH_MSG_INFO("Received RangeID=" << rangeID);
539  // Fire an incident
540  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange",rangeID));
541 
542  // Here we need to support two formats of the responseStr
543  // Format 1. RangeID,startEvent,endEvent
544  // Format 2. RangeID,fileName,startEvent,endEvent
545  //
546  // The difference between these two is that for Format 2 we first
547  // need to update InputCollections property on the Event Selector
548  // and only after that proceed with seeking
549  //
550  // The seeking part is identical for Format 1 and 2
551 
553 
554  // Determine the format
555  std::string filename("");
556  if(queueTokens.front().find("PFN:")==0) {
557  // We have Format 2
558  // Update InputCollections property of the Event Selector with the file name from Event Range
559  filename = queueTokens.front().substr(4);
560  if(setNewInputFile(filename).isFailure()) {
561  ATH_MSG_WARNING("Failed to set input file for the range: " << rangeID);
563  reportError(socket2Scatterer,rangeStatus);
564  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
565  continue;
566  }
567  queueTokens.pop();
568  }
569 
570  // Get the number of events to process
571  int startEvent = std::atoi(queueTokens.front().c_str());
572  queueTokens.pop();
573  int endEvent = std::atoi(queueTokens.front().c_str());
574  queueTokens.pop();
575  ATH_MSG_INFO("Range fields. File Name: " << (filename.empty()?"N/A":filename)
576  << ", First Event:" << startEvent
577  << ", Last Event:" << endEvent);
578 
579  // Process the events
580  IEvtSelector::Context* ctx = nullptr;
581  if (evtSelector()->createContext (ctx).isFailure()) {
582  ATH_MSG_WARNING("Failed to create IEventSelector context.");
584  }
585  else {
586  for(int i(startEvent-1); i<endEvent; ++i) {
587  StatusCode sc = m_evtSeek->seek(*ctx, i);
588  if(sc.isRecoverable()) {
589  ATH_MSG_WARNING("Event " << i << " from range: " << rangeID << " not in the input file");
591  break;
592  }
593  else if(sc.isFailure()) {
594  ATH_MSG_WARNING("Failed to seek to " << i << " in range: " << rangeID);
596  break;
597  }
598  ATH_MSG_INFO("Seek to " << i << " succeeded");
599  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
600  sc = m_evtProcessor->nextEvent(nEvt++);
601 
602  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
603  if(sc.isFailure()){
604  ATH_MSG_WARNING("Failed to process the event " << i << " in range:" << rangeID);
606  break;
607  }
608  else {
609  ATH_MSG_DEBUG("Event processed");
610  nEventsProcessed++;
611  }
612  }
613  }
614  if (evtSelector()->releaseContext (ctx).isFailure()) {
615  ATH_MSG_WARNING("Failed to release IEventSelector context.");
616  }
617 
618  // Fire dummy NextEventRange incident in order to cut the previous output and report it
619  m_incidentSvc->fireIncident(FileIncident(name(),"NextEventRange","dummy"));
620  if(rangeStatus!=AthenaMPToolBase::ESRANGE_SUCCESS) {
621  reportError(socket2Scatterer,rangeStatus);
622  continue;
623  }
624 
625  // Event range successfully processed
626  std::string strOutpFile;
627  // Get the full path of the event range output file
628  for(std::filesystem::directory_iterator fdIt(std::filesystem::current_path()); fdIt!=std::filesystem::directory_iterator(); fdIt++) {
629  if(fdIt->path().string().rfind(rangeID) == fdIt->path().string().size()-rangeID.size()) {
630  if(strOutpFile.empty()) {
631  strOutpFile = fdIt->path().string();
632  }
633  else {
634  strOutpFile += (std::string(",")+fdIt->path().string());
635  }
636  }
637  }
638 
639  // Stop timing
640  System::ProcessTime time_delta = System::getProcessTime() - time_start;
641 
642  // Prepare the output report
643  if(!strOutpFile.empty()) {
644  // We need to combine the output file name with
645  // 1. RangeID (requested by JEDI)
646  // 2. CPU time
647  // 3. Wall time
648  std::ostringstream outputReportStream;
649  outputReportStream << strOutpFile
650  << ",ID:" << rangeID
651  << ",CPU:" << time_delta.cpuTime<System::Sec>()
652  << ",WALL:" << time_delta.elapsedTime<System::Sec>();
653  std::string outputFileReport = outputReportStream.str();
654 
655  // Report the output
656  message2scatterer = CxxUtils::xmalloc(outputFileReport.size());
657  memcpy(message2scatterer,outputFileReport.data(),outputFileReport.size());
658  socket2Scatterer->send(message2scatterer,outputFileReport.size());
659  ATH_MSG_INFO("Reported the output " << outputFileReport);
660  }
661  else {
662  // This is an error: range successfully processed but no outputs were made
663  ATH_MSG_WARNING("Failed to make an output file for range: " << rangeID);
665  }
666  } // Main "event loop"
667 
668  if(m_evtProcessor->executeRun(0).isFailure()) {
669  ATH_MSG_WARNING("Could not finalize the Run");
670  }
671 
672  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
673 
674  // Return value: "ERRCODE|Func_Flag|NEvt"
675  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
676  void* outdata = CxxUtils::xmalloc(outsize);
677  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
679  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
680  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
681 
682  outwork->data = outdata;
683  outwork->size = outsize;
684  // ...
685  // (possible) TODO: extend outwork with some error message, which will be eventually
686  // reported in the master proces
687  // ...
688 
689  delete socket2Scatterer;
690  delete socketFactory;
691 
692  return outwork;
693 }

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > EvtRangeProcessor::fin_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 695 of file EvtRangeProcessor.cxx.

696 {
697  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
698 
699  if(m_appMgr->stop().isFailure()) {
700  ATH_MSG_WARNING("Unable to stop AppMgr");
701  }
702  else {
703  if(m_appMgr->finalize().isFailure()) {
704  std::cout << "Unable to finalize AppMgr" << std::endl;
705  }
706  }
707 
708  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
709 
710  // Return value: "ERRCODE|Func_Flag|NEvt" (Here NEvt=-1)
711  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag);
712  void* outdata = CxxUtils::xmalloc(outsize);
713  *(int*)(outdata) = 0; // Error code: for now use 0 success, 1 failure
715  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
716  int nEvt = -1;
717  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
718 
719  outwork->data = outdata;
720  outwork->size = outsize;
721 
722  return outwork;
723 }

◆ finalize()

StatusCode AthenaMPToolBase::finalize ( )
overridevirtualinherited

Reimplemented in SharedEvtQueueConsumer, SharedHiveEvtQueueConsumer, EvtRangeScatterer, and SharedWriterTool.

Definition at line 87 of file AthenaMPToolBase.cxx.

88 {
89  return StatusCode::SUCCESS;
90 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protectedinherited

Definition at line 333 of file AthenaMPToolBase.cxx.

334 {
335  char buf[256];
336  strerror_r(errnum, buf, sizeof(buf));
337  return std::string(buf);
338 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr EvtRangeProcessor::generateOutputReport ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 362 of file EvtRangeProcessor.cxx.

363 {
365  return jobOutputs;
366 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protectedinherited

Definition at line 396 of file AthenaMPToolBase.cxx.

397 {
398  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
399  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
400  return 0;
401 }

◆ initialize()

StatusCode EvtRangeProcessor::initialize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 48 of file EvtRangeProcessor.cxx.

49 {
50  ATH_MSG_DEBUG("In initialize");
51 
53  m_evtSeek = serviceLocator()->service(m_evtSelName);
54  ATH_CHECK(m_evtSeek.isValid());
55  ATH_CHECK(m_chronoStatSvc.retrieve());
56  ATH_CHECK(m_incidentSvc.retrieve());
57 
58  return StatusCode::SUCCESS;
59 }

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
overridevirtualinherited

Definition at line 201 of file AthenaMPToolBase.cxx.

202 {
204  kill(child.getProcessID(),SIGKILL);
205  }
206 }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator ( ) const &
pure virtualinherited

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protectedinherited

Definition at line 269 of file AthenaMPToolBase.cxx.

270 {
271  // Redirect both stdout and stderr to the same file AthenaMP.log
272  int dup2result1(0), dup2result2(0);
273 
274  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
275  if(newout==-1) {
276  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
277  return -1;
278  }
279  dup2result1 = dup2(newout, STDOUT_FILENO);
280  dup2result2 = dup2(newout, STDERR_FILENO);
281  TEMP_FAILURE_RETRY(close(newout));
282  if(dup2result1==-1) {
283  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
284  return -1;
285  }
286  if(dup2result2==-1) {
287  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
288  return -1;
289  }
290 
291  if(addTimeStamp) {
292  SmartIF<IProperty> propertyServer(msgSvc());
293  if(propertyServer==0) {
294  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
295  return -1;
296  }
297 
298  std::string propertyName("Format");
299  std::string oldFormat("");
300  StringProperty formatProp(propertyName,oldFormat);
301  StatusCode sc = propertyServer->getProperty(&formatProp);
302  if(sc.isFailure()) {
303  ATH_MSG_WARNING("Message Service does not have Format property");
304  }
305  else {
306  oldFormat = formatProp.value();
307  if(oldFormat.find("%t")==std::string::npos) {
308  // Add time stamps
309  std::string newFormat("%t " + oldFormat);
310  StringProperty newFormatProp(std::move(propertyName),newFormat);
311  ATH_CHECK(propertyServer->setProperty(newFormatProp), -1);
312  }
313  else {
314  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
315  }
316  }
317  }
318 
319  return 0;
320 }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
privateinherited

Definition at line 419 of file AthenaMPToolBase.cxx.

420 {
421  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
422  int old_openflags = fcntl(fd,F_GETFL,0);
423  switch(old_openflags & O_ACCMODE) {
424  case O_RDONLY: {
425  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
426  break;
427  }
428  case O_WRONLY: {
429  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
430  break;
431  }
432  case O_RDWR: {
433  ATH_MSG_DEBUG("The File Access Mode is RDWR");
434  break;
435  }
436  }
437 
438  int old_descflags = fcntl(fd,F_GETFD,0);
439  off_t oldpos = lseek(fd,0,SEEK_CUR);
440  if(oldpos==-1) {
441  if(errno==ESPIPE) {
442  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
443  }
444  else {
445  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
446  return -1;
447  }
448  }
449  else {
450  Io::Fd newfd = open(name.c_str(),old_openflags);
451  if(newfd==-1) {
452  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
453  return -1;
454  }
455  if(lseek(newfd,oldpos,SEEK_SET)==-1){
456  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
457  TEMP_FAILURE_RETRY(close(newfd));
458  return -1;
459  }
460  TEMP_FAILURE_RETRY(close(fd));
461  if(dup2(newfd,fd)==-1) {
462  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
463  TEMP_FAILURE_RETRY(close(newfd));
464  return -1;
465  }
466  if(fcntl(fd,F_SETFD,old_descflags)==-1) {
467  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
468  TEMP_FAILURE_RETRY(close(newfd));
469  return -1;
470  }
471  TEMP_FAILURE_RETRY(close(newfd));
472  }
473  return 0;
474 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 340 of file AthenaMPToolBase.cxx.

341 {
342  // Reopen file descriptors.
343  // First go over all open files, which have been registered with the FileMgr
344  // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
345  std::set<int> fdLog;
346 
347  // Query the FileMgr contents
348  std::vector<const Io::FileAttr*> filemgrFiles;
349  std::vector<const Io::FileAttr*>::const_iterator itFile;
350  unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
351  if(filenum!=filemgrFiles.size())
352  ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
353 
354  for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
355  ATH_MSG_DEBUG("* " << **itFile);
356  const std::string& filename = (**itFile).name();
357  Io::Fd fd = (**itFile).fd();
358 
359  if(fd==-1) {
360  // It is legal to have fd=-1 for remote inputs
361  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
362  // So, this hopefully is a temporary patch
363  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
364  continue;
365  }
366 
367  if(reopenFd(fd,filename))
368  return -1;
369 
370  fdLog.insert(fd);
371  }
372 
373  // Check the FdsRegistry
374  for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
375  if(fdLog.find(regEntry.fd)!=fdLog.end()) {
376  ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
377  }
378  else {
379  ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
380 
381  if(regEntry.fd==-1) {
382  // Same protection as the one above
383  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
384  continue;
385  }
386 
387  if(reopenFd(regEntry.fd,regEntry.name))
388  return -1;
389 
390  fdLog.insert(regEntry.fd);
391  }
392  }
393  return 0;
394 }

◆ reportError()

void EvtRangeProcessor::reportError ( yampl::ISocket *  socket,
AthenaMPToolBase::ESRange_Status  status 
)
private

Definition at line 784 of file EvtRangeProcessor.cxx.

785 {
786  pid_t pid = getpid();
787  size_t messageSize = sizeof(pid_t)+sizeof(AthenaMPToolBase::ESRange_Status);
788  void* message2scatterer = CxxUtils::xmalloc(messageSize);
789  memcpy(message2scatterer,&pid,sizeof(pid_t));
790  memcpy((pid_t*)message2scatterer+1,&status,sizeof(AthenaMPToolBase::ESRange_Status));
791  socket->send(message2scatterer,messageSize);
792 }

◆ reportSubprocessStatuses()

void EvtRangeProcessor::reportSubprocessStatuses ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 332 of file EvtRangeProcessor.cxx.

333 {
334  ATH_MSG_INFO("Statuses of event processors");
335  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
336  for(size_t i=0; i<statuses.size(); ++i) {
337  // Get the number of events processed by this worker
338  std::map<pid_t,int>::const_iterator it = m_nProcessedEvents.find(statuses[i].pid);
339  std::ostringstream ostr;
340  if(it==m_nProcessedEvents.end())
341  ostr << "N/A";
342  else
343  ostr << it->second;
344  ATH_MSG_INFO("*** Process PID=" << statuses[i].pid
345  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
346  << ". Number of events processed: " << ostr.str());
347  }
348 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Definition at line 44 of file AthenaMPToolBase.h.

44 {m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop runStop)
inlineoverridevirtualinherited

Definition at line 45 of file AthenaMPToolBase.h.

45 {m_mpRunStop=runStop;}

◆ setNewInputFile()

StatusCode EvtRangeProcessor::setNewInputFile ( const std::string &  newFile)
private

Definition at line 750 of file EvtRangeProcessor.cxx.

751 {
752  if(m_inpFile == newFile) return StatusCode::SUCCESS;
753 
754  // Get Property Server
755  IProperty* propertyServer = dynamic_cast<IProperty*>(evtSelector());
756  if(!propertyServer) {
757  ATH_MSG_ERROR("Unable to dyn-cast the event selector to IProperty");
758  return StatusCode::FAILURE;
759  }
760 
761  std::string propertyName("InputCollections");
762  if(m_inpFile.empty()) {
763  std::vector<std::string> vect;
764  StringArrayProperty inputFileList(propertyName, vect);
765  if(propertyServer->getProperty(&inputFileList).isFailure()) {
766  ATH_MSG_ERROR("Failed to get InputCollections property value of the Event Selector");
767  return StatusCode::FAILURE;
768  }
769  if(newFile==inputFileList.value()[0]) {
770  m_inpFile = newFile;
771  return StatusCode::SUCCESS;
772  }
773  }
774  std::vector<std::string> vect{newFile,};
775  StringArrayProperty newInputFileList(std::move(propertyName), vect);
776  if(propertyServer->setProperty(newInputFileList).isFailure()) {
777  ATH_MSG_ERROR("Unable to update " << newInputFileList.name() << " property on the Event Selector");
778  return StatusCode::FAILURE;
779  }
780  m_inpFile=newFile;
781  return StatusCode::SUCCESS;
782 }

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Definition at line 196 of file AthenaMPToolBase.cxx.

197 {
198  m_randStr = randStr;
199 }

◆ subProcessLogs()

void EvtRangeProcessor::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Definition at line 350 of file EvtRangeProcessor.cxx.

351 {
352  filenames.clear();
353  for(int i=0; i<m_nprocs; ++i) {
354  std::ostringstream workerIndex;
355  workerIndex << i;
356  std::filesystem::path worker_rundir(m_subprocTopDir);
357  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
358  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
359  }
360 }

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 322 of file AthenaMPToolBase.cxx.

323 {
324  ATH_CHECK(m_ioMgr.retrieve(), -1);
325 
326  // update the IoRegistry for the new workdir - make sure we use absolute path
328  ATH_CHECK(m_ioMgr->io_update_all(abs_rundir.string()), -1);
329 
330  return 0;
331 }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry registry)
overridevirtualinherited

Definition at line 191 of file AthenaMPToolBase.cxx.

192 {
193  m_fdsRegistry = std::move(registry);
194 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 403 of file AthenaMPToolBase.cxx.

404 {
405  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
406  sigset_t mask, oldmask;
407 
409 
410  sigemptyset (&mask);
411  sigaddset (&mask, SIGUSR1);
412 
413  sigprocmask (SIG_BLOCK, &mask, &oldmask);
415  sigsuspend (&oldmask);
416  sigprocmask (SIG_UNBLOCK, &mask, NULL);
417 }

Member Data Documentation

◆ m_activeWorkers

int EvtRangeProcessor::m_activeWorkers {0}
private

Keep track of the number of workers.

Definition at line 69 of file EvtRangeProcessor.h.

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_channel2EvtSel

Gaudi::Property<std::string> EvtRangeProcessor::m_channel2EvtSel {this, "Channel2EvtSel", {}}
private

Definition at line 65 of file EvtRangeProcessor.h.

◆ m_channel2Scatterer

Gaudi::Property<std::string> EvtRangeProcessor::m_channel2Scatterer {this, "Channel2Scatterer", {}}
private

Definition at line 64 of file EvtRangeProcessor.h.

◆ m_chronoStatSvc

ServiceHandle<IChronoStatSvc> EvtRangeProcessor::m_chronoStatSvc
private

Definition at line 72 of file EvtRangeProcessor.h.

◆ m_debug

Gaudi::Property<bool> EvtRangeProcessor::m_debug {this, "Debug", false}
private

Definition at line 66 of file EvtRangeProcessor.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 94 of file AthenaMPToolBase.h.

◆ m_evtSeek

SmartIF<IEvtSelectorSeek> EvtRangeProcessor::m_evtSeek
private

Definition at line 74 of file EvtRangeProcessor.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Name of the event selector.

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_finQueue

std::deque<pid_t> EvtRangeProcessor::m_finQueue
private

Definition at line 80 of file EvtRangeProcessor.h.

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> EvtRangeProcessor::m_incidentSvc
private

Definition at line 73 of file EvtRangeProcessor.h.

◆ m_inpFile

std::string EvtRangeProcessor::m_inpFile
private

Cached name of the input file. To avoid reopening.

Definition at line 70 of file EvtRangeProcessor.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 103 of file AthenaMPToolBase.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt {-1}
protectedinherited

Maximum number of events assigned to the job.

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_nEventsBeforeFork

Gaudi::Property<int> EvtRangeProcessor::m_nEventsBeforeFork {this, "EventsBeforeFork", 0, "Number of events before forking"}
private

Definition at line 63 of file EvtRangeProcessor.h.

◆ m_nProcessedEvents

std::map<pid_t,int> EvtRangeProcessor::m_nProcessedEvents
private

Definition at line 79 of file EvtRangeProcessor.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs {-1}
protectedinherited

Number of workers spawned by the master process.

Definition at line 85 of file AthenaMPToolBase.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup {nullptr}
protectedinherited

Definition at line 91 of file AthenaMPToolBase.h.

◆ m_procStates

std::map<pid_t,ProcessState> EvtRangeProcessor::m_procStates
private

Definition at line 81 of file EvtRangeProcessor.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_rankId

int EvtRangeProcessor::m_rankId {-1}
private

Each worker has its own unique RankID from the range (0,...,m_nprocs-1)

Definition at line 68 of file EvtRangeProcessor.h.

◆ m_sharedFailedPidQueue

AthenaInterprocess::SharedQueue* EvtRangeProcessor::m_sharedFailedPidQueue {nullptr}
private

Definition at line 77 of file EvtRangeProcessor.h.

◆ m_sharedRankQueue

std::unique_ptr<AthenaInterprocess::SharedQueue> EvtRangeProcessor::m_sharedRankQueue
private

Definition at line 76 of file EvtRangeProcessor.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

For ex. "worker__".

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Top run directory for subprocesses.

Definition at line 87 of file AthenaMPToolBase.h.


The documentation for this class was generated from the following files:
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:158
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:115
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:403
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:64
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:15
LArBadChanBlobUtils::Channel
Identifier32::value_type Channel
Definition: LArBadChanBlobUtils.h:24
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:91
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
EvtRangeProcessor::m_debug
Gaudi::Property< bool > m_debug
Definition: EvtRangeProcessor.h:66
EvtRangeProcessor::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: EvtRangeProcessor.h:76
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:101
AthenaMPToolBase::m_nprocs
int m_nprocs
Number of workers spawned by the master process.
Definition: AthenaMPToolBase.h:85
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:58
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:70
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:333
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Name of the event selector.
Definition: AthenaMPToolBase.h:89
athena.exitcode
int exitcode
Definition: athena.py:161
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:61
EvtRangeProcessor::m_inpFile
std::string m_inpFile
Cached name of the input file. To avoid reopening.
Definition: EvtRangeProcessor.h:70
EvtRangeProcessor::m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSeek
Definition: EvtRangeProcessor.h:74
EvtRangeProcessor::m_rankId
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
Definition: EvtRangeProcessor.h:68
EvtRangeProcessor::PROC_STATE_EXEC
@ PROC_STATE_EXEC
Definition: EvtRangeProcessor.h:58
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:459
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:69
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
sigset_t
int sigset_t
Definition: SealSignal.h:80
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:98
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:95
EvtRangeProcessor::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: EvtRangeProcessor.h:79
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:103
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:67
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:419
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
AthenaMPToolBase::evtSelector
IEvtSelector * evtSelector()
Definition: AthenaMPToolBase.h:83
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:28
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:59
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:322
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:48
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:92
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
EvtRangeProcessor::reportError
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Definition: EvtRangeProcessor.cxx:784
EvtRangeProcessor::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: EvtRangeProcessor.h:72
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:62
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:97
EvtRangeProcessor::setNewInputFile
StatusCode setNewInputFile(const std::string &newFile)
Definition: EvtRangeProcessor.cxx:750
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:269
AthenaInterprocess::Process
Definition: Process.h:17
EvtRangeProcessor::m_channel2Scatterer
Gaudi::Property< std::string > m_channel2Scatterer
Definition: EvtRangeProcessor.h:64
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Maximum number of events assigned to the job.
Definition: AthenaMPToolBase.h:86
EvtRangeProcessor::PROC_STATE_INIT
@ PROC_STATE_INIT
Definition: EvtRangeProcessor.h:57
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
EvtRangeProcessor::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EvtRangeProcessor.h:73
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:23
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
merge.status
status
Definition: merge.py:16
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
For ex. "worker__".
Definition: AthenaMPToolBase.h:88
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Top run directory for subprocesses.
Definition: AthenaMPToolBase.h:87
EvtRangeProcessor::PROC_STATE_FIN
@ PROC_STATE_FIN
Definition: EvtRangeProcessor.h:59
EvtRangeProcessor::PROC_STATE_STOP
@ PROC_STATE_STOP
Definition: EvtRangeProcessor.h:60
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:60
CxxUtils::xmalloc
void * xmalloc(size_t size)
Trapping version of malloc.
Definition: xmalloc.cxx:31
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:396
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:96
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:68
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:340
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:63
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:100
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:27
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:94