ATLAS Offline Software
SharedEvtQueueConsumer.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAMPTOOLS_SHAREDEVTQUEUECONSUMER_H
6 #define ATHENAMPTOOLS_SHAREDEVTQUEUECONSUMER_H
7 
8 #include "AthenaMPToolBase.h"
9 
10 #include "AthenaInterprocess/SharedQueue.h"
11 #include "GaudiKernel/Timing.h"
12 #include "GaudiKernel/IEvtSelector.h"
13 #include <queue>
14 
15 class IEventSeek;
16 class IEvtSelectorSeek;
17 class IEventShare;
18 class IDataShare;
19 class IChronoStatSvc;
20 
21 class SharedEvtQueueConsumer : public AthenaMPToolBase
22 {
23  public:
24  SharedEvtQueueConsumer(const std::string& type
25  , const std::string& name
26  , const IInterface* parent);
27 
28  virtual ~SharedEvtQueueConsumer() override;
29 
30  virtual StatusCode initialize() override;
31  virtual StatusCode finalize() override;
32 
33  // _________IAthenaMPTool_________
34  virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
35  virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
36  virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
37 
38  virtual void reportSubprocessStatuses() override;
39  virtual void subProcessLogs(std::vector<std::string>&) override;
40 
41  // _____ Actual workhorses ________
42  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
43  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
44  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
45 
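46  private:
47  SharedEvtQueueConsumer();
48  SharedEvtQueueConsumer(const SharedEvtQueueConsumer&);
49  SharedEvtQueueConsumer& operator=(const SharedEvtQueueConsumer&);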
50 
51  // Decode process results
52  // 1. Store number of processed events for FUNC_EXEC
53  // 2. If doFinalize flag is set then serialize process finalizations
54  int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
55 
56  // Properties
57  bool m_useSharedReader; // Work in pair with a SharedReader
58  bool m_useSharedWriter; // Work in pair with a SharedWriter
59  bool m_isRoundRobin; // Are we running in the "reproducible mode"?
60  int m_nEventsBeforeFork;
61  int m_nSkipEvents;
62  bool m_debug;
63 
64  int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
65 
66  ServiceHandle<IChronoStatSvc> m_chronoStatSvc;
67  SmartIF<IEventSeek> m_evtSeek;
68  SmartIF<IEvtSelectorSeek> m_evtSelSeek;
69  IEvtSelector::Context* m_evtContext;
70  SmartIF<IEventShare> m_evtShare;
71  SmartIF<IDataShare> m_dataShare;
72 
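73  AthenaInterprocess::SharedQueue* m_sharedEventQueue;
74  AthenaInterprocess::SharedQueue* m_sharedRankQueue;
75 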
76  typedef System::ProcessTime::TimeValueType TimeValType;
77  std::map<pid_t,std::pair<int,TimeValType>> m_eventStat; // Number of processed events by PID
78  std::queue<pid_t> m_finQueue; // PIDs of processes queued for finalization
79 
80  // "Persistent" event orders for reproducibility
81  bool m_readEventOrders;
82  std::string m_eventOrdersFile;
83  std::vector<int> m_eventOrders;
84  pid_t m_masterPid; // In finalize() of the master process merge workers' saved orders into one
85 };
86 
87 #endif
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:61
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
SharedEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedEvtQueueConsumer.h:78
SharedEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedEvtQueueConsumer.cxx:73
SharedEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueConsumer.cxx:707
SharedEvtQueueConsumer::m_evtSeek
SmartIF< IEventSeek > m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
SharedEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
SharedEvtQueueConsumer::operator=
SharedEvtQueueConsumer & operator=(const SharedEvtQueueConsumer &)
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:64
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:24
SharedEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueConsumer.cxx:273
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
int decodeProcessResult ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
IEventShare
Abstract interface for sharing within an event stream.
Definition: IEventShare.h:25
SharedEvtQueueConsumer::SharedEvtQueueConsumer
SharedEvtQueueConsumer(const SharedEvtQueueConsumer &)
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
SharedEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedEvtQueueConsumer.h:71
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:83
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
SharedEvtQueueConsumer::m_isRoundRobin
bool m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:59
SharedEvtQueueConsumer::m_useSharedWriter
bool m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:58
SharedEvtQueueConsumer
Definition: SharedEvtQueueConsumer.h:22
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
SharedEvtQueueConsumer::m_useSharedReader
bool m_useSharedReader
Definition: SharedEvtQueueConsumer.h:57
SharedEvtQueueConsumer::m_readEventOrders
bool m_readEventOrders
Definition: SharedEvtQueueConsumer.h:81
SharedEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedEvtQueueConsumer.cxx:121
SharedEvtQueueConsumer::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueConsumer.h:70
AthenaMPToolBase.h
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
SharedEvtQueueConsumer::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:84
SharedEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueConsumer.cxx:285
SharedEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueConsumer.cxx:537
SharedEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedEvtQueueConsumer.cxx:254
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
SharedQueue.h
IEventSeek
Abstract interface for seeking within an event stream.
Definition: IEventSeek.h:27
SharedEvtQueueConsumer::~SharedEvtQueueConsumer
virtual ~SharedEvtQueueConsumer() override
Definition: SharedEvtQueueConsumer.cxx:69
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedEvtQueueConsumer::SharedEvtQueueConsumer
SharedEvtQueueConsumer()
SharedEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
SharedEvtQueueConsumer::m_debug
bool m_debug
Definition: SharedEvtQueueConsumer.h:62
ServiceHandle< IChronoStatSvc >
SharedEvtQueueConsumer::m_eventOrdersFile
std::string m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:82