ATLAS Offline Software
AthMpEvtLoopMgr.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAMP_ATHMPEVTLOOPMGR_H
6 #define ATHENAMP_ATHMPEVTLOOPMGR_H
7 
8 #include "GaudiKernel/IEventProcessor.h"
10 #include "GaudiKernel/ToolHandle.h"
15 #include <memory>
16 
17 class ISvcLocator;
18 
19 class ATLAS_NOT_THREAD_SAFE AthMpEvtLoopMgr : public extends<AthService,
20  IEventProcessor,
21  AthenaInterprocess::IMPRunStop>
22 {
23  public:
24  AthMpEvtLoopMgr(const std::string& name, ISvcLocator* svcLocator);
25  virtual ~AthMpEvtLoopMgr() = default;
26 
27  AthMpEvtLoopMgr() = delete;
28  AthMpEvtLoopMgr(const AthMpEvtLoopMgr&) = delete;
30 
31 
32  virtual StatusCode initialize() override;
33  virtual StatusCode finalize() override;
34 
35  virtual StatusCode nextEvent(int maxevt) override;
36  virtual StatusCode executeEvent(EventContext &&ctx) override;
37  virtual StatusCode executeRun(int maxevt) override;
38  virtual StatusCode stopRun() override;
39 
40  virtual EventContext createEventContext() override;
41 
42  virtual bool stopScheduled() const override {return m_scheduledStop;};
43 
44  private:
45  ServiceHandle<IEventProcessor> m_evtProcessor{this,"EventLoopManager","AthenaEventLoopMgr"};
46  SmartIF<IService> m_evtSelector{nullptr};
47  SmartIF<IDataShare> m_dataShare;
48 
49  Gaudi::Property<int> m_nWorkers{this, "NWorkers", 0,
50  "Number of AthenaMP worker processes"};
51 
52  Gaudi::Property<std::string> m_workerTopDir{this, "WorkerTopDir", "athenaMP_workers",
53  "Sub-directory of the main run directory that contains run directories of all workers"};
54 
55  Gaudi::Property<std::string> m_outputReportName{this, "OutputReportFile", "AthenaMPOutputs",
56  "ASCII file in the main run directory that lists outputs of all workers. Used by Job Transform"};
57 
58  Gaudi::Property<std::string> m_strategy{this, "Strategy", "",
59  "Event processing strategy used by AthenaMP workers. E.g, Shared Queue, Round Robin"};
60 
61  Gaudi::Property<bool> m_isPileup{this, "IsPileup", false,
62  "Is AthenaMP running a PileUp Digitization job?"};
63 
64  Gaudi::Property<bool> m_collectSubprocessLogs{this, "CollectSubprocessLogs", false,
65  "Copy all workers' logs into the main log file at the end of the job?"};
66 
67  ToolHandleArray<IAthenaMPTool> m_tools{this,"Tools", {}};
68 
69  Gaudi::Property<int> m_nPollingInterval{this, "PollingInterval", 100,
70  "Interval in milliseconds between checks of sub-processes statuses"};
71 
72  Gaudi::Property<int> m_nMemSamplingInterval{this, "MemSamplingInterval", 0,
73  "Interval in seconds between taking memory usage samples. 0 - no sampling"};
74 
75  Gaudi::Property<int> m_nEventsBeforeFork{this, "EventsBeforeFork", 0,
76  "Number of events to be processed by the main process before forking the workers. 0 - fork after BeginRun incident"};
77 
78  Gaudi::Property<unsigned int> m_eventPrintoutInterval{this, "EventPrintoutInterval", 1,
79  "The value to be forwarded to the EventPrintoutInterval property of the AthenaEventLoopMgr"};
80 
81  StringArrayProperty m_execAtPreFork{this, "ExecAtPreFork", {},
82  "The value to be forwarded to the ExecAtPreFork property of the AthenaEventLoopMgr"};
83 
84  int m_nChildProcesses{0}; // Total number of child processes
85  pid_t m_masterPid{}; // PID of the main process
86  bool m_scheduledStop{false}; // Flag for early termination of the event loop (for the generators use-case)
87 
88  // vectors for collecting memory samples
89  std::vector<unsigned long> m_samplesRss;
90  std::vector<unsigned long> m_samplesPss;
91  std::vector<unsigned long> m_samplesSize;
92  std::vector<unsigned long> m_samplesSwap;
93 
94  StatusCode wait();
95  StatusCode generateOutputReport();
96  std::shared_ptr<AthenaInterprocess::FdsRegistry> extractFds();
97  StatusCode updateSkipEvents(int skipEvents);
98 };
99 
100 #endif
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
AthMpEvtLoopMgr::stopScheduled
virtual bool stopScheduled() const override
Definition: AthMpEvtLoopMgr.h:42
python.tests.PyTestsLib.finalize
def finalize(self)
_info( "content of StoreGate..." ) self.sg.dump()
Definition: PyTestsLib.py:50
ATLAS_NOT_THREAD_SAFE
#define ATLAS_NOT_THREAD_SAFE
getNoisyStrip() Find noisy strips from hitmaps and write out into xml/db formats
Definition: checker_macros.h:212
IDataShare.h
initialize
void initialize()
Definition: run_EoverP.cxx:894
AthMpEvtLoopMgr::m_samplesRss
std::vector< unsigned long > m_samplesRss
Definition: AthMpEvtLoopMgr.h:89
columnar::operator=
AccessorTemplate & operator=(AccessorTemplate &&that)
Definition: VectorColumn.h:88
AthMpEvtLoopMgr::m_samplesSize
std::vector< unsigned long > m_samplesSize
Definition: AthMpEvtLoopMgr.h:91
jetMakeRefSamples.skipEvents
int skipEvents
Definition: jetMakeRefSamples.py:55
AthMpEvtLoopMgr::m_samplesPss
std::vector< unsigned long > m_samplesPss
Definition: AthMpEvtLoopMgr.h:90
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
AthMpEvtLoopMgr::m_samplesSwap
std::vector< unsigned long > m_samplesSwap
Definition: AthMpEvtLoopMgr.h:92
AthMpEvtLoopMgr::AthMpEvtLoopMgr
AthMpEvtLoopMgr()=delete
AthMpEvtLoopMgr::~AthMpEvtLoopMgr
virtual ~AthMpEvtLoopMgr()=default
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
FdsRegistry.h
IAthenaMPTool.h
AthMpEvtLoopMgr
Definition: AthMpEvtLoopMgr.h:22
AthService.h
AthMpEvtLoopMgr::m_dataShare
SmartIF< IDataShare > m_dataShare
Definition: AthMpEvtLoopMgr.h:47
IMPRunStop.h
ServiceHandle< IEventProcessor >
AthMpEvtLoopMgr::AthMpEvtLoopMgr
AthMpEvtLoopMgr(const AthMpEvtLoopMgr &)=delete