ATLAS Offline Software
Loading...
Searching...
No Matches
SharedHiveEvtQueueConsumer.h
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#ifndef ATHENAMPTOOLS_SHAREDHIVEEVTQUEUECONSUMER_H
6#define ATHENAMPTOOLS_SHAREDHIVEEVTQUEUECONSUMER_H
7
8#include "AthenaMPToolBase.h"
9
11#include "GaudiKernel/IScheduler.h"
12#include "GaudiKernel/IEvtSelector.h"
13
14#include <memory>
15#include <queue>
16
17
18class IDataShare;
20class IChronoStatSvc;
21
23{
// Declaration body of SharedHiveEvtQueueConsumer, an AthenaMP tool whose
// virtual methods all override a base-class interface.
// NOTE(review): the embedded line numbers jump 21->23, 71->73 and 76->78, so
// this listing is missing original lines 22 (presumably the `class ...`
// header), 72 and 77. The symbol index at the end of the page mentions
// m_chronoStatSvc and m_sharedEventQueue, which have no declaration here --
// confirm against the real header before relying on this listing.
24 public:
// Tool constructor taking the (type, name, parent) triple.
25 SharedHiveEvtQueueConsumer(const std::string& type
26 , const std::string& name
27 , const IInterface* parent);
28
29 virtual ~SharedHiveEvtQueueConsumer() override;
30
// Lifecycle hooks; both report their outcome as a StatusCode.
31 virtual StatusCode initialize() override;
32 virtual StatusCode finalize() override;
33
34 // _________IAthenaMPTool_________
// The three methods below are flagged ATLAS_NOT_THREAD_SAFE.
// makePool: takes the maximum event count, the number of processes and a top
// directory. NOTE(review): the meaning of the int return value is not visible
// in this header -- check the implementation.
35 virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
36 virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
// wait_once: `pid` is a non-const reference, i.e. an in/out argument
// (presumably set to the PID of the sub-process that was waited for -- TODO confirm).
37 virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
38
39 virtual void reportSubprocessStatuses() override;
// subProcessLogs: receives a caller-owned vector of strings by non-const
// reference (presumably filled with log file names -- TODO confirm).
40 virtual void subProcessLogs(std::vector<std::string>&) override;
41
42 // _____ Actual workhorses ________
// Each function returns an owning pointer to an AthenaInterprocess::ScheduledWork.
// NOTE(review): judging by the names these cover the bootstrap, event-loop and
// finalization stages of a worker -- confirm in the implementation.
43 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
44 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
45 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
46
47 private:
48
// Private helper returning a StatusCode; what it sets up is not visible in
// this header (presumably the Hive scheduler -- TODO confirm).
49 StatusCode initHive();
50
51 // Decode process results
52 // 1. Store number of processed events for FUNC_EXEC
53 // 2. If doFinalize flag is set then serialize process finalizations
// Flagged ATLAS_NOT_THREAD_SAFE. NOTE(review): the meaning of the int return
// value is not documented here.
54 int decodeProcessResult ATLAS_NOT_THREAD_SAFE (const AthenaInterprocess::ProcessResult* presult, bool doFinalize);
55
56 // Properties
// Configurable as "EventsBeforeFork"; defaults to 0.
57 Gaudi::Property<int> m_nEventsBeforeFork{
58 this, "EventsBeforeFork", 0,
59 "The number of events before forking the workers. The default is 0."};
60
// Configurable as "Debug"; defaults to false.
61 Gaudi::Property<bool> m_debug{
62 this, "Debug", false,
63 "Perform extra debugging if true. The default is false."};
64
// Configurable as "UseSharedWriter"; defaults to false.
65 Gaudi::Property<bool> m_useSharedWriter{
66 this, "UseSharedWriter", false,
67 "Use SharedWriter to merge worker outputs on-the-fly if true. The default is false."};
68
69
// Initialized to -1, which lies outside the valid rank range given below.
70 int m_rankId{-1}; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
71
// Interface handles to the data-sharing and event-selector-seek interfaces.
73 SmartIF<IDataShare> m_dataShare;
74 SmartIF<IEvtSelectorSeek> m_evtSelSeek;
// Raw pointer, value-initialized to null. NOTE(review): ownership of the
// context is not visible in this header -- confirm who creates/releases it.
75 IEvtSelector::Context* m_evtContext{};
76
// Owned by this object (unique_ptr). Presumably the queue through which
// workers obtain their rank ID -- TODO confirm.
78 std::unique_ptr<AthenaInterprocess::SharedQueue> m_sharedRankQueue;
79
80 std::map<pid_t,int> m_nProcessedEvents; // Number of processed events by PID
81 std::queue<pid_t> m_finQueue; // PIDs of processes queued for finalization
82
// Interface handle to the scheduler service (IScheduler).
83 SmartIF<IScheduler> m_schedulerSvc;
84
85};
86
87#endif
int32_t pid_t
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
Abstract interface for sharing data.
Definition IDataShare.h:24
Abstract interface that adds seek capability to an event selector.
virtual void reportSubprocessStatuses() override
Gaudi::Property< int > m_nEventsBeforeFork
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
virtual StatusCode initialize() override
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
virtual void subProcessLogs(std::vector< std::string > &) override
AthenaInterprocess::SharedQueue * m_sharedEventQueue
Gaudi::Property< bool > m_useSharedWriter
SmartIF< IEvtSelectorSeek > m_evtSelSeek
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
std::map< pid_t, int > m_nProcessedEvents
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
int decodeProcessResult ATLAS_NOT_THREAD_SAFE(const AthenaInterprocess::ProcessResult *presult, bool doFinalize)
virtual StatusCode finalize() override
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
SharedHiveEvtQueueConsumer(const std::string &type, const std::string &name, const IInterface *parent)