Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Public Member Functions | Static Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
SharedEvtQueueConsumer Class Reference final abstract

#include <SharedEvtQueueConsumer.h>

Inheritance diagram for SharedEvtQueueConsumer:
Collaboration diagram for SharedEvtQueueConsumer:

Public Member Functions

 SharedEvtQueueConsumer (const std::string &type, const std::string &name, const IInterface *parent)
 
virtual ~SharedEvtQueueConsumer () override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string &topdir) override
 
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override
 
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t &pid) override
 
virtual void reportSubprocessStatuses () override
 
virtual void subProcessLogs (std::vector< std::string > &) override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func () override
 
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport () override
 
virtual void useFdsRegistry (std::shared_ptr< AthenaInterprocess::FdsRegistry >) override
 
virtual void setRandString (const std::string &randStr) override
 
virtual void setMaxEvt (int maxEvt) override
 
virtual void setMPRunStop (const AthenaInterprocess::IMPRunStop *runStop) override
 
virtual void killChildren () override
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > operator() ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ScheduledWork &)
 
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func ()=0
 
ServiceHandle< StoreGateSvc > & evtStore ()
 The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
 
const ServiceHandle< StoreGateSvc > & evtStore () const
 The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc. More...
 
const ServiceHandle< StoreGateSvc > & detStore () const
 The standard StoreGateSvc/DetectorStore. Returns (kind of) a pointer to the StoreGateSvc. More...
 
virtual StatusCode sysInitialize () override
 Perform system initialization for an algorithm. More...
 
virtual StatusCode sysStart () override
 Handle START transition. More...
 
virtual std::vector< Gaudi::DataHandle * > inputHandles () const override
 Return this algorithm's input handles. More...
 
virtual std::vector< Gaudi::DataHandle * > outputHandles () const override
 Return this algorithm's output handles. More...
 
Gaudi::Details::PropertyBase & declareProperty (Gaudi::Property< T > &t)
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, SG::VarHandleKey &hndl, const std::string &doc, const SG::VarHandleKeyType &)
 Declare a new Gaudi property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, SG::VarHandleBase &hndl, const std::string &doc, const SG::VarHandleType &)
 Declare a new Gaudi property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, SG::VarHandleKeyArray &hndArr, const std::string &doc, const SG::VarHandleKeyArrayType &)
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, T &property, const std::string &doc, const SG::NotHandleType &)
 Declare a new Gaudi property. More...
 
Gaudi::Details::PropertyBase * declareProperty (const std::string &name, T &property, const std::string &doc="none")
 Declare a new Gaudi property. More...
 
void updateVHKA (Gaudi::Details::PropertyBase &)
 
MsgStream & msg () const
 
MsgStream & msg (const MSG::Level lvl) const
 
bool msgLvl (const MSG::Level lvl) const
 
virtual std::unique_ptr< ScheduledWork > operator () ATLAS_NOT_THREAD_SAFE(const ScheduledWork &)=0
 

Static Public Member Functions

static const InterfaceID & interfaceID ()
 

Protected Types

enum  ESRange_Status {
  ESRANGE_SUCCESS, ESRANGE_NOTFOUND, ESRANGE_SEEKFAILED, ESRANGE_PROCFAILED,
  ESRANGE_FILENOTMADE, ESRANGE_BADINPFILE
}
 
enum  Func_Flag { FUNC_BOOTSTRAP, FUNC_EXEC, FUNC_FIN }
 

Protected Member Functions

int mapAsyncFlag ATLAS_NOT_THREAD_SAFE (Func_Flag flag, pid_t pid=0)
 
int redirectLog (const std::string &rundir, bool addTimeStamp=true)
 
int updateIoReg (const std::string &rundir)
 
std::string fmterror (int errnum)
 
int reopenFds ()
 
int handleSavedPfc (const std::filesystem::path &dest_path)
 
void waitForSignal ()
 
IEvtSelector * evtSelector ()
 
void renounceArray (SG::VarHandleKeyArray &handlesArray)
 remove all handles from I/O resolution More...
 
std::enable_if_t< std::is_void_v< std::result_of_t< decltype(&T::renounce)(T)> > &&!std::is_base_of_v< SG::VarHandleKeyArray, T > &&std::is_base_of_v< Gaudi::DataHandle, T >, void > renounce (T &h)
 
void extraDeps_update_handler (Gaudi::Details::PropertyBase &ExtraDeps)
 Add StoreName to extra input/output deps as needed. More...
 

Protected Attributes

int m_nprocs
 
int m_maxEvt
 
std::string m_subprocTopDir
 
std::string m_subprocDirPrefix
 
std::string m_evtSelName
 
AthenaInterprocess::ProcessGroup * m_processGroup
 
const AthenaInterprocess::IMPRunStop * m_mpRunStop {nullptr}
 
ServiceHandle< IEventProcessor > m_evtProcessor
 
ServiceHandle< IAppMgrUI > m_appMgr
 
ServiceHandle< IFileMgr > m_fileMgr
 
ServiceHandle< IIoComponentMgr > m_ioMgr
 
SmartIF< IEvtSelector > m_evtSelector
 
std::string m_fileMgrLog
 
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
 
std::string m_randStr
 
Gaudi::Property< bool > m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
 

Private Types

typedef System::ProcessTime::TimeValueType TimeValType
 
typedef ServiceHandle< StoreGateSvc > StoreGateSvc_t
 

Private Member Functions

 SharedEvtQueueConsumer ()
 
 SharedEvtQueueConsumer (const SharedEvtQueueConsumer &)
 
SharedEvtQueueConsumer & operator= (const SharedEvtQueueConsumer &)
 
int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
 
int reopenFd (int fd, const std::string &name)
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
 specialization for handling Gaudi::Property<SG::VarHandleKey> More...
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &hndl, const SG::VarHandleKeyArrayType &)
 specialization for handling Gaudi::Property<SG::VarHandleKeyArray> More...
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &hndl, const SG::VarHandleType &)
 specialization for handling Gaudi::Property<SG::VarHandleBase> More...
 
Gaudi::Details::PropertyBase & declareGaudiProperty (Gaudi::Property< T > &t, const SG::NotHandleType &)
 specialization for handling everything that's not a Gaudi::Property<SG::VarHandleKey> or a <SG::VarHandleKeyArray> More...
 

Private Attributes

bool m_useSharedReader
 
bool m_useSharedWriter
 
bool m_isRoundRobin
 
int m_nEventsBeforeFork
 
int m_nSkipEvents
 
bool m_debug
 
int m_rankId
 
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
 
SmartIF< IEventSeek > m_evtSeek
 
SmartIF< IEvtSelectorSeek > m_evtSelSeek
 
IEvtSelector::Context * m_evtContext
 
SmartIF< IEventShare > m_evtShare
 
SmartIF< IDataShare > m_dataShare
 
AthenaInterprocess::SharedQueue * m_sharedEventQueue
 
AthenaInterprocess::SharedQueue * m_sharedRankQueue
 
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
 
std::queue< pid_t > m_finQueue
 
bool m_readEventOrders
 
std::string m_eventOrdersFile
 
std::vector< int > m_eventOrders
 
pid_t m_masterPid
 
StoreGateSvc_t m_evtStore
 Pointer to StoreGate (event store by default) More...
 
StoreGateSvc_t m_detStore
 Pointer to StoreGate (detector store by default) More...
 
std::vector< SG::VarHandleKeyArray * > m_vhka
 
bool m_varHandleArraysDeclared
 

Detailed Description

Definition at line 21 of file SharedEvtQueueConsumer.h.

Member Typedef Documentation

◆ StoreGateSvc_t

typedef ServiceHandle<StoreGateSvc> AthCommonDataStore< AthCommonMsg< AlgTool > >::StoreGateSvc_t
privateinherited

Definition at line 388 of file AthCommonDataStore.h.

◆ TimeValType

typedef System::ProcessTime::TimeValueType SharedEvtQueueConsumer::TimeValType
private

Definition at line 76 of file SharedEvtQueueConsumer.h.

Member Enumeration Documentation

◆ ESRange_Status

enum AthenaMPToolBase::ESRange_Status
protectedinherited
Enumerator
ESRANGE_SUCCESS 
ESRANGE_NOTFOUND 
ESRANGE_SEEKFAILED 
ESRANGE_PROCFAILED 
ESRANGE_FILENOTMADE 
ESRANGE_BADINPFILE 

Definition at line 59 of file AthenaMPToolBase.h.

◆ Func_Flag

enum AthenaMPToolBase::Func_Flag
protectedinherited
Enumerator
FUNC_BOOTSTRAP 
FUNC_EXEC 
FUNC_FIN 

Definition at line 68 of file AthenaMPToolBase.h.

68  {
70  , FUNC_EXEC
71  , FUNC_FIN
72  };

Constructor & Destructor Documentation

◆ SharedEvtQueueConsumer() [1/3]

SharedEvtQueueConsumer::SharedEvtQueueConsumer ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Definition at line 33 of file SharedEvtQueueConsumer.cxx.

37  , m_useSharedReader(false)
38  , m_useSharedWriter(false)
39  , m_isRoundRobin(false)
41  , m_nSkipEvents(0)
42  , m_debug(false)
43  , m_rankId(-1)
44  , m_chronoStatSvc("ChronoStatSvc", name)
45  , m_evtSeek(nullptr)
46  , m_evtSelSeek(nullptr)
47  , m_evtContext(nullptr)
48  , m_evtShare(nullptr)
49  , m_dataShare(nullptr)
50  , m_sharedEventQueue(nullptr)
51  , m_sharedRankQueue(nullptr)
52  , m_readEventOrders(false)
53  , m_eventOrdersFile("athenamp_eventorders.txt")
54  , m_masterPid(getpid())
55 {
56  declareInterface<IAthenaMPTool>(this);
57 
58  declareProperty("UseSharedReader",m_useSharedReader);
59  declareProperty("UseSharedWriter",m_useSharedWriter);
60  declareProperty("IsRoundRobin",m_isRoundRobin);
61  declareProperty("EventsBeforeFork",m_nEventsBeforeFork);
62  declareProperty("Debug", m_debug);
63  declareProperty("ReadEventOrders",m_readEventOrders);
64  declareProperty("EventOrdersFile",m_eventOrdersFile);
65 
66  m_subprocDirPrefix = "worker_";
67 }

◆ ~SharedEvtQueueConsumer()

SharedEvtQueueConsumer::~SharedEvtQueueConsumer ( )
overridevirtual

Definition at line 69 of file SharedEvtQueueConsumer.cxx.

70 {
71 }

◆ SharedEvtQueueConsumer() [2/3]

SharedEvtQueueConsumer::SharedEvtQueueConsumer ( )
private

◆ SharedEvtQueueConsumer() [3/3]

SharedEvtQueueConsumer::SharedEvtQueueConsumer ( const SharedEvtQueueConsumer & )
private

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE() [1/5]

virtual StatusCode exec SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( )
overridevirtual

Implements IAthenaMPTool.

◆ ATLAS_NOT_THREAD_SAFE() [2/5]

int decodeProcessResult SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( const AthenaInterprocess::ProcessResult *  presult,
bool  doFinalize 
)
private

◆ ATLAS_NOT_THREAD_SAFE() [3/5]

int mapAsyncFlag AthenaMPToolBase::ATLAS_NOT_THREAD_SAFE ( Func_Flag  flag,
pid_t  pid = 0 
)
protectedinherited

◆ ATLAS_NOT_THREAD_SAFE() [4/5]

virtual int makePool SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( int  maxevt,
int  nprocs,
const std::string &  topdir 
)
overridevirtual

Implements IAthenaMPTool.

◆ ATLAS_NOT_THREAD_SAFE() [5/5]

virtual StatusCode wait_once SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE ( pid_t &  pid )
overridevirtual

Reimplemented from AthenaMPToolBase.

◆ bootstrap_func() [1/2]

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::bootstrap_func ( )
overridevirtual

Definition at line 285 of file SharedEvtQueueConsumer.cxx.

286 {
287  if(m_debug) waitForSignal();
288 
289  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
290  outwork->data = malloc(sizeof(int));
291  *(int*)(outwork->data) = 1; // Error code: for now use 0 success, 1 failure
292  outwork->size = sizeof(int);
293 
294  // For PileUp Digi Fork-After-N-Events >>>>
295  // Retrieve cuEvent-s for all background event selectors, if we forked after N events
296  std::map<IService*,int> bkgEvtSelectors;
297 
298  if(m_isPileup) {
299  for(IService* ptrSvc : serviceLocator()->getServices()) {
300  IEvtSelector* evtsel = dynamic_cast<IEvtSelector*>(ptrSvc);
301  if(evtsel && (evtsel != m_evtSelector)) {
302  if(m_nEventsBeforeFork>0) {
303  IEvtSelectorSeek* evtselseek = dynamic_cast<IEvtSelectorSeek*>(evtsel);
304  if(evtselseek) {
305  bkgEvtSelectors.emplace(ptrSvc,evtselseek->curEvent(*m_evtContext));
306  }
307  else {
308  ATH_MSG_ERROR("Failed to cast IEvtSelector* onto IEvtSelectorSeek* for " << (ptrSvc)->name());
309  return outwork;
310  }
311  }
312  else {
313  bkgEvtSelectors.emplace(ptrSvc,0);
314  }
315  }
316  }
317  }
318  // <<<< For PileUp Digi Fork-After-N-Events
319 
320  // ...
321  // (possible) TODO: extend outwork with some error message, which will be eventually
322  // reported in the master proces
323  // ...
324 
325  // ________________________ Get IncidentSvc and fire PostFork ________________________
326  SmartIF<IIncidentSvc> p_incidentSvc(serviceLocator()->service("IncidentSvc"));
327  if(!p_incidentSvc) {
328  ATH_MSG_ERROR("Unable to retrieve IncidentSvc");
329  return outwork;
330  }
331  p_incidentSvc->fireIncident(Incident(name(),"PostFork"));
332 
333 
334  // ________________________ Get RankID ________________________
335  //
337  ATH_MSG_ERROR("Unable to get rank ID!");
338  return outwork;
339  }
340  std::ostringstream workindex;
341  workindex<<m_rankId;
342 
343  // ________________________ Worker dir: mkdir ________________________
344  std::filesystem::path worker_rundir(m_subprocTopDir);
345  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
346  // TODO: this "worker_" can be made configurable too
347 
348  if(mkdir(worker_rundir.string().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==-1) {
349  ATH_MSG_ERROR("Unable to make worker run directory: " << worker_rundir.string() << ". " << fmterror(errno));
350  return outwork;
351  }
352 
353  // __________ Redirect logs unless we want to attach debugger ____________
354  if(!m_debug) {
355  if(redirectLog(worker_rundir.string()))
356  return outwork;
357 
358  ATH_MSG_INFO("Logs redirected in the AthenaMP event worker PID=" << getpid());
359  }
360 
361  // ________________________ Update Io Registry ____________________________
362  if(updateIoReg(worker_rundir.string()))
363  return outwork;
364 
365  ATH_MSG_INFO("Io registry updated in the AthenaMP event worker PID=" << getpid());
366 
367  // ________________________ SimParams & DigiParams & PDGTABLE.MeV ____________________________
368  std::filesystem::path abs_worker_rundir = std::filesystem::absolute(worker_rundir);
369  if(std::filesystem::is_regular_file("SimParams.db"))
370  COPY_FILE_HACK("SimParams.db", abs_worker_rundir.string()+"/SimParams.db");
371  if(std::filesystem::is_regular_file("DigitParams.db"))
372  COPY_FILE_HACK("DigitParams.db", abs_worker_rundir.string()+"/DigitParams.db");
373  if(std::filesystem::is_regular_file("PDGTABLE.MeV"))
374  COPY_FILE_HACK("PDGTABLE.MeV", abs_worker_rundir.string()+"/PDGTABLE.MeV");
375 
376  // _______________________ Handle saved PFC (if any) ______________________
377  if(handleSavedPfc(abs_worker_rundir))
378  return outwork;
379 
380  // ________________________ reopen descriptors ____________________________
381  if(reopenFds())
382  return outwork;
383 
384  ATH_MSG_INFO("File descriptors re-opened in the AthenaMP event worker PID=" << getpid());
385 
386 
387  // ________________________ Make Shared Reader/Writer Client ________________________
389  if(!m_evtShare->makeClient(m_rankId).isSuccess()) {
390  ATH_MSG_ERROR("Failed to make the event selector a share client");
391  return outwork;
392  }
393  else {
394  ATH_MSG_DEBUG("Successfully made the event selector a share client");
395  }
396  }
397 
399  SmartIF<IProperty> propertyServer(m_dataShare);
400  if (!propertyServer || propertyServer->setProperty("MakeStreamingToolClient", m_rankId + 1).isFailure()) {
401  ATH_MSG_ERROR("Could not change AthenaPoolCnvSvc MakeClient Property");
402  return outwork;
403  }
404  else {
405  ATH_MSG_DEBUG("Successfully made the conversion service a share client");
406  }
407  }
408 
409  // ________________________ I/O reinit ________________________
410  if(!m_ioMgr->io_reinitialize().isSuccess()) {
411  ATH_MSG_ERROR("Failed to reinitialize I/O");
412  return outwork;
413  }
414  else {
415  ATH_MSG_DEBUG("Successfully reinitialized I/O");
416  }
417 
418  // _______________ Get the value of SkipEvent ________________________
419  if(m_evtSelector) {
420  SmartIF<IProperty> propertyServer(m_evtSelector);
421  if(!propertyServer) {
422  ATH_MSG_ERROR("Unable to cast event selector to IProperty");
423  return outwork;
424  }
425  else {
426  std::string propertyName("SkipEvents");
427  IntegerProperty skipEventsProp(propertyName,m_nSkipEvents);
428  if(propertyServer->getProperty(&skipEventsProp).isFailure()) {
429  ATH_MSG_INFO("Event Selector does not have SkipEvents property");
430  }
431  else {
432  m_nSkipEvents = skipEventsProp.value();
433  }
434  }
435 
436 
437  // ________________________ Event selector restart ________________________
438  SmartIF<IService> evtSelSvc(m_evtSelector);
439  if(!evtSelSvc) {
440  ATH_MSG_ERROR("Failed to dyncast event selector to IService");
441  return outwork;
442  }
443  if(!evtSelSvc->start().isSuccess()) {
444  ATH_MSG_ERROR("Failed to restart the event selector");
445  return outwork;
446  }
447  else {
448  ATH_MSG_DEBUG("Successfully restarted the event selector");
449  }
450  }
451  // For PileUp jobs >>>>
452  // Main event selector: advance it if we either forked after N events, or skipEvents!=0
453  // Background event selectors: restart, and advance if we forked after N events
454  if(m_isPileup) {
455  // Deal with the main event selector first
456  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
457  if(!m_evtSelSeek) {
458  ATH_MSG_ERROR("Error retrieving Event Selector with IEvtSelectorSeek interface for PileUp job");
459  return outwork;
460  }
461 
464  ATH_MSG_ERROR("Failed to seek to " << m_nEventsBeforeFork+m_nSkipEvents);
465  return outwork;
466  }
467 
468  // Deal with background event selectors
469  for(auto [evtsel,curEvt] : bkgEvtSelectors) {
470  if(evtsel->start().isSuccess()) {
471  if (m_nEventsBeforeFork>0) {
472  SmartIF<IEvtSelectorSeek> evtselseek(evtsel);
473  if(evtselseek->seek(*m_evtContext,curEvt).isFailure()) {
474  ATH_MSG_ERROR("Failed to seek to " << curEvt << " in the BKG Event Selector " << evtsel->name());
475  return outwork;
476  }
477  }
478  }
479  else {
480  ATH_MSG_ERROR("Failed to restart BKG Event Selector " << evtsel->name());
481  return outwork;
482  }
483  }
484  }
485  // <<<< For PileUp jobs
486 
487  // _______________________ Event orders for debugging ________________________________
488  if(m_readEventOrders) {
489  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::in);
490  if(fs.good()) {
491  ATH_MSG_INFO("Reading predefined event orders from " << m_eventOrdersFile);
492  while(fs.good()){
493  std::string line;
494  std::getline(fs,line);
495  if(line.empty())continue;
496 
497  // Parse the string
498  size_t idx(0);
499  int rank = std::stoi(line,&idx);
500  if(rank==m_rankId) {
501  msg(MSG::INFO) << "This worker will proces the following events #";
502  while(idx<line.size()-1) {
503  line = line.substr(idx+1);
504  int evtnum = std::stoi(line,&idx);
505  m_eventOrders.push_back(evtnum);
506  msg(MSG::INFO) << " " << evtnum;
507  }
508  msg(MSG::INFO) << endmsg;
509  }
510  }
511  if(m_eventOrders.empty()) {
512  ATH_MSG_ERROR("Could not read event orders for the rank " << m_rankId);
513  return outwork;
514  }
515  fs.close();
516  }
517  else {
518  ATH_MSG_ERROR("Unable to read predefined event orders from " << m_eventOrdersFile);
519  return outwork;
520  }
521  }
522 
523  // ________________________ Worker dir: chdir ________________________
524  if(chdir(worker_rundir.string().c_str())==-1) {
525  ATH_MSG_ERROR("Failed to chdir to " << worker_rundir.string());
526  return outwork;
527  }
528 
529  // ___________________ Fire UpdateAfterFork incident _________________
530  p_incidentSvc->fireIncident(AthenaInterprocess::UpdateAfterFork(m_rankId,getpid(),name()));
531 
532  // Declare success and return
533  *(int*)(outwork->data) = 0;
534  return outwork;
535 }

◆ bootstrap_func() [2/2]

virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> AthenaMPToolBase::bootstrap_func ( )
pure virtualinherited

◆ declareGaudiProperty() [1/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  hndl,
const SG::VarHandleKeyArrayType & 
)
inlineprivateinherited

specialization for handling Gaudi::Property<SG::VarHandleKeyArray>

Definition at line 170 of file AthCommonDataStore.h.

172  {
173  return *AthCommonDataStore<PBASE>::declareProperty(hndl.name(),
174  hndl.value(),
175  hndl.documentation());
176 
177  }

◆ declareGaudiProperty() [2/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  hndl,
const SG::VarHandleKeyType & 
)
inlineprivateinherited

specialization for handling Gaudi::Property<SG::VarHandleKey>

Definition at line 156 of file AthCommonDataStore.h.

158  {
159  return *AthCommonDataStore<PBASE>::declareProperty(hndl.name(),
160  hndl.value(),
161  hndl.documentation());
162 
163  }

◆ declareGaudiProperty() [3/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  hndl,
const SG::VarHandleType & 
)
inlineprivateinherited

specialization for handling Gaudi::Property<SG::VarHandleBase>

Definition at line 184 of file AthCommonDataStore.h.

186  {
187  return *AthCommonDataStore<PBASE>::declareProperty(hndl.name(),
188  hndl.value(),
189  hndl.documentation());
190  }

◆ declareGaudiProperty() [4/4]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareGaudiProperty ( Gaudi::Property< T > &  t,
const SG::NotHandleType & 
)
inlineprivateinherited

specialization for handling everything that's not a Gaudi::Property<SG::VarHandleKey> or a <SG::VarHandleKeyArray>

Definition at line 199 of file AthCommonDataStore.h.

200  {
201  return PBASE::declareProperty(t);
202  }

◆ declareProperty() [1/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
SG::VarHandleBase &  hndl,
const std::string &  doc,
const SG::VarHandleType & 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
hndl: Object holding the property value.
doc: Documentation string for the property.

This is the version for types that derive from SG::VarHandleBase. The property value object is put on the input and output lists as appropriate; then we forward to the base class.

Definition at line 245 of file AthCommonDataStore.h.

249  {
250  this->declare(hndl.vhKey());
251  hndl.vhKey().setOwner(this);
252 
253  return PBASE::declareProperty(name,hndl,doc);
254  }

◆ declareProperty() [2/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
SG::VarHandleKey &  hndl,
const std::string &  doc,
const SG::VarHandleKeyType & 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
hndl: Object holding the property value.
doc: Documentation string for the property.

This is the version for types that derive from SG::VarHandleKey. The property value object is put on the input and output lists as appropriate; then we forward to the base class.

Definition at line 221 of file AthCommonDataStore.h.

225  {
226  this->declare(hndl);
227  hndl.setOwner(this);
228 
229  return PBASE::declareProperty(name,hndl,doc);
230  }

◆ declareProperty() [3/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
SG::VarHandleKeyArray &  hndArr,
const std::string &  doc,
const SG::VarHandleKeyArrayType & 
)
inlineinherited

Definition at line 259 of file AthCommonDataStore.h.

263  {
264 
265  // std::ostringstream ost;
266  // ost << Algorithm::name() << " VHKA declareProp: " << name
267  // << " size: " << hndArr.keys().size()
268  // << " mode: " << hndArr.mode()
269  // << " vhka size: " << m_vhka.size()
270  // << "\n";
271  // debug() << ost.str() << endmsg;
272 
273  hndArr.setOwner(this);
274  m_vhka.push_back(&hndArr);
275 
276  Gaudi::Details::PropertyBase* p = PBASE::declareProperty(name, hndArr, doc);
277  if (p != 0) {
278  p->declareUpdateHandler(&AthCommonDataStore<PBASE>::updateVHKA, this);
279  } else {
280  ATH_MSG_ERROR("unable to call declareProperty on VarHandleKeyArray "
281  << name);
282  }
283 
284  return p;
285 
286  }

◆ declareProperty() [4/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
T &  property,
const std::string &  doc,
const SG::NotHandleType & 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
property: Object holding the property value.
doc: Documentation string for the property.

This is the generic version, for types that do not derive from SG::VarHandleKey. It just forwards to the base class version of declareProperty.

Definition at line 333 of file AthCommonDataStore.h.

337  {
338  return PBASE::declareProperty(name, property, doc);
339  }

◆ declareProperty() [5/6]

Gaudi::Details::PropertyBase* AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( const std::string &  name,
T &  property,
const std::string &  doc = "none" 
)
inlineinherited

Declare a new Gaudi property.

Parameters
name: Name of the property.
property: Object holding the property value.
doc: Documentation string for the property.

This dispatches to either the generic declareProperty or the one for VarHandle/Key/KeyArray.

Definition at line 352 of file AthCommonDataStore.h.

355  {
356  typedef typename SG::HandleClassifier<T>::type htype;
357  return declareProperty (name, property, doc, htype());
358  }

◆ declareProperty() [6/6]

Gaudi::Details::PropertyBase& AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty ( Gaudi::Property< T > &  t)
inlineinherited

Definition at line 145 of file AthCommonDataStore.h.

145  {
146  typedef typename SG::HandleClassifier<T>::type htype;
148  }

◆ detStore()

const ServiceHandle<StoreGateSvc>& AthCommonDataStore< AthCommonMsg< AlgTool > >::detStore ( ) const
inlineinherited

The standard StoreGateSvc/DetectorStore. Returns (kind of) a pointer to the StoreGateSvc.

Definition at line 95 of file AthCommonDataStore.h.

95 { return m_detStore; }

◆ evtSelector()

IEvtSelector* AthenaMPToolBase::evtSelector ( )
inlineprotectedinherited

Definition at line 84 of file AthenaMPToolBase.h.

84 { return m_evtSelector; }

◆ evtStore() [1/2]

ServiceHandle<StoreGateSvc>& AthCommonDataStore< AthCommonMsg< AlgTool > >::evtStore ( )
inlineinherited

The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc.

Definition at line 85 of file AthCommonDataStore.h.

85 { return m_evtStore; }

◆ evtStore() [2/2]

const ServiceHandle<StoreGateSvc>& AthCommonDataStore< AthCommonMsg< AlgTool > >::evtStore ( ) const
inlineinherited

The standard StoreGateSvc (event store). Returns (kind of) a pointer to the StoreGateSvc.

Definition at line 90 of file AthCommonDataStore.h.

90 { return m_evtStore; }

◆ exec_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::exec_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 537 of file SharedEvtQueueConsumer.cxx.

538 {
539  ATH_MSG_INFO("Exec function in the AthenaMP worker PID=" << getpid());
540 
541  bool all_ok(true);
542 
543  long intmask = pow(0x100,sizeof(int))-1; // Mask for decoding event number from the value posted to the queue
544  int nEvt(m_nEventsBeforeFork);
545  int nEventsProcessed(0);
546  long evtnumAndChunk(0);
547 
548  unsigned evtCounter(0);
549  int evtnum(0), chunkSize(1);
550  auto predefinedEvt = m_eventOrders.cbegin();
551 
552  // If the event orders file already exists in worker's run directory, then it's an unexpected error!
554  if(std::filesystem::exists(ordersFile)) {
555  ATH_MSG_ERROR(m_eventOrdersFile << " already exists in the worker's run directory!");
556  all_ok = false;
557  }
558 
559  System::ProcessTime time_start = System::getProcessTime();
560  if(all_ok) {
561  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::out);
562  fs << m_rankId;
563  bool firstOrder(true);
564  while(true) {
565  if(m_isRoundRobin) {
566  evtnum = m_nSkipEvents + m_nprocs*evtCounter + m_rankId;
567  if(m_maxEvt!=-1 && evtnum>=m_maxEvt+m_nSkipEvents) {
568  break;
569  }
570  evtCounter++;
571  }
572  else {
573  if(m_readEventOrders) {
574  if(predefinedEvt==m_eventOrders.cend()) break;
575  evtnum = *predefinedEvt;
576  predefinedEvt++;
577  fs << (firstOrder?":":",") << evtnum;
578  fs.flush();
579  firstOrder=false;
580  ATH_MSG_INFO("Read event number from the orders file: " << evtnum);
581  }
582  else {
583  if(!m_sharedEventQueue->try_receive_basic<long>(evtnumAndChunk)) {
584  // The event queue is empty, but we should check whether there are more events to come or not
585  ATH_MSG_DEBUG("Event queue is empty");
586  usleep(1000);
587  continue;
588  }
589  if(evtnumAndChunk<=0) {
590  evtnumAndChunk *= -1;
591  ATH_MSG_DEBUG("No more events are expected. The total number of events for this job = " << evtnumAndChunk);
592  break;
593  }
594  ATH_MSG_DEBUG("Received value from the queue 0x" << std::hex << evtnumAndChunk << std::dec);
595  chunkSize = evtnumAndChunk >> (sizeof(int)*8);
596  evtnum = evtnumAndChunk & intmask;
597  ATH_MSG_INFO("Received from the queue: event num=" << evtnum << " chunk size=" << chunkSize);
598 
599  // Save event order
600  for(int i(0);i<chunkSize;++i) {
601  fs << (firstOrder?":":",") << evtnum+i;
602  firstOrder=false;
603  }
604  fs.flush();
605  } // Get event numbers from the shared queue
606  } // Not RoundRobin
607  nEvt+=chunkSize;
608  StatusCode sc;
609  if(m_useSharedReader) {
610  sc = m_evtShare->share(evtnum);
611  if(sc.isFailure()){
612  ATH_MSG_ERROR("Unable to share " << evtnum);
613  all_ok=false;
614  break;
615  }
616  else {
617  ATH_MSG_INFO("Share of " << evtnum << " succeeded");
618  }
619  }
620  else if(m_evtSelector) {
621  m_chronoStatSvc->chronoStart("AthenaMP_seek");
622  if (m_evtSeek) {
623  sc=m_evtSeek->seek(evtnum);
624  }
625  else {
626  sc=m_evtSelSeek->seek(*m_evtContext, evtnum);
627  }
628  if(sc.isFailure()){
629  ATH_MSG_ERROR("Unable to seek to " << evtnum);
630  all_ok=false;
631  break;
632  }
633  else {
634  ATH_MSG_INFO("Seek to " << evtnum << " succeeded");
635  }
636  m_chronoStatSvc->chronoStop("AthenaMP_seek");
637  }
638  m_chronoStatSvc->chronoStart("AthenaMP_nextEvent");
639  sc = m_evtProcessor->nextEvent(nEvt);
640  nEventsProcessed += chunkSize;
641  if(sc.isFailure()){
642  if(chunkSize==1) {
643  ATH_MSG_ERROR("Unable to process event " << evtnum);
644  }
645  else {
646  ATH_MSG_ERROR("Unable to process the chunk (" << evtnum << "," << evtnum+chunkSize-1 << ")");
647  }
648  all_ok=false;
649  break;
650  }
651  m_chronoStatSvc->chronoStop("AthenaMP_nextEvent");
652  if(m_mpRunStop->stopScheduled()) {
653  ATH_MSG_INFO("Scheduled stop");
654  break;
655  }
656  }
657  fs.close();
658  }
659  System::ProcessTime time_delta = System::getProcessTime() - time_start;
660  TimeValType elapsedTime = time_delta.elapsedTime<System::Sec>();
661 
662  if(all_ok) {
663  if(m_evtProcessor->executeRun(0).isFailure()) {
664  ATH_MSG_ERROR("Could not finalize the Run");
665  all_ok=false;
666  }
667  else if(!m_useSharedReader && m_evtSelector) {
668  StatusCode sc;
669  if (m_evtSeek) {
670  sc = m_evtSeek->seek(evtnumAndChunk+m_nSkipEvents);
671  }
672  else {
673  sc = m_evtSelSeek->seek(*m_evtContext, evtnumAndChunk+m_nSkipEvents);
674  }
675  if(sc.isFailure()) {
676  ATH_MSG_WARNING("Seek past maxevt to " << evtnumAndChunk+m_nSkipEvents << " returned failure.");
677  }
678  }
679  }
680 
681  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
682 
683  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime"
684  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(elapsedTime);
685  void* outdata = malloc(outsize);
686  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
688  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
689  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEventsProcessed,sizeof(int));
690  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsedTime,sizeof(elapsedTime));
691  outwork->data = outdata;
692  outwork->size = outsize;
693  // ...
694  // (possible) TODO: extend outwork with some error message, which will be eventually
695  // reported in the master proces
696  // ...
697  return outwork;
698 }

◆ extraDeps_update_handler()

void AthCommonDataStore< AthCommonMsg< AlgTool > >::extraDeps_update_handler ( Gaudi::Details::PropertyBase &  ExtraDeps)
protectedinherited

Add StoreName to extra input/output deps as needed.

Use the logic of the VarHandleKey to parse the DataObjID keys supplied via the ExtraInputs and ExtraOutputs properties, adding the StoreName if it is not explicitly given.

◆ fin_func()

std::unique_ptr< AthenaInterprocess::ScheduledWork > SharedEvtQueueConsumer::fin_func ( )
overridevirtual

Implements AthenaMPToolBase.

Definition at line 700 of file SharedEvtQueueConsumer.cxx.

701 {
// Worker-side finalization step: stop and finalize the application manager,
// then pack the outcome into a ScheduledWork buffer that is returned to the caller.
702  ATH_MSG_INFO("Fin function in the AthenaMP worker PID=" << getpid());
703 
704  bool all_ok(true);
705 
// finalize() is attempted only when stop() succeeded.
706  if(m_appMgr->stop().isFailure()) {
707  ATH_MSG_ERROR("Unable to stop AppMgr");
708  all_ok=false;
709  }
710  else {
711  if(m_appMgr->finalize().isFailure()) {
// NOTE(review): this failure is written to std::cerr, unlike the ATH_MSG_ERROR
// used for the stop() failure above - presumably because the message service
// may no longer be usable after finalize(); confirm.
712  std::cerr << "Unable to finalize AppMgr" << std::endl;
713  all_ok=false;
714  }
715  }
716 
717  std::unique_ptr<AthenaInterprocess::ScheduledWork> outwork(new AthenaInterprocess::ScheduledWork);
718 
719  // Return value: "ERRCODE|Func_Flag|NEvt|EvtLoopTime" (Here NEvt=-1 and EvtLoopTime=-1)
// Buffer layout, in order: int error code, Func_Flag, int NEvt, TimeValType EvtLoopTime.
720  int outsize = 2*sizeof(int)+sizeof(AthenaMPToolBase::Func_Flag)+sizeof(TimeValType);
// NOTE(review): the malloc result is written to below without a null check.
721  void* outdata = malloc(outsize);
722  *(int*)(outdata) = (all_ok?0:1); // Error code: for now use 0 success, 1 failure
// NOTE(review): listing line 723 is absent from this excerpt; `func` used below
// is presumably declared there - confirm against SharedEvtQueueConsumer.cxx.
724  memcpy((char*)outdata+sizeof(int),&func,sizeof(func));
725  int nEvt = -1;
726  memcpy((char*)outdata+sizeof(int)+sizeof(func),&nEvt,sizeof(int));
727  TimeValType elapsed = -1;
728  memcpy((char*)outdata+2*sizeof(int)+sizeof(func),&elapsed,sizeof(elapsed));
729 
// The raw buffer is handed to outwork; who eventually frees it is not visible in this listing.
730  outwork->data = outdata;
731  outwork->size = outsize;
732 
733  return outwork;
734 }

◆ finalize()

StatusCode SharedEvtQueueConsumer::finalize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 121 of file SharedEvtQueueConsumer.cxx.

122 {
// In the master process, merges the per-worker event-order files into one file
// named m_eventOrdersFile; in every process, releases the event selector
// context (if any) and deletes the shared rank queue.
123  if(getpid()==m_masterPid) {
124  ATH_MSG_INFO("finalize() in the master process");
125  // Merge saved event orders into one in the master run directory
126 
127  // 1. Check if master run directory already contains a file with saved orders
128  // If so, then rename it with random suffix
// NOTE(review): listing line 129 is absent from this excerpt; `ordersFile` is
// presumably a std::filesystem::path built from m_eventOrdersFile - confirm.
130  if(std::filesystem::exists(ordersFile)) {
// The backup suffix is a rand() value seeded from the current time.
131  srand((unsigned)time(0));
132  std::ostringstream randname;
133  randname << rand();
134  std::string ordersFileBak = m_eventOrdersFile+std::string("-bak-")+randname.str();
135  ATH_MSG_WARNING("File " << m_eventOrdersFile << " already exists in the master run directory!");
136  ATH_MSG_WARNING("Saving a backup with new name " << ordersFileBak);
137 
138  std::filesystem::path ordersFileBakpath(ordersFileBak);
139  std::filesystem::rename(ordersFile,ordersFileBakpath);
140  }
141 
142  // 2. Merge workers event orders into the master file
143  std::fstream fs(m_eventOrdersFile.c_str(),std::fstream::out);
144  for(int i=0; i<m_nprocs; ++i) {
// Worker i's file is <m_subprocTopDir>/<m_subprocDirPrefix><i>/<m_eventOrdersFile>.
145  std::ostringstream workerIndex;
146  workerIndex << i;
147  std::filesystem::path worker_rundir(m_subprocTopDir);
148  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
149  std::string ordersFileWorker(worker_rundir.string()+std::string("/")+m_eventOrdersFile);
150  ATH_MSG_INFO("Processing " << ordersFileWorker << " ...");
151  std::fstream fs_worker(ordersFileWorker.c_str(),std::fstream::in);
152  std::string line;
// NOTE(review): good() is tested before getline, so the iteration in which
// getline hits end-of-file still writes `line` to fs - likely one extra
// (empty) line per worker file; confirm whether that is intended.
153  while(fs_worker.good()) {
154  std::getline(fs_worker,line);
155  fs << line << std::endl;
156  }
157  fs_worker.close();
158  }
159  fs.close();
160  } // if(getpid()==m_masterPid)
161 
162  if (m_evtContext) {
163  ATH_CHECK( m_evtSelector->releaseContext (m_evtContext) );
164  m_evtContext = nullptr;
165  }
166 
// NOTE(review): unlike m_evtContext above, the pointer is not reset after delete.
167  delete m_sharedRankQueue;
168  return StatusCode::SUCCESS;
169 }

◆ fmterror()

std::string AthenaMPToolBase::fmterror ( int  errnum)
protectedinherited

Definition at line 359 of file AthenaMPToolBase.cxx.

360 {
// Returns the text for the error number errnum as a std::string, using a
// 256-byte stack buffer.
361  char buf[256];
// NOTE(review): the return value of strerror_r is ignored. With the GNU-specific
// variant (which returns char*) the message is not guaranteed to be stored in
// buf, which is left uninitialized here - confirm which variant this build selects.
362  strerror_r(errnum, buf, sizeof(buf));
363  return std::string(buf);
364 }

◆ generateOutputReport()

AthenaMP::AllWorkerOutputs_ptr AthenaMPToolBase::generateOutputReport ( )
overridevirtualinherited

Implements IAthenaMPTool.

Reimplemented in EvtRangeProcessor, EvtRangeScatterer, SharedEvtQueueProvider, and SharedWriterTool.

Definition at line 132 of file AthenaMPToolBase.cxx.

133 {
// Builds the report of output files by scanning each worker's FileMgr log for
// lines containing "WRITE" and recording one WorkerOutput per reported file.
// NOTE(review): listing lines 134, 146 and 149 are absent from this excerpt;
// `jobOutputs` and `logFilePath` are presumably declared there, and line 149
// presumably appends the log file name to `logFile` - confirm against AthenaMPToolBase.cxx.
135 
136  if(m_fileMgrLog.empty()) {
137  ATH_MSG_WARNING(name() << " cannot make output report because FileMgr has not been configured to write log file!");
138  }
139  else {
140  // Collect output files made by the workers
141  std::string line;
142  for(int i=0;i<m_nprocs;++i) {
143  // Get the name of FileMgr log
144  std::ostringstream workindex;
145  workindex<<i;
147  logFilePath /= std::filesystem::path(m_subprocDirPrefix+workindex.str());
148  std::filesystem::path logFile(logFilePath);
150  if(!(std::filesystem::exists(logFile)&&std::filesystem::is_regular_file(logFile))) {
151  ATH_MSG_WARNING(logFile.string() << " either does not exist or is not a regular file. Skipping");
152  continue;
153  }
154 
155  ATH_MSG_DEBUG("FileMgr log file (" << i << ") " << logFile);
156 
157  std::ifstream inpStream(logFile.string().c_str());
// The duplicate filter is created anew for every worker log.
158  std::set<std::string> reportedFiles; // Don't report the same file twice
159  while(!inpStream.eof()) {
160  std::getline(inpStream,line);
161  if(line.find("WRITE")!=std::string::npos) {
162  // Parse the entry
// Split the line on single spaces, skipping runs of leading spaces before each token.
163  size_t startpos(0);
164  std::vector<std::string> entries;
165  while(startpos<line.size()) {
166  while(line[startpos]==' ')
167  startpos++;
168 
169  size_t endpos = line.find(' ',startpos);
170  if(endpos==std::string::npos) endpos = line.size();
171  entries.push_back(line.substr(startpos,endpos-startpos));
172  startpos=endpos+1;
173  }
174 
175  // entries[0] is the file name
176  std::filesystem::path filenamePath(entries[0]);
177  std::filesystem::path basename = filenamePath.filename();
// A file name already seen in this log is skipped; `continue` resumes the line-reading loop.
178  if(reportedFiles.find(basename.string())==reportedFiles.end())
179  reportedFiles.insert(basename.string());
180  else
181  continue;
// NOTE(review): basename is the result of filename(), so is_absolute() looks
// like it can never be true here; possibly filenamePath was meant - confirm.
182  std::filesystem::path absolutename = basename.is_absolute() ? basename : std::filesystem::absolute(std::filesystem::path(logFilePath)/=basename);
183  AthenaMP::AllWorkerOutputsIterator it1 = jobOutputs->find(basename.string());
184  if(it1==jobOutputs->end()) {
185  (*jobOutputs)[basename.string()] = AthenaMP::SingleWorkerOutputs();
186  (*jobOutputs)[basename.string()].reserve(m_nprocs);
187  }
188 
// NOTE(review): entries[1]..entries[3] are read without checking entries.size();
// a "WRITE" line with fewer than four tokens would index out of range.
189  AthenaMP::WorkerOutput newOutput;
190  newOutput.filename = absolutename.string();
191  newOutput.technology = entries[1];
192  newOutput.description = entries[2];
193  newOutput.access_mode = entries[3];
194  newOutput.shared = (line.find("SHARED")!=std::string::npos);
195 
196  (*jobOutputs)[basename.string()].push_back(newOutput);
197  }
198  }
199  }
200  }
201  return jobOutputs;
202 }

◆ handleSavedPfc()

int AthenaMPToolBase::handleSavedPfc ( const std::filesystem::path &  dest_path)
protectedinherited

Definition at line 422 of file AthenaMPToolBase.cxx.

423 {
// If "PoolFileCatalog.xml.AthenaMP-saved" is a regular file (path relative to
// the current working directory), passes it to COPY_FILE_HACK with destination
// <dest_path>/PoolFileCatalog.xml. Always returns 0.
// NOTE(review): the result of COPY_FILE_HACK (presumably a file copy) is not checked - confirm.
424  if(std::filesystem::is_regular_file("PoolFileCatalog.xml.AthenaMP-saved"))
425  COPY_FILE_HACK("PoolFileCatalog.xml.AthenaMP-saved",dest_path.string()+"/PoolFileCatalog.xml");
426  return 0;
427 }

◆ initialize()

StatusCode SharedEvtQueueConsumer::initialize ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 73 of file SharedEvtQueueConsumer.cxx.

74 {
// Acquires the seeking and sharing interfaces used later by this tool and
// retrieves the chrono/stat service.
75  ATH_MSG_DEBUG("In initialize");
76 
// NOTE(review): listing line 77 is absent from this excerpt - presumably the
// base-class initialize() call; confirm against SharedEvtQueueConsumer.cxx.
78 
79  // For pile-up jobs use event loop manager for seeking
80  // otherwise use event selector
81  if(m_isPileup) {
82  m_evtSeek = SmartIF<IEventSeek>(m_evtProcessor.get());
83  if(!m_evtSeek) {
84  ATH_MSG_ERROR("Unable to dyn-cast PileUpEventLoopMgr to IEventSeek");
85  return StatusCode::FAILURE;
86  }
87  }
88  else if(m_evtSelector) {
89  m_evtSelSeek = serviceLocator()->service(m_evtSelName);
90  ATH_CHECK(m_evtSelSeek.isValid());
91  }
92 
93  if(m_evtSelector) {
94  ATH_CHECK( m_evtSelector->createContext (m_evtContext) );
95 
// A missing IEventShare is fatal only when the shared reader is in use;
// otherwise it is just reported at INFO level.
96  m_evtShare = serviceLocator()->service(m_evtSelName);
97  if(!m_evtShare) {
98  if(m_useSharedReader) {
99  ATH_MSG_ERROR("Error retrieving IEventShare");
100  return StatusCode::FAILURE;
101  }
102  ATH_MSG_INFO("Could not retrieve IEventShare");
103  }
104 
105  //FIXME: AthenaPool dependent for now
106 
// A missing IDataShare is fatal only when the shared writer is in use;
// otherwise nothing is logged.
107  m_dataShare = SmartIF<IDataShare>(serviceLocator()->service("AthenaPoolCnvSvc"));
108  if(!m_dataShare) {
109  if(m_useSharedWriter) {
110  ATH_MSG_ERROR("Error retrieving AthenaPoolCnvSvc");
111  return StatusCode::FAILURE;
112  }
113  }
114  }
115 
116  ATH_CHECK(m_chronoStatSvc.retrieve());
117 
118  return StatusCode::SUCCESS;
119 }

◆ inputHandles()

virtual std::vector<Gaudi::DataHandle*> AthCommonDataStore< AthCommonMsg< AlgTool > >::inputHandles ( ) const
overridevirtualinherited

Return this algorithm's input handles.

We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.

◆ interfaceID()

static const InterfaceID& IAthenaMPTool::interfaceID ( )
inlinestaticinherited

Definition at line 40 of file IAthenaMPTool.h.

// Returns the interface identifier IID_IAthenaMPTool.
40 { return IID_IAthenaMPTool; }

◆ killChildren()

void AthenaMPToolBase::killChildren ( )
overridevirtualinherited

Implements IAthenaMPTool.

Definition at line 214 of file AthenaMPToolBase.cxx.

215 {
// NOTE(review): listing line 216 is absent from this excerpt - presumably the
// loop header that iterates over the child processes as `child`; confirm.
// Each child is sent SIGKILL; the return value of kill() is not checked.
217  kill(child.getProcessID(),SIGKILL);
218  }
219 }

◆ msg() [1/2]

MsgStream& AthCommonMsg< AlgTool >::msg ( ) const
inlineinherited

Definition at line 24 of file AthCommonMsg.h.

24  {
// Forwards to msgStream() with no level argument.
25  return this->msgStream();
26  }

◆ msg() [2/2]

MsgStream& AthCommonMsg< AlgTool >::msg ( const MSG::Level  lvl) const
inlineinherited

Definition at line 27 of file AthCommonMsg.h.

27  {
// Forwards to msgStream(lvl) for the requested level.
28  return this->msgStream(lvl);
29  }

◆ msgLvl()

bool AthCommonMsg< AlgTool >::msgLvl ( const MSG::Level  lvl) const
inlineinherited

Definition at line 30 of file AthCommonMsg.h.

30  {
// Forwards to msgLevel(lvl) and returns its result.
31  return this->msgLevel(lvl);
32  }

◆ operator()

virtual std::unique_ptr<ScheduledWork> AthenaInterprocess::IMessageDecoder::operator() ( const ScheduledWork & )
pure virtualinherited

◆ operator=()

◆ outputHandles()

virtual std::vector<Gaudi::DataHandle*> AthCommonDataStore< AthCommonMsg< AlgTool > >::outputHandles ( ) const
overridevirtualinherited

Return this algorithm's output handles.

We override this to include handle instances from key arrays if they have not yet been declared. See comments on updateVHKA.

◆ redirectLog()

int AthenaMPToolBase::redirectLog ( const std::string &  rundir,
bool  addTimeStamp = true 
)
protectedinherited

Definition at line 282 of file AthenaMPToolBase.cxx.

283 {
// Redirects stdout and stderr of the calling process to <rundir>/AthenaMP.log
// and, when addTimeStamp is true, prepends "%t " to the message service Format
// property unless it already contains "%t". Returns 0 on success, -1 on failure.
284  // Redirect both stdout and stderr to the same file AthenaMP.log
285  int dup2result1(0), dup2result2(0);
286 
// NOTE(review): the file is opened with O_CREAT|O_RDWR only (no O_TRUNC or
// O_APPEND), so a pre-existing log would be overwritten in place from offset 0 -
// confirm that the run directory is always fresh.
287  int newout = open(std::string(rundir+"/AthenaMP.log").c_str(),O_CREAT | O_RDWR, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
288  if(newout==-1) {
289  ATH_MSG_ERROR("Unable to open log file in the run directory. " << fmterror(errno));
290  return -1;
291  }
// Both duplications are attempted and newout is closed before either result is checked.
292  dup2result1 = dup2(newout, STDOUT_FILENO);
293  dup2result2 = dup2(newout, STDERR_FILENO);
294  TEMP_FAILURE_RETRY(close(newout));
// NOTE(review): errno is read here after the second dup2 and the close, so it
// may no longer describe the failure of the first dup2.
295  if(dup2result1==-1) {
296  ATH_MSG_ERROR("Unable to redirect standard output. " << fmterror(errno));
297  return -1;
298  }
299  if(dup2result2==-1) {
300  ATH_MSG_ERROR("Unable to redirect standard error. " << fmterror(errno));
301  return -1;
302  }
303 
304  if(addTimeStamp) {
305  SmartIF<IProperty> propertyServer(msgSvc());
306  if(propertyServer==0) {
307  ATH_MSG_ERROR("Unable to cast message svc to IProperty");
308  return -1;
309  }
310 
311  std::string propertyName("Format");
312  std::string oldFormat("");
313  StringProperty formatProp(propertyName,oldFormat);
314  StatusCode sc = propertyServer->getProperty(&formatProp);
// A missing Format property only produces a warning; the function still returns 0.
315  if(sc.isFailure()) {
316  ATH_MSG_WARNING("Message Service does not have Format property");
317  }
318  else {
319  oldFormat = formatProp.value();
320  if(oldFormat.find("%t")==std::string::npos) {
321  // Add time stamps
322  std::string newFormat("%t " + oldFormat);
323  StringProperty newFormatProp(propertyName,newFormat);
324  if(propertyServer->setProperty(newFormatProp).isFailure()) {
325  ATH_MSG_ERROR("Unable to set new Format property on the Message Service");
326  return -1;
327  }
328  }
329  else {
330  ATH_MSG_DEBUG("MsgSvc format already contains timestamps. Nothing to be done");
331  }
332  }
333  }
334 
335  return 0;
336 }

◆ renounce()

std::enable_if_t<std::is_void_v<std::result_of_t<decltype(&T::renounce)(T)> > && !std::is_base_of_v<SG::VarHandleKeyArray, T> && std::is_base_of_v<Gaudi::DataHandle, T>, void> AthCommonDataStore< AthCommonMsg< AlgTool > >::renounce ( T &  h)
inlineprotectedinherited

Definition at line 380 of file AthCommonDataStore.h.

381  {
// Calls renounce() on the handle itself, then passes the handle to the base-class renounce().
382  h.renounce();
383  PBASE::renounce (h);
384  }

◆ renounceArray()

void AthCommonDataStore< AthCommonMsg< AlgTool > >::renounceArray ( SG::VarHandleKeyArray handlesArray)
inlineprotectedinherited

remove all handles from I/O resolution

Definition at line 364 of file AthCommonDataStore.h.

364  {
// Delegates to the array's own renounce().
365  handlesArray.renounce();
366  }

◆ reopenFd()

int AthenaMPToolBase::reopenFd ( int  fd,
const std::string &  name 
)
privateinherited

Definition at line 445 of file AthenaMPToolBase.cxx.

446 {
// Re-opens the file `name` with the status flags of descriptor fd, seeks the new
// descriptor to fd's current offset, and installs it at the same descriptor
// number with the old descriptor flags. Returns 0 on success (or when fd is not
// seekable because of ESPIPE), -1 on failure.
447  ATH_MSG_DEBUG("Attempting to reopen descriptor for " << name);
// NOTE(review): the F_GETFL result is not checked for -1 before being used below.
448  int old_openflags = fcntl(fd,F_GETFL,0);
// The switch only logs the access mode; it does not affect the flow.
449  switch(old_openflags & O_ACCMODE) {
450  case O_RDONLY: {
451  ATH_MSG_DEBUG("The File Access Mode is RDONLY");
452  break;
453  }
454  case O_WRONLY: {
455  ATH_MSG_DEBUG("The File Access Mode is WRONLY");
456  break;
457  }
458  case O_RDWR: {
459  ATH_MSG_DEBUG("The File Access Mode is RDWR");
460  break;
461  }
462  }
463 
464  int old_descflags = fcntl(fd,F_GETFD,0);
465  off_t oldpos = lseek(fd,0,SEEK_CUR);
466  if(oldpos==-1) {
// ESPIPE is tolerated: the descriptor is left as it is and 0 is returned.
467  if(errno==ESPIPE) {
468  ATH_MSG_WARNING("Dealing with PIPE. Skipping ... (FIXME!)");
469  }
470  else {
471  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on " << name << ". " << fmterror(errno));
472  return -1;
473  }
474  }
475  else {
476  Io::Fd newfd = open(name.c_str(),old_openflags);
477  if(newfd==-1) {
478  ATH_MSG_ERROR("When re-opening file descriptors unable to open " << name << " for reading. " << fmterror(errno));
479  return -1;
480  }
481  if(lseek(newfd,oldpos,SEEK_SET)==-1){
482  ATH_MSG_ERROR("When re-opening file descriptors lseek failed on the newly opened " << name << ". " << fmterror(errno));
483  TEMP_FAILURE_RETRY(close(newfd));
484  return -1;
485  }
// NOTE(review): fd is closed explicitly before dup2(newfd,fd); if dup2 then
// fails, the original descriptor is already gone - confirm this is acceptable.
486  TEMP_FAILURE_RETRY(close(fd));
487  if(dup2(newfd,fd)==-1) {
488  ATH_MSG_ERROR("When re-opening file descriptors unable to duplicate descriptor for " << name << ". " << fmterror(errno));
489  TEMP_FAILURE_RETRY(close(newfd));
490  return -1;
491  }
492  if(fcntl(fd,F_SETFD,old_descflags)==-1) {
493  ATH_MSG_ERROR("When re-opening file descriptors unable to set descriptor flags for " << name << ". " << fmterror(errno));
494  TEMP_FAILURE_RETRY(close(newfd));
495  return -1;
496  }
// The temporary descriptor is no longer needed once it has been duplicated onto fd.
497  TEMP_FAILURE_RETRY(close(newfd));
498  }
499  return 0;
500 }

◆ reopenFds()

int AthenaMPToolBase::reopenFds ( )
protectedinherited

Definition at line 366 of file AthenaMPToolBase.cxx.

367 {
// Calls reopenFd() for every open file reported by the FileMgr and then for
// every FdsRegistry entry whose descriptor was not already handled.
// Returns 0 on success, -1 as soon as one reopenFd() call returns non-zero.
368  // Reopen file descriptors.
369  // First go over all open files, which have been registered with the FileMgr
370  // Then also check the FdsRegistry, in case it contains some files not registered with the FileMgr
// Descriptors already reopened in this call.
371  std::set<int> fdLog;
372 
373  // Query the FileMgr contents
374  std::vector<const Io::FileAttr*> filemgrFiles;
375  std::vector<const Io::FileAttr*>::const_iterator itFile;
376  unsigned filenum = m_fileMgr->getFiles(filemgrFiles); // Get attributes for open files only. We don't care about closed ones at this point
// A mismatch between the returned count and the vector size is only warned about.
377  if(filenum!=filemgrFiles.size())
378  ATH_MSG_WARNING("getFiles returned " << filenum << " while vector size is " << filemgrFiles.size());
379 
380  for(itFile=filemgrFiles.begin();itFile!=filemgrFiles.end();++itFile) {
381  ATH_MSG_DEBUG("* " << **itFile);
382  const std::string& filename = (**itFile).name();
383  Io::Fd fd = (**itFile).fd();
384 
385  if(fd==-1) {
386  // It is legal to have fd=-1 for remote inputs
387  // On the other hand, these inputs should not remain open after fork. The issue being tracked at ATEAM-434.
388  // So, this hopefully is a temporary patch
389  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FileMgr. Skip FD reopening. File name: " << filename);
390  continue;
391  }
392 
393  if(reopenFd(fd,filename))
394  return -1;
395 
396  fdLog.insert(fd);
397  }
398 
399  // Check the FdsRegistry
// NOTE(review): m_fdsRegistry is dereferenced without a null check - confirm
// that useFdsRegistry() is always called before this function.
400  for(const AthenaInterprocess::FdsRegistryEntry& regEntry : *m_fdsRegistry) {
401  if(fdLog.find(regEntry.fd)!=fdLog.end()) {
402  ATH_MSG_DEBUG("The file from FdsRegistry " << regEntry.name << " was registered with FileMgr. Skip reopening");
403  }
404  else {
405  ATH_MSG_WARNING("The file " << regEntry.name << " has not been registered with the FileMgr!");
406 
407  if(regEntry.fd==-1) {
408  // Same protection as the one above
409  ATH_MSG_WARNING("FD=-1 detected on an open file retrieved from FD Registry. Skip FD reopening. File name: " << regEntry.name);
410  continue;
411  }
412 
413  if(reopenFd(regEntry.fd,regEntry.name))
414  return -1;
415 
416  fdLog.insert(regEntry.fd);
417  }
418  }
419  return 0;
420 }

◆ reportSubprocessStatuses()

void SharedEvtQueueConsumer::reportSubprocessStatuses ( )
overridevirtual

Reimplemented from AthenaMPToolBase.

Definition at line 254 of file SharedEvtQueueConsumer.cxx.

255 {
// Logs one INFO line per sub-process status: PID, SUCCESS/FAILURE and, when
// m_eventStat has an entry for the PID, the event count and event loop time.
256  ATH_MSG_INFO("Statuses of event processors");
257  const std::vector<AthenaInterprocess::ProcessStatus>& statuses = m_processGroup->getStatuses();
258  for(size_t i=0; i<statuses.size(); ++i) {
259  // Get the number of events processed by this worker
260  auto it = m_eventStat.find(statuses[i].pid);
// A non-zero exit code is reported as FAILURE.
261  msg(MSG::INFO) << "*** Process PID=" << statuses[i].pid
262  << ". Status " << ((statuses[i].exitcode)?"FAILURE":"SUCCESS")
263  << ". Number of events processed: ";
// No statistics recorded for this PID.
264  if(it==m_eventStat.end())
265  msg(MSG::INFO) << "N/A" << endmsg;
266  else
267  msg(MSG::INFO) << it->second.first
268  << ", Event Loop Time: " << it->second.second << "sec."
269  << endmsg;
270  }
271 }

◆ setMaxEvt()

virtual void AthenaMPToolBase::setMaxEvt ( int  maxEvt)
inlineoverridevirtualinherited

Implements IAthenaMPTool.

Definition at line 45 of file AthenaMPToolBase.h.

// Stores maxEvt in m_maxEvt.
45 {m_maxEvt=maxEvt;}

◆ setMPRunStop()

virtual void AthenaMPToolBase::setMPRunStop ( const AthenaInterprocess::IMPRunStop runStop)
inlineoverridevirtualinherited

Implements IAthenaMPTool.

Definition at line 46 of file AthenaMPToolBase.h.

// Stores the runStop pointer in m_mpRunStop; no copy of the pointee is made.
46 {m_mpRunStop=runStop;}

◆ setRandString()

void AthenaMPToolBase::setRandString ( const std::string &  randStr)
overridevirtualinherited

Implements IAthenaMPTool.

Definition at line 209 of file AthenaMPToolBase.cxx.

210 {
// Stores a copy of randStr in m_randStr.
211  m_randStr = randStr;
212 }

◆ subProcessLogs()

void SharedEvtQueueConsumer::subProcessLogs ( std::vector< std::string > &  filenames)
overridevirtual

Implements IAthenaMPTool.

Definition at line 273 of file SharedEvtQueueConsumer.cxx.

274 {
// Replaces the contents of filenames with one log path per worker:
// <m_subprocTopDir>/<m_subprocDirPrefix><i>/AthenaMP.log for i in [0, m_nprocs).
// The paths are built without checking that the files exist.
275  filenames.clear();
276  for(int i=0; i<m_nprocs; ++i) {
277  std::ostringstream workerIndex;
278  workerIndex << i;
279  std::filesystem::path worker_rundir(m_subprocTopDir);
280  worker_rundir /= std::filesystem::path(m_subprocDirPrefix+workerIndex.str());
281  filenames.push_back(worker_rundir.string()+std::string("/AthenaMP.log"));
282  }
283 }

◆ sysInitialize()

virtual StatusCode AthCommonDataStore< AthCommonMsg< AlgTool > >::sysInitialize ( )
overridevirtualinherited

Perform system initialization for an algorithm.

We override this to declare all the elements of handle key arrays at the end of initialization. See comments on updateVHKA.

Reimplemented in DerivationFramework::CfAthAlgTool, AthCheckedComponent< AthAlgTool >, AthCheckedComponent<::AthAlgTool >, and asg::AsgMetadataTool.

◆ sysStart()

virtual StatusCode AthCommonDataStore< AthCommonMsg< AlgTool > >::sysStart ( )
overridevirtualinherited

Handle START transition.

We override this in order to make sure that conditions handle keys can cache a pointer to the conditions container.

◆ updateIoReg()

int AthenaMPToolBase::updateIoReg ( const std::string &  rundir)
protectedinherited

Definition at line 338 of file AthenaMPToolBase.cxx.

339 {
// Retrieves the IoComponentMgr and calls io_update_all() with the run directory.
// Returns 0 on success, -1 if either step fails.
340  if (!m_ioMgr.retrieve().isSuccess()) {
341  ATH_MSG_ERROR("Error retrieving IoComponentMgr");
342  return -1;
343  } else {
344  ATH_MSG_DEBUG("Successfully retrieved IoComponentMgr");
345  }
346 
347  // update the IoRegistry for the new workdir - make sure we use absolute path
// NOTE(review): listing line 348 is absent from this excerpt; `abs_rundir` is
// presumably the absolute form of rundir declared there - confirm.
349  if(!m_ioMgr->io_update_all(abs_rundir.string()).isSuccess()) {
350  ATH_MSG_ERROR("Error updating IoRegistry");
351  return -1;
352  } else {
353  ATH_MSG_DEBUG("Successfully updated IoRegistry");
354  }
355 
356  return 0;
357 }

◆ updateVHKA()

void AthCommonDataStore< AthCommonMsg< AlgTool > >::updateVHKA ( Gaudi::Details::PropertyBase &  )
inlineinherited

Definition at line 308 of file AthCommonDataStore.h.

308  {
// Sets this object as the owner of every key in every array held in m_vhka.
// The property argument is unnamed and not used.
309  // debug() << "updateVHKA for property " << p.name() << " " << p.toString()
310  // << " size: " << m_vhka.size() << endmsg;
311  for (auto &a : m_vhka) {
312  std::vector<SG::VarHandleKey*> keys = a->keys();
313  for (auto k : keys) {
314  k->setOwner(this);
315  }
316  }
317  }

◆ useFdsRegistry()

void AthenaMPToolBase::useFdsRegistry ( std::shared_ptr< AthenaInterprocess::FdsRegistry registry)
overridevirtualinherited

Implements IAthenaMPTool.

Definition at line 204 of file AthenaMPToolBase.cxx.

205 {
// NOTE(review): listing line 206, the only statement of this function, is absent
// from this excerpt - presumably it stores `registry` in m_fdsRegistry; confirm
// against AthenaMPToolBase.cxx.
207 }

◆ waitForSignal()

void AthenaMPToolBase::waitForSignal ( )
protectedinherited

Definition at line 429 of file AthenaMPToolBase.cxx.

430 {
// Suspends the calling process in sigsuspend() with SIGUSR1 blocked beforehand,
// then unblocks SIGUSR1 again.
// NOTE(review): listing lines 434 and 440 are absent from this excerpt -
// presumably handler installation and/or a wait-condition loop; confirm
// against AthenaMPToolBase.cxx.
431  ATH_MSG_INFO("Bootstrap worker PID " << getpid() << " - waiting for SIGUSR1");
432  sigset_t mask, oldmask;
433 
435 
436  sigemptyset (&mask);
437  sigaddset (&mask, SIGUSR1);
438 
// Block SIGUSR1 and remember the previous mask, which sigsuspend() installs while waiting.
439  sigprocmask (SIG_BLOCK, &mask, &oldmask);
441  sigsuspend (&oldmask);
// NOTE(review): SIGUSR1 is unblocked with SIG_UNBLOCK rather than restoring
// oldmask via SIG_SETMASK; the two differ if SIGUSR1 was already blocked on entry.
442  sigprocmask (SIG_UNBLOCK, &mask, NULL);
443 }

Member Data Documentation

◆ m_appMgr

ServiceHandle<IAppMgrUI> AthenaMPToolBase::m_appMgr
protectedinherited

Definition at line 96 of file AthenaMPToolBase.h.

◆ m_chronoStatSvc

ServiceHandle<IChronoStatSvc> SharedEvtQueueConsumer::m_chronoStatSvc
private

Definition at line 66 of file SharedEvtQueueConsumer.h.

◆ m_dataShare

SmartIF<IDataShare> SharedEvtQueueConsumer::m_dataShare
private

Definition at line 71 of file SharedEvtQueueConsumer.h.

◆ m_debug

bool SharedEvtQueueConsumer::m_debug
private

Definition at line 62 of file SharedEvtQueueConsumer.h.

◆ m_detStore

StoreGateSvc_t AthCommonDataStore< AthCommonMsg< AlgTool > >::m_detStore
privateinherited

Pointer to StoreGate (detector store by default)

Definition at line 393 of file AthCommonDataStore.h.

◆ m_eventOrders

std::vector<int> SharedEvtQueueConsumer::m_eventOrders
private

Definition at line 83 of file SharedEvtQueueConsumer.h.

◆ m_eventOrdersFile

std::string SharedEvtQueueConsumer::m_eventOrdersFile
private

Definition at line 82 of file SharedEvtQueueConsumer.h.

◆ m_eventStat

std::map<pid_t,std::pair<int,TimeValType> > SharedEvtQueueConsumer::m_eventStat
private

Definition at line 77 of file SharedEvtQueueConsumer.h.

◆ m_evtContext

IEvtSelector::Context* SharedEvtQueueConsumer::m_evtContext
private

Definition at line 69 of file SharedEvtQueueConsumer.h.

◆ m_evtProcessor

ServiceHandle<IEventProcessor> AthenaMPToolBase::m_evtProcessor
protectedinherited

Definition at line 95 of file AthenaMPToolBase.h.

◆ m_evtSeek

SmartIF<IEventSeek> SharedEvtQueueConsumer::m_evtSeek
private

Definition at line 67 of file SharedEvtQueueConsumer.h.

◆ m_evtSelector

SmartIF<IEvtSelector> AthenaMPToolBase::m_evtSelector
protectedinherited

Definition at line 99 of file AthenaMPToolBase.h.

◆ m_evtSelName

std::string AthenaMPToolBase::m_evtSelName
protectedinherited

Definition at line 90 of file AthenaMPToolBase.h.

◆ m_evtSelSeek

SmartIF<IEvtSelectorSeek> SharedEvtQueueConsumer::m_evtSelSeek
private

Definition at line 68 of file SharedEvtQueueConsumer.h.

◆ m_evtShare

SmartIF<IEventShare> SharedEvtQueueConsumer::m_evtShare
private

Definition at line 70 of file SharedEvtQueueConsumer.h.

◆ m_evtStore

StoreGateSvc_t AthCommonDataStore< AthCommonMsg< AlgTool > >::m_evtStore
privateinherited

Pointer to StoreGate (event store by default)

Definition at line 390 of file AthCommonDataStore.h.

◆ m_fdsRegistry

std::shared_ptr<AthenaInterprocess::FdsRegistry> AthenaMPToolBase::m_fdsRegistry
protectedinherited

Definition at line 101 of file AthenaMPToolBase.h.

◆ m_fileMgr

ServiceHandle<IFileMgr> AthenaMPToolBase::m_fileMgr
protectedinherited

Definition at line 97 of file AthenaMPToolBase.h.

◆ m_fileMgrLog

std::string AthenaMPToolBase::m_fileMgrLog
protectedinherited

Definition at line 100 of file AthenaMPToolBase.h.

◆ m_finQueue

std::queue<pid_t> SharedEvtQueueConsumer::m_finQueue
private

Definition at line 78 of file SharedEvtQueueConsumer.h.

◆ m_ioMgr

ServiceHandle<IIoComponentMgr> AthenaMPToolBase::m_ioMgr
protectedinherited

Definition at line 98 of file AthenaMPToolBase.h.

◆ m_isPileup

Gaudi::Property<bool> AthenaMPToolBase::m_isPileup {this, "IsPileup", false, "Flag for configuring PileUpEventLoopMgr"}
protectedinherited

Definition at line 104 of file AthenaMPToolBase.h.

◆ m_isRoundRobin

bool SharedEvtQueueConsumer::m_isRoundRobin
private

Definition at line 59 of file SharedEvtQueueConsumer.h.

◆ m_masterPid

pid_t SharedEvtQueueConsumer::m_masterPid
private

Definition at line 84 of file SharedEvtQueueConsumer.h.

◆ m_maxEvt

int AthenaMPToolBase::m_maxEvt
protectedinherited

Definition at line 87 of file AthenaMPToolBase.h.

◆ m_mpRunStop

const AthenaInterprocess::IMPRunStop* AthenaMPToolBase::m_mpRunStop {nullptr}
protectedinherited

Definition at line 93 of file AthenaMPToolBase.h.

◆ m_nEventsBeforeFork

int SharedEvtQueueConsumer::m_nEventsBeforeFork
private

Definition at line 60 of file SharedEvtQueueConsumer.h.

◆ m_nprocs

int AthenaMPToolBase::m_nprocs
protectedinherited

Definition at line 86 of file AthenaMPToolBase.h.

◆ m_nSkipEvents

int SharedEvtQueueConsumer::m_nSkipEvents
private

Definition at line 61 of file SharedEvtQueueConsumer.h.

◆ m_processGroup

AthenaInterprocess::ProcessGroup* AthenaMPToolBase::m_processGroup
protectedinherited

Definition at line 92 of file AthenaMPToolBase.h.

◆ m_randStr

std::string AthenaMPToolBase::m_randStr
protectedinherited

Definition at line 102 of file AthenaMPToolBase.h.

◆ m_rankId

int SharedEvtQueueConsumer::m_rankId
private

Definition at line 64 of file SharedEvtQueueConsumer.h.

◆ m_readEventOrders

bool SharedEvtQueueConsumer::m_readEventOrders
private

Definition at line 81 of file SharedEvtQueueConsumer.h.

◆ m_sharedEventQueue

AthenaInterprocess::SharedQueue* SharedEvtQueueConsumer::m_sharedEventQueue
private

Definition at line 73 of file SharedEvtQueueConsumer.h.

◆ m_sharedRankQueue

AthenaInterprocess::SharedQueue* SharedEvtQueueConsumer::m_sharedRankQueue
private

Definition at line 74 of file SharedEvtQueueConsumer.h.

◆ m_subprocDirPrefix

std::string AthenaMPToolBase::m_subprocDirPrefix
protectedinherited

Definition at line 89 of file AthenaMPToolBase.h.

◆ m_subprocTopDir

std::string AthenaMPToolBase::m_subprocTopDir
protectedinherited

Definition at line 88 of file AthenaMPToolBase.h.

◆ m_useSharedReader

bool SharedEvtQueueConsumer::m_useSharedReader
private

Definition at line 57 of file SharedEvtQueueConsumer.h.

◆ m_useSharedWriter

bool SharedEvtQueueConsumer::m_useSharedWriter
private

Definition at line 58 of file SharedEvtQueueConsumer.h.

◆ m_varHandleArraysDeclared

bool AthCommonDataStore< AthCommonMsg< AlgTool > >::m_varHandleArraysDeclared
privateinherited

Definition at line 399 of file AthCommonDataStore.h.

◆ m_vhka

std::vector<SG::VarHandleKeyArray*> AthCommonDataStore< AthCommonMsg< AlgTool > >::m_vhka
privateinherited

Definition at line 398 of file AthCommonDataStore.h.


The documentation for this class was generated from the following files:
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:61
python.Dso.registry
registry
Definition: Control/AthenaServices/python/Dso.py:159
python.DQPostProcessMod.rundir
def rundir(fname)
Definition: DQPostProcessMod.py:116
AthenaInterprocess::ProcessGroup::getStatuses
const std::vector< ProcessStatus > & getStatuses() const
Definition: ProcessGroup.cxx:204
AthenaMPToolBase::waitForSignal
void waitForSignal()
Definition: AthenaMPToolBase.cxx:429
athena.path
path
Python interpreter configuration.
Definition: athena.py:128
AthenaMPToolBase::ESRANGE_BADINPFILE
@ ESRANGE_BADINPFILE
Definition: AthenaMPToolBase.h:65
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
plot_material.mkdir
def mkdir(path, recursive=True)
Definition: plot_material.py:16
collListGuids.line
string line
Definition: collListGuids.py:77
AthenaMP::WorkerOutput::description
std::string description
Definition: IAthenaMPTool.h:24
AthenaMPToolBase::m_processGroup
AthenaInterprocess::ProcessGroup * m_processGroup
Definition: AthenaMPToolBase.h:92
AthCommonDataStore< AthCommonMsg< AlgTool > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
AthenaInterprocess::ScheduledWork
Definition: IMessageDecoder.h:12
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
SharedEvtQueueConsumer::m_evtSeek
SmartIF< IEventSeek > m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
SharedEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
AthenaMPToolBase::m_randStr
std::string m_randStr
Definition: AthenaMPToolBase.h:102
AthenaMPToolBase::m_nprocs
int m_nprocs
Definition: AthenaMPToolBase.h:86
AthenaMPToolBase::FUNC_FIN
@ FUNC_FIN
Definition: AthenaMPToolBase.h:71
AthenaInterprocess::ProcessGroup::getChildren
const std::vector< Process > & getChildren() const
Definition: ProcessGroup.cxx:197
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaMPToolBase::fmterror
std::string fmterror(int errnum)
Definition: AthenaMPToolBase.cxx:359
python.AthDsoLogger.out
out
Definition: AthDsoLogger.py:71
AthCommonDataStore< AthCommonMsg< AlgTool > >::m_evtStore
StoreGateSvc_t m_evtStore
Pointer to StoreGate (event store by default)
Definition: AthCommonDataStore.h:390
AthCommonDataStore< AthCommonMsg< AlgTool > >::m_vhka
std::vector< SG::VarHandleKeyArray * > m_vhka
Definition: AthCommonDataStore.h:398
DeMoUpdate.statuses
list statuses
Definition: DeMoUpdate.py:568
AthenaMPToolBase::m_evtSelName
std::string m_evtSelName
Definition: AthenaMPToolBase.h:90
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
read_hist_ntuple.t
t
Definition: read_hist_ntuple.py:5
AthenaMPToolBase::ESRANGE_SEEKFAILED
@ ESRANGE_SEEKFAILED
Definition: AthenaMPToolBase.h:62
AthenaMPToolBase::m_fileMgrLog
std::string m_fileMgrLog
Definition: AthenaMPToolBase.h:100
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
python.utils.AtlRunQueryLookup.mask
string mask
Definition: AtlRunQueryLookup.py:460
AthenaMP::AllWorkerOutputsIterator
AllWorkerOutputs::iterator AllWorkerOutputsIterator
Definition: IAthenaMPTool.h:30
AthenaMP::SingleWorkerOutputs
std::vector< WorkerOutput > SingleWorkerOutputs
Definition: IAthenaMPTool.h:28
Fd
IIoSvc::Fd Fd
Definition: IoSvc.cxx:22
sigaddset
#define sigaddset(x, y)
Definition: SealSignal.h:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
AthenaMPToolBase::FUNC_EXEC
@ FUNC_EXEC
Definition: AthenaMPToolBase.h:70
SG::VarHandleKeyArray::setOwner
virtual void setOwner(IDataHandleHolder *o)=0
COPY_FILE_HACK
#define COPY_FILE_HACK(_src, _dest)
Definition: copy_file_icc_hack.h:15
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:64
IDTPMcnv.htype
htype
Definition: IDTPMcnv.py:29
sigset_t
int sigset_t
Definition: SealSignal.h:80
IEvtSelectorSeek::seek
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
AthenaMPToolBase::m_evtSelector
SmartIF< IEvtSelector > m_evtSelector
Definition: AthenaMPToolBase.h:99
python.utils.AtlRunQueryDQUtils.p
p
Definition: AtlRunQueryDQUtils.py:210
AthCommonDataStore
Definition: AthCommonDataStore.h:52
AthenaMPToolBase::m_appMgr
ServiceHandle< IAppMgrUI > m_appMgr
Definition: AthenaMPToolBase.h:96
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
IEvtSelectorSeek::curEvent
virtual int curEvent(const IEvtSelector::Context &c) const =0
return the current event number.
LArG4FSStartPointFilter.rand
rand
Definition: LArG4FSStartPointFilter.py:80
AthenaMPToolBase::m_isPileup
Gaudi::Property< bool > m_isPileup
Definition: AthenaMPToolBase.h:104
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
SharedEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedEvtQueueConsumer.h:71
python.DecayParser.buf
buf
print ("=> [%s]"cmd)
Definition: DecayParser.py:27
DetDescrDictionaryDict::it1
std::vector< HWIdentifier >::iterator it1
Definition: DetDescrDictionaryDict.h:17
AthenaMPToolBase::Func_Flag
Func_Flag
Definition: AthenaMPToolBase.h:68
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
python.LArMinBiasAlgConfig.int
int
Definition: LArMinBiasAlgConfig.py:59
AthenaMPToolBase::reopenFd
int reopenFd(int fd, const std::string &name)
Definition: AthenaMPToolBase.cxx:445
AthenaMPToolBase::AthenaMPToolBase
AthenaMPToolBase()
AthenaMP::WorkerOutput::shared
bool shared
Definition: IAthenaMPTool.h:26
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:83
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
AthenaMPToolBase_d::pauseForDebug
void pauseForDebug(int)
Definition: AthenaMPToolBase.cxx:27
AthenaMPToolBase::ESRANGE_SUCCESS
@ ESRANGE_SUCCESS
Definition: AthenaMPToolBase.h:60
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
SharedEvtQueueConsumer::m_isRoundRobin
bool m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:59
AthenaMP::WorkerOutput::access_mode
std::string access_mode
Definition: IAthenaMPTool.h:25
TrigInDetValidation_Base.malloc
malloc
Definition: TrigInDetValidation_Base.py:132
SharedEvtQueueConsumer::m_useSharedWriter
bool m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:58
AthCommonDataStore< AthCommonMsg< AlgTool > >::m_detStore
StoreGateSvc_t m_detStore
Pointer to StoreGate (detector store by default)
Definition: AthCommonDataStore.h:393
AthenaMPToolBase::updateIoReg
int updateIoReg(const std::string &rundir)
Definition: AthenaMPToolBase.cxx:338
AthenaMPToolBase::initialize
virtual StatusCode initialize() override
Definition: AthenaMPToolBase.cxx:56
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
AthenaMPToolBase::m_mpRunStop
const AthenaInterprocess::IMPRunStop * m_mpRunStop
Definition: AthenaMPToolBase.h:93
SharedEvtQueueConsumer::m_useSharedReader
bool m_useSharedReader
Definition: SharedEvtQueueConsumer.h:57
SharedEvtQueueConsumer::m_readEventOrders
bool m_readEventOrders
Definition: SharedEvtQueueConsumer.h:81
SG::VarHandleKeyArray::renounce
virtual void renounce()=0
SG::HandleClassifier::type
std::conditional< std::is_base_of< SG::VarHandleKeyArray, T >::value, VarHandleKeyArrayType, type2 >::type type
Definition: HandleClassifier.h:54
SharedEvtQueueConsumer::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueConsumer.h:70
ReadFromCoolCompare.fd
fd
Definition: ReadFromCoolCompare.py:196
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
merge_scale_histograms.doc
string doc
Definition: merge_scale_histograms.py:9
AthenaInterprocess::IMPRunStop::stopScheduled
virtual bool stopScheduled() const =0
AthenaMPToolBase::ESRANGE_PROCFAILED
@ ESRANGE_PROCFAILED
Definition: AthenaMPToolBase.h:63
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaMPToolBase::m_ioMgr
ServiceHandle< IIoComponentMgr > m_ioMgr
Definition: AthenaMPToolBase.h:98
SharedEvtQueueConsumer::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
AthenaMPToolBase::redirectLog
int redirectLog(const std::string &rundir, bool addTimeStamp=true)
Definition: AthenaMPToolBase.cxx:282
AthenaInterprocess::Process
Definition: Process.h:17
LArG4FSStartPointFilter.outdata
outdata
Definition: LArG4FSStartPointFilter.py:62
Cut::signal
@ signal
Definition: SUSYToolsAlg.cxx:67
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:84
grepfile.filenames
list filenames
Definition: grepfile.py:34
Trk::open
@ open
Definition: BinningType.h:40
AthenaMPToolBase::m_maxEvt
int m_maxEvt
Definition: AthenaMPToolBase.h:87
a
TList * a
Definition: liststreamerinfos.cxx:10
h
CaloSwCorrections.time
def time(flags, cells_name, *args, **kw)
Definition: CaloSwCorrections.py:242
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaMP::WorkerOutput::technology
std::string technology
Definition: IAthenaMPTool.h:23
AthenaInterprocess::FdsRegistryEntry
Definition: FdsRegistry.h:13
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
AthenaMP::WorkerOutput
Definition: IAthenaMPTool.h:21
AthCommonMsg< AlgTool >::msg
MsgStream & msg() const
Definition: AthCommonMsg.h:24
Herwig7_QED_EvtGen_ll.fs
dictionary fs
Definition: Herwig7_QED_EvtGen_ll.py:17
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
SG::VarHandleBase::vhKey
SG::VarHandleKey & vhKey()
Return a non-const reference to the HandleKey.
Definition: StoreGate/src/VarHandleBase.cxx:629
entries
double entries
Definition: listroot.cxx:49
python.Bindings.keys
keys
Definition: Control/AthenaPython/python/Bindings.py:798
AthenaMPToolBase::m_subprocDirPrefix
std::string m_subprocDirPrefix
Definition: AthenaMPToolBase.h:89
AthenaMPToolBase::m_subprocTopDir
std::string m_subprocTopDir
Definition: AthenaMPToolBase.h:88
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
AthenaMPToolBase::ESRANGE_NOTFOUND
@ ESRANGE_NOTFOUND
Definition: AthenaMPToolBase.h:61
pow
constexpr int pow(int base, int exp) noexcept
Definition: ap_fixedTest.cxx:15
jetMakeRefSamples.logFile
string logFile
Definition: jetMakeRefSamples.py:57
AthenaMPToolBase::handleSavedPfc
int handleSavedPfc(const std::filesystem::path &dest_path)
Definition: AthenaMPToolBase.cxx:422
AthenaMPToolBase::m_fileMgr
ServiceHandle< IFileMgr > m_fileMgr
Definition: AthenaMPToolBase.h:97
AthenaMP::AllWorkerOutputs
std::map< std::string, SingleWorkerOutputs > AllWorkerOutputs
Definition: IAthenaMPTool.h:29
AthCommonDataStore::declareGaudiProperty
Gaudi::Details::PropertyBase & declareGaudiProperty(Gaudi::Property< T > &hndl, const SG::VarHandleKeyType &)
specialization for handling Gaudi::Property<SG::VarHandleKey>
Definition: AthCommonDataStore.h:156
AthenaMPToolBase::FUNC_BOOTSTRAP
@ FUNC_BOOTSTRAP
Definition: AthenaMPToolBase.h:69
SharedEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
AthenaMPToolBase::reopenFds
int reopenFds()
Definition: AthenaMPToolBase.cxx:366
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
sigemptyset
#define sigemptyset(x)
Definition: SealSignal.h:82
AthenaMPToolBase::ESRANGE_FILENOTMADE
@ ESRANGE_FILENOTMADE
Definition: AthenaMPToolBase.h:64
AthenaMPToolBase::m_fdsRegistry
std::shared_ptr< AthenaInterprocess::FdsRegistry > m_fdsRegistry
Definition: AthenaMPToolBase.h:101
AthenaMP::WorkerOutput::filename
std::string filename
Definition: IAthenaMPTool.h:22
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
SharedEvtQueueConsumer::m_debug
bool m_debug
Definition: SharedEvtQueueConsumer.h:62
fitman.k
k
Definition: fitman.py:528
AthenaMPToolBase_d::sig_done
std::atomic< bool > sig_done
Definition: AthenaMPToolBase.cxx:26
AthenaMPToolBase::m_evtProcessor
ServiceHandle< IEventProcessor > m_evtProcessor
Definition: AthenaMPToolBase.h:95
SharedEvtQueueConsumer::m_eventOrdersFile
std::string m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:82
beamspotman.basename
basename
Definition: beamspotman.py:640