ATLAS Offline Software
SharedEvtQueueConsumer.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAMPTOOLS_SHAREDEVTQUEUECONSUMER_H
6 #define ATHENAMPTOOLS_SHAREDEVTQUEUECONSUMER_H
7 
8 #include "AthenaMPToolBase.h"
9 
11 #include "GaudiKernel/Timing.h"
12 #include "GaudiKernel/IEvtSelector.h"
13 
14 #include <memory>
15 #include <queue>
16 
17 class IEventSeek;
18 class IEvtSelectorSeek;
19 class IEventShare;
20 class IDataShare;
21 class IChronoStatSvc;
22 
24 {
25  public:
26  SharedEvtQueueConsumer(const std::string& type
27  , const std::string& name
28  , const IInterface* parent);
29 
30  virtual ~SharedEvtQueueConsumer() override;
31 
32  virtual StatusCode initialize() override;
33  virtual StatusCode finalize() override;
34 
35  // _________IAthenaMPTool_________
36  virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
37  virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
38  virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
39 
40  virtual void reportSubprocessStatuses() override;
41  virtual void subProcessLogs(std::vector<std::string>&) override;
42 
43  // _____ Actual working horses ________
44  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
45  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
46  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
47 
48  private:
49 
50  // Decode process results
51  // 1. Store number of processed events for FUNC_EXEC
52  // 2. If doFinalize flag is set then serialize process finalizations
53  int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
54 
55  Gaudi::Property<bool> m_useSharedReader{this, "UseSharedReader", false, "Work in pair with a SharedReader"};
56  Gaudi::Property<bool> m_useSharedWriter{this, "UseSharedWriter", false, "Work in pair with a SharedWriter"};
57  Gaudi::Property<bool> m_isRoundRobin{this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"};
58  Gaudi::Property<bool> m_debug{this, "Debug", false};
59  Gaudi::Property<bool> m_readEventOrders{this, "ReadEventOrders", false};
60  Gaudi::Property<int> m_nEventsBeforeFork{this, "EventsBeforeFork", 0};
61  Gaudi::Property<std::string> m_eventOrdersFile{this, "EventOrdersFile", "athenamp_eventorders.txt"};
62 
63  int m_rankId{-1}; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
64  int m_nSkipEvents{0};
65 
67  SmartIF<IEventSeek> m_evtSeek;
68  SmartIF<IEvtSelectorSeek> m_evtSelSeek;
69  IEvtSelector::Context* m_evtContext{nullptr};
70  SmartIF<IEventShare> m_evtShare;
71  SmartIF<IDataShare> m_dataShare;
72 
74  std::unique_ptr<AthenaInterprocess::SharedQueue> m_sharedRankQueue;
75 
76  typedef System::ProcessTime::TimeValueType TimeValType;
77  std::map<pid_t,std::pair<int,TimeValType>> m_eventStat; // Number of processed events by PID
78  std::queue<pid_t> m_finQueue; // PIDs of processes queued for finalization
79 
80  // "Persistent" event orders for reproducibility
81  std::vector<int> m_eventOrders;
82  pid_t m_masterPid; // In finalize() of the master process merge workers' saved orders into one
83 };
84 
85 #endif
SharedEvtQueueConsumer::m_nSkipEvents
int m_nSkipEvents
Definition: SharedEvtQueueConsumer.h:64
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
SharedEvtQueueConsumer::m_isRoundRobin
Gaudi::Property< bool > m_isRoundRobin
Definition: SharedEvtQueueConsumer.h:57
SharedEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedEvtQueueConsumer.h:78
SharedEvtQueueConsumer::m_useSharedReader
Gaudi::Property< bool > m_useSharedReader
Definition: SharedEvtQueueConsumer.h:55
SharedEvtQueueConsumer::m_readEventOrders
Gaudi::Property< bool > m_readEventOrders
Definition: SharedEvtQueueConsumer.h:59
SharedEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedEvtQueueConsumer.cxx:48
SharedEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedEvtQueueConsumer.cxx:642
SharedEvtQueueConsumer::m_evtSeek
SmartIF< IEventSeek > m_evtSeek
Definition: SharedEvtQueueConsumer.h:67
SharedEvtQueueConsumer::m_evtSelSeek
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Definition: SharedEvtQueueConsumer.h:68
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
SharedEvtQueueConsumer::m_eventStat
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
Definition: SharedEvtQueueConsumer.h:77
SharedEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedEvtQueueConsumer.h:63
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:24
SharedEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedEvtQueueConsumer.cxx:242
SharedEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
int decodeProcessResult ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
IEventShare
Abstract interface for sharing within an event stream.
Definition: IEventShare.h:25
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
SharedEvtQueueConsumer::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: SharedEvtQueueConsumer.h:71
SharedEvtQueueConsumer::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: SharedEvtQueueConsumer.h:60
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedEvtQueueConsumer::m_eventOrders
std::vector< int > m_eventOrders
Definition: SharedEvtQueueConsumer.h:81
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
SharedEvtQueueConsumer::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: SharedEvtQueueConsumer.h:74
SharedEvtQueueConsumer::m_eventOrdersFile
Gaudi::Property< std::string > m_eventOrdersFile
Definition: SharedEvtQueueConsumer.h:61
SharedEvtQueueConsumer
Definition: SharedEvtQueueConsumer.h:24
SharedEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedEvtQueueConsumer.h:66
columnar::final
CM final
Definition: ColumnAccessor.h:106
SharedEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedEvtQueueConsumer.cxx:96
SharedEvtQueueConsumer::m_evtShare
SmartIF< IEventShare > m_evtShare
Definition: SharedEvtQueueConsumer.h:70
AthenaMPToolBase.h
SharedEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedEvtQueueConsumer.h:73
SharedEvtQueueConsumer::SharedEvtQueueConsumer
SharedEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
Definition: SharedEvtQueueConsumer.cxx:34
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
SharedEvtQueueConsumer::m_masterPid
pid_t m_masterPid
Definition: SharedEvtQueueConsumer.h:82
SharedEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedEvtQueueConsumer.cxx:254
SharedEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedEvtQueueConsumer.cxx:479
SharedEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedEvtQueueConsumer.cxx:223
SharedEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedEvtQueueConsumer.h:69
SharedQueue.h
IEventSeek
Abstract interface for seeking within an event stream.
Definition: IEventSeek.h:27
SharedEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedEvtQueueConsumer.h:56
SharedEvtQueueConsumer::~SharedEvtQueueConsumer
virtual ~SharedEvtQueueConsumer() override
Definition: SharedEvtQueueConsumer.cxx:44
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedEvtQueueConsumer.h:58
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
SharedEvtQueueConsumer::TimeValType
System::ProcessTime::TimeValueType TimeValType
Definition: SharedEvtQueueConsumer.h:76
ServiceHandle< IChronoStatSvc >