ATLAS Offline Software
Loading...
Searching...
No Matches
AthMpEvtLoopMgr.h
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#ifndef ATHENAMP_ATHMPEVTLOOPMGR_H
6#define ATHENAMP_ATHMPEVTLOOPMGR_H
7
8#include "GaudiKernel/IEventProcessor.h"
10#include "GaudiKernel/ToolHandle.h"
15#include <memory>
16
17class ISvcLocator;
18
19class ATLAS_NOT_THREAD_SAFE AthMpEvtLoopMgr : public extends<AthService,
20 IEventProcessor,
21 AthenaInterprocess::IMPRunStop>
22{
23 public:
24 AthMpEvtLoopMgr(const std::string& name, ISvcLocator* svcLocator);
25 virtual ~AthMpEvtLoopMgr() = default;
26
27 AthMpEvtLoopMgr() = delete;
29 AthMpEvtLoopMgr& operator = (const AthMpEvtLoopMgr&) = delete;
30
31
32 virtual StatusCode initialize() override;
33 virtual StatusCode finalize() override;
34
35 virtual StatusCode nextEvent(int maxevt) override;
36 virtual StatusCode executeEvent(EventContext &&ctx) override;
37 virtual StatusCode executeRun(int maxevt) override;
38 virtual StatusCode stopRun() override;
39
40 virtual EventContext createEventContext() override;
41
42 virtual bool stopScheduled() const override {return m_scheduledStop;};
43
44 private:
45 ServiceHandle<IEventProcessor> m_evtProcessor{this,"EventLoopManager","AthenaEventLoopMgr"};
46 SmartIF<IService> m_evtSelector{nullptr};
47 SmartIF<IDataShare> m_dataShare;
48
49 Gaudi::Property<int> m_nWorkers{this, "NWorkers", 0,
50 "Number of AthenaMP worker processes"};
51
52 Gaudi::Property<std::string> m_workerTopDir{this, "WorkerTopDir", "athenaMP_workers",
53 "Sub-directory of the main run directory that contains run directories of all workers"};
54
55 Gaudi::Property<std::string> m_outputReportName{this, "OutputReportFile", "AthenaMPOutputs",
56 "ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"};
57
58 Gaudi::Property<std::string> m_strategy{this, "Strategy", "",
59 "Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"};
60
61 Gaudi::Property<bool> m_isPileup{this, "IsPileup", false,
62 "Is AthenaMP running a PileUp Digitization job?"};
63
64 Gaudi::Property<bool> m_collectSubprocessLogs{this, "CollectSubprocessLogs", false,
65 "Copy all workers' logs into the main log file at the end of the job?"};
66
67 ToolHandleArray<IAthenaMPTool> m_tools{this,"Tools", {}};
68
69 Gaudi::Property<int> m_nPollingInterval{this, "PollingInterval", 100,
70 "Interval in milliseconds between checks of sub-processes statuses"};
71
72 Gaudi::Property<int> m_nMemSamplingInterval{this, "MemSamplingInterval", 0,
73 "Interval in seconds between taking memory usage samples. 0 - no sampling"};
74
75 Gaudi::Property<int> m_nEventsBeforeFork{this, "EventsBeforeFork", 0,
76 "Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"};
77
78 Gaudi::Property<unsigned int> m_eventPrintoutInterval{this, "EventPrintoutInterval", 1,
79 "The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"};
80
81 StringArrayProperty m_execAtPreFork{this, "ExecAtPreFork", {},
82 "The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"};
83
84 int m_nChildProcesses{0}; // Total number of child processes
85 pid_t m_masterPid{}; // PID of the main process
86 bool m_scheduledStop{false}; // Flag for early termination of the event loop (for the generators use-case)
87
88 // vectors for collecting memory samples
89 std::vector<unsigned long> m_samplesRss;
90 std::vector<unsigned long> m_samplesPss;
91 std::vector<unsigned long> m_samplesSize;
92 std::vector<unsigned long> m_samplesSwap;
93
94 StatusCode wait();
95 StatusCode generateOutputReport();
96 std::shared_ptr<AthenaInterprocess::FdsRegistry> extractFds();
97 StatusCode updateSkipEvents(int skipEvents);
98};
99
100#endif
int32_t pid_t
#define ATLAS_NOT_THREAD_SAFE
getNoisyStrip() Find noisy strips from hitmaps and write out into xml/db formats
virtual StatusCode stopRun() override
std::vector< unsigned long > m_samplesSize
Gaudi::Property< int > m_nPollingInterval
virtual EventContext createEventContext() override
SmartIF< IDataShare > m_dataShare
AthMpEvtLoopMgr(const std::string &name, ISvcLocator *svcLocator)
virtual StatusCode finalize() override
virtual bool stopScheduled() const override
virtual StatusCode executeEvent(EventContext &&ctx) override
StringArrayProperty m_execAtPreFork
virtual StatusCode nextEvent(int maxevt) override
ToolHandleArray< IAthenaMPTool > m_tools
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
StatusCode updateSkipEvents(int skipEvents)
Gaudi::Property< bool > m_collectSubprocessLogs
Gaudi::Property< unsigned int > m_eventPrintoutInterval
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_isPileup
virtual ~AthMpEvtLoopMgr()=default
std::vector< unsigned long > m_samplesRss
virtual StatusCode executeRun(int maxevt) override
Gaudi::Property< std::string > m_strategy
AthMpEvtLoopMgr(const AthMpEvtLoopMgr &)=delete
AthMpEvtLoopMgr()=delete
ServiceHandle< IEventProcessor > m_evtProcessor
StatusCode generateOutputReport()
Gaudi::Property< std::string > m_outputReportName
std::vector< unsigned long > m_samplesSwap
std::vector< unsigned long > m_samplesPss
Gaudi::Property< int > m_nMemSamplingInterval
SmartIF< IService > m_evtSelector
void initialize()