ATLAS Offline Software
Loading...
Searching...
No Matches
SharedEvtQueueConsumer.h
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#ifndef ATHENAMPTOOLS_SHAREDEVTQUEUECONSUMER_H
6#define ATHENAMPTOOLS_SHAREDEVTQUEUECONSUMER_H
7
8#include "AthenaMPToolBase.h"
9
11#include "GaudiKernel/Timing.h"
12#include "GaudiKernel/IEvtSelector.h"
13
14#include <memory>
15#include <queue>
16
17class IEventSeek;
19class IEventShare;
20class IDataShare;
21class IChronoStatSvc;
22
24{
// AthenaMP tool declaring the IAthenaMPTool entry points (makePool, exec,
// wait_once), the bootstrap/exec/fin work functions and the bookkeeping
// members used to track sub-processes.
// NOTE(review): every virtual below is marked override, so a base class
// provides them; presumably AthenaMPToolBase (included above) - confirm.
25 public:
// Constructor taking the tool type, the instance name and the parent interface.
26 SharedEvtQueueConsumer(const std::string& type
27 , const std::string& name
28 , const IInterface* parent);
29
30 virtual ~SharedEvtQueueConsumer() override;
31
// Overrides of the base-class initialize() / finalize() hooks; both report a StatusCode.
32 virtual StatusCode initialize() override;
33 virtual StatusCode finalize() override;
34
35 // _________IAthenaMPTool_________
// The three entry points below carry the ATLAS_NOT_THREAD_SAFE marker.
// makePool: takes the event limit (maxevt), the number of processes (nprocs)
// and a top directory (topdir); returns an int whose meaning is not visible
// in this header - presumably a status or process count, TODO confirm.
36 virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
37 virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
// wait_once: pid is a non-const reference, presumably an output parameter
// identifying the sub-process that was waited for - confirm in the implementation.
38 virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
39
40 virtual void reportSubprocessStatuses() override;
// subProcessLogs: the vector is taken by non-const reference, presumably to be
// filled with log locations of the sub-processes - confirm.
41 virtual void subProcessLogs(std::vector<std::string>&) override;
42
43 // _____ Actual workhorses ________
// Each function returns an owning pointer to an AthenaInterprocess::ScheduledWork.
44 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
45 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
46 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
47
48 private:
49
50 // Decode process results
51 // 1. Store number of processed events for FUNC_EXEC
52 // 2. If doFinalize flag is set then serialize process finalizations
// presult is the result to decode; the meaning of the int return value is not
// visible in this header.
53 int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
54
// Configurable properties. Each initializer lists: owner, property name,
// default value and (where given) a description string.
55 Gaudi::Property<bool> m_useSharedReader{this, "UseSharedReader", false, "Work in pair with a SharedReader"};
56 Gaudi::Property<bool> m_useSharedWriter{this, "UseSharedWriter", false, "Work in pair with a SharedWriter"};
57 Gaudi::Property<bool> m_isRoundRobin{this, "IsRoundRobin", false, "Are we running in the 'reproducible mode'?"};
// The four properties below have no description string; their names suggest:
// a debug switch, whether to read saved event orders, how many events to
// process before forking, and the event-orders file name - TODO confirm.
58 Gaudi::Property<bool> m_debug{this, "Debug", false};
59 Gaudi::Property<bool> m_readEventOrders{this, "ReadEventOrders", false};
60 Gaudi::Property<int> m_nEventsBeforeFork{this, "EventsBeforeFork", 0};
61 Gaudi::Property<std::string> m_eventOrdersFile{this, "EventOrdersFile", "athenamp_eventorders.txt"};
62
// Starts at -1, i.e. outside the valid range described below.
63 int m_rankId{-1}; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
65
// Interface handles for event seeking and for event/data sharing.
67 SmartIF<IEventSeek> m_evtSeek;
68 SmartIF<IEvtSelectorSeek> m_evtSelSeek;
// Raw pointer to an event-selector context, null by default.
// NOTE(review): ownership is not visible in this header - confirm who releases it.
69 IEvtSelector::Context* m_evtContext{nullptr};
70 SmartIF<IEventShare> m_evtShare;
71 SmartIF<IDataShare> m_dataShare;
72
// Owned shared queue; presumably the source of the rank IDs handed to workers - confirm.
74 std::unique_ptr<AthenaInterprocess::SharedQueue> m_sharedRankQueue;
75
// Shorthand for the time value type stored in m_eventStat.
76 typedef System::ProcessTime::TimeValueType TimeValType;
// Keyed by PID; the mapped pair holds an int and a TimeValType (presumably the
// event count and an associated time measurement - confirm).
77 std::map<pid_t,std::pair<int,TimeValType>> m_eventStat; // Number of processed events by PID
78 std::queue<pid_t> m_finQueue; // PIDs of processes queued for finalization
79
80 // "Persistent" event orders for reproducibility
81 std::vector<int> m_eventOrders;
// NOTE(review): no in-class initializer here, unlike m_rankId and m_evtContext;
// presumably set in the constructor - confirm it is never read uninitialized.
82 pid_t m_masterPid; // In finalize() of the master process merge workers' saved orders into one
83};
84
85#endif
int32_t pid_t
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
Abstract interface for sharing data.
Definition IDataShare.h:24
Abstract interface for seeking within an event stream.
Definition IEventSeek.h:27
Abstract interface for sharing within an event stream.
Definition IEventShare.h:25
Abstract interface for seeking for an event selector.
virtual StatusCode finalize() override
virtual StatusCode initialize() override
std::map< pid_t, std::pair< int, TimeValType > > m_eventStat
SharedEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)
int decodeProcessResult ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
SmartIF< IEventShare > m_evtShare
std::queue< pid_t > m_finQueue
Gaudi::Property< std::string > m_eventOrdersFile
SmartIF< IEventSeek > m_evtSeek
std::vector< int > m_eventOrders
SmartIF< IEvtSelectorSeek > m_evtSelSeek
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_readEventOrders
Gaudi::Property< bool > m_useSharedWriter
Gaudi::Property< bool > m_useSharedReader
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
virtual void reportSubprocessStatuses() override
SmartIF< IDataShare > m_dataShare
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Gaudi::Property< bool > m_isRoundRobin
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Gaudi::Property< bool > m_debug
IEvtSelector::Context * m_evtContext
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual ~SharedEvtQueueConsumer() override
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
System::ProcessTime::TimeValueType TimeValType
virtual void subProcessLogs(std::vector< std::string > &) override