ATLAS Offline Software
SharedHiveEvtQueueConsumer.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAMPTOOLS_SHAREDHIVEEVTQUEUECONSUMER_H
6 #define ATHENAMPTOOLS_SHAREDHIVEEVTQUEUECONSUMER_H
7 
8 #include "AthenaMPToolBase.h"
9 
11 #include <queue>
12 #include "GaudiKernel/IScheduler.h"
13 #include "GaudiKernel/IEvtSelector.h"
14 
15 class IDataShare;
16 class IEvtSelectorSeek;
17 class IChronoStatSvc;
18 
20 {
21  public:
22  SharedHiveEvtQueueConsumer(const std::string& type
23  , const std::string& name
24  , const IInterface* parent);
25 
26  virtual ~SharedHiveEvtQueueConsumer() override;
27 
28  virtual StatusCode initialize() override;
29  virtual StatusCode finalize() override;
30 
31  // _________IAthenaMPTool_________
32  virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
33  virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
34  virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
35 
36  virtual void reportSubprocessStatuses() override;
37  virtual void subProcessLogs(std::vector<std::string>&) override;
38 
39  // _____ Actual working horses ________
40  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
41  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
42  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
43 
44  private:
48 
50 
51  // Decode process results
52  // 1. Store number of processed events for FUNC_EXEC
53  // 2. If doFinalize flag is set then serialize process finalizations
54  int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
55 
56  // Properties
57  Gaudi::Property<int> m_nEventsBeforeFork{
58  this, "EventsBeforeFork", 0,
59  "The number of events before forking the workers. The default is 0."};
60 
61  Gaudi::Property<bool> m_debug{
62  this, "Debug", false,
63  "Perform extra debugging if true. The default is false."};
64 
65  Gaudi::Property<bool> m_useSharedWriter{
66  this, "UseSharedWriter", false,
67  "Use SharedWriter to merge worker outputs on-the-fly if true. The default is false."};
68 
69 
70  int m_rankId{}; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
71 
75  IEvtSelector::Context* m_evtContext{};
76 
79 
80  std::map<pid_t,int> m_nProcessedEvents; // Number of processed events by PID
81  std::queue<pid_t> m_finQueue; // PIDs of processes queued for finalization
82 
83  SmartIF<IScheduler> m_schedulerSvc;
84 
85 };
86 
87 #endif
SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
SharedHiveEvtQueueConsumer::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: SharedHiveEvtQueueConsumer.h:57
SharedHiveEvtQueueConsumer::m_schedulerSvc
SmartIF< IScheduler > m_schedulerSvc
Definition: SharedHiveEvtQueueConsumer.h:83
SharedHiveEvtQueueConsumer::~SharedHiveEvtQueueConsumer
virtual ~SharedHiveEvtQueueConsumer() override
Definition: SharedHiveEvtQueueConsumer.cxx:59
SharedHiveEvtQueueConsumer::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: SharedHiveEvtQueueConsumer.cxx:223
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
int decodeProcessResult ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
SharedHiveEvtQueueConsumer::m_evtSelSeek
IEvtSelectorSeek * m_evtSelSeek
Definition: SharedHiveEvtQueueConsumer.h:74
SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
SharedHiveEvtQueueConsumer::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: SharedHiveEvtQueueConsumer.h:78
SharedHiveEvtQueueConsumer::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:542
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:28
SharedHiveEvtQueueConsumer::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: SharedHiveEvtQueueConsumer.h:80
SharedHiveEvtQueueConsumer::m_sharedEventQueue
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Definition: SharedHiveEvtQueueConsumer.h:77
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
SharedHiveEvtQueueConsumer::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:373
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer
SharedHiveEvtQueueConsumer(const SharedHiveEvtQueueConsumer &)
SharedHiveEvtQueueConsumer::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: SharedHiveEvtQueueConsumer.cxx:238
AthenaMPToolBase.h
SharedHiveEvtQueueConsumer::m_evtContext
IEvtSelector::Context * m_evtContext
Definition: SharedHiveEvtQueueConsumer.h:75
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
SharedHiveEvtQueueConsumer::finalize
virtual StatusCode finalize() override
Definition: SharedHiveEvtQueueConsumer.cxx:98
SharedHiveEvtQueueConsumer::SharedHiveEvtQueueConsumer
SharedHiveEvtQueueConsumer()
SharedHiveEvtQueueConsumer::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: SharedHiveEvtQueueConsumer.cxx:203
SharedHiveEvtQueueConsumer::initHive
StatusCode initHive()
Definition: SharedHiveEvtQueueConsumer.cxx:641
SharedHiveEvtQueueConsumer::m_rankId
int m_rankId
Definition: SharedHiveEvtQueueConsumer.h:70
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
SharedHiveEvtQueueConsumer::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: SharedHiveEvtQueueConsumer.h:72
SharedHiveEvtQueueConsumer::ATLAS_NOT_THREAD_SAFE
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
SharedQueue.h
AthenaInterprocess::ProcessResult
Definition: ProcessGroup.h:22
SharedHiveEvtQueueConsumer::m_finQueue
std::queue< pid_t > m_finQueue
Definition: SharedHiveEvtQueueConsumer.h:81
SharedHiveEvtQueueConsumer
Definition: SharedHiveEvtQueueConsumer.h:20
SharedHiveEvtQueueConsumer::initialize
virtual StatusCode initialize() override
Definition: SharedHiveEvtQueueConsumer.cxx:65
SharedHiveEvtQueueConsumer::operator=
SharedHiveEvtQueueConsumer & operator=(const SharedHiveEvtQueueConsumer &)
SharedHiveEvtQueueConsumer::m_useSharedWriter
Gaudi::Property< bool > m_useSharedWriter
Definition: SharedHiveEvtQueueConsumer.h:65
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
SharedHiveEvtQueueConsumer::m_debug
Gaudi::Property< bool > m_debug
Definition: SharedHiveEvtQueueConsumer.h:61
SharedHiveEvtQueueConsumer::m_dataShare
IDataShare * m_dataShare
Definition: SharedHiveEvtQueueConsumer.h:73
ServiceHandle< IChronoStatSvc >