ATLAS Offline Software
EvtRangeProcessor.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
6 #define ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
7 
8 #include "AthenaMPToolBase.h"
9 
11 #include "yampl/Exceptions.h"
12 
13 #include <deque>
14 #include <map>
15 #include <memory>
16 
// Forward declarations of types that this header only names in declarations.
// IEvtSelectorSeek appears below as the SmartIF<> template argument of m_evtSeek.
// NOTE(review): IChronoStatSvc / IIncidentSvc are not used in the visible part of
// the class; the cross-reference index lists ServiceHandle<> members of these
// types on header lines that are absent from this listing - confirm.
17 class IEvtSelectorSeek;
18 class IChronoStatSvc;
19 class IIncidentSvc;
// yampl::ISocket is taken by pointer in reportError() below.
// NOTE(review): ISocketFactory is not referenced in the visible declarations.
20 namespace yampl {
21  class ISocketFactory;
22  class ISocket;
23 }
24 
26 {
  // Body of class EvtRangeProcessor.
  // NOTE(review): the class-head line (header line 25) is missing from this
  // listing. The `override` specifiers and the AthenaMPToolBase.h include
  // suggest the class derives from AthenaMPToolBase - confirm against the
  // original header.
27  public:
  // Constructor taking the tool type, the instance name and the parent interface.
28  EvtRangeProcessor(const std::string& type
29  , const std::string& name
30  , const IInterface* parent);
31 
32  virtual ~EvtRangeProcessor() override;
33 
  // Overrides initialize() of a base class; returns a StatusCode.
34  virtual StatusCode initialize() override;
35 
  // Overrides grouped by the original author under the IAthenaMPTool heading.
  // makePool, exec and wait_once carry the ATLAS_NOT_THREAD_SAFE marker.
36  // _________IAthenaMPTool_________
37  virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
38  virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
  // wait_once takes the pid by non-const reference.
  // presumably an output parameter - TODO confirm in EvtRangeProcessor.cxx
39  virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
40 
41  virtual void reportSubprocessStatuses() override;
42  virtual void subProcessLogs(std::vector<std::string>&) override;
  // NOTE(review): header line 43 is missing from this listing. The
  // cross-reference index lists
  //   virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
  // for this class - confirm it belongs here.
44 
  // Each of the three functions below returns an owning pointer to an
  // AthenaInterprocess::ScheduledWork; they are defined in EvtRangeProcessor.cxx.
45  // _____ Actual workhorses ________
46  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
47  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
48  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
49 
50  private:
51 
  // NOTE(review): header line 52 is missing from this listing. The index lists
  //   StatusCode startProcess ATLAS_NOT_THREAD_SAFE()
  // as a member of this class - confirm.
  // Takes the name of a new input file and returns a StatusCode.
53  StatusCode setNewInputFile(const std::string& newFile);
  // Takes a yampl socket and an AthenaMPToolBase::ESRange_Status value.
  // presumably sends the status over the socket - TODO confirm in the .cxx
54  void reportError(yampl::ISocket* socket,AthenaMPToolBase::ESRange_Status status);
55 
  // State of a subprocess; used as the mapped type of m_procStates below.
  // NOTE(review): the enumerator lines (header lines 57-60) are missing from
  // this listing. The index lists PROC_STATE_INIT, PROC_STATE_EXEC,
  // PROC_STATE_FIN and PROC_STATE_STOP, in that order - confirm.
56  enum ProcessState {
61  };
62 
  // Configurable properties. The property name is the second constructor
  // argument and the default value is the third.
  // "EventsBeforeFork": defaults to 0.
63  Gaudi::Property<int> m_nEventsBeforeFork{this, "EventsBeforeFork", 0, "Number of events before forking"};
  // "Channel2Scatterer" and "Channel2EvtSel": strings, empty by default.
64  Gaudi::Property<std::string> m_channel2Scatterer{this, "Channel2Scatterer", {}};
65  Gaudi::Property<std::string> m_channel2EvtSel{this, "Channel2EvtSel", {}};
  // "Debug": boolean flag, false by default.
66  Gaudi::Property<bool> m_debug{this, "Debug", false};
67 
  // Each worker has its own unique RankID from the range (0,...,m_nprocs-1);
  // initialised to -1.
68  int m_rankId{-1};
  // Keeps track of the number of workers; initialised to 0.
69  int m_activeWorkers{0};
  // Cached name of the input file, kept to avoid reopening it.
70  std::string m_inpFile;
71 
  // NOTE(review): header lines 72-73 are missing from this listing. The index
  // lists ServiceHandle<IChronoStatSvc> m_chronoStatSvc and
  // ServiceHandle<IIncidentSvc> m_incidentSvc there - confirm.
  // Smart-interface handle to an IEvtSelectorSeek.
74  SmartIF<IEvtSelectorSeek> m_evtSeek;
75 
  // Shared queue owned by this object through std::unique_ptr.
  // presumably carries rank IDs, judging by the name - TODO confirm
76  std::unique_ptr<AthenaInterprocess::SharedQueue> m_sharedRankQueue;
  // NOTE(review): header line 77 is missing from this listing. The index lists
  // a raw pointer AthenaInterprocess::SharedQueue* m_sharedFailedPidQueue
  // there - confirm.
78 
79  std::map<pid_t,int> m_nProcessedEvents; // Number of processed events by PID
80  std::deque<pid_t> m_finQueue; // PIDs of processes queued for finalization
81  std::map<pid_t,ProcessState> m_procStates; // Map for keeping track of states of the subprocesses
82 };
83 
84 #endif
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
EvtRangeProcessor::~EvtRangeProcessor
virtual ~EvtRangeProcessor() override
Definition: EvtRangeProcessor.cxx:44
EvtRangeProcessor::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: EvtRangeProcessor.cxx:695
EvtRangeProcessor::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: EvtRangeProcessor.cxx:368
EvtRangeProcessor::m_activeWorkers
int m_activeWorkers
Keep track of the number of workers.
Definition: EvtRangeProcessor.h:69
EvtRangeProcessor::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: EvtRangeProcessor.cxx:362
EvtRangeProcessor::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: EvtRangeProcessor.cxx:332
EvtRangeProcessor::m_debug
Gaudi::Property< bool > m_debug
Definition: EvtRangeProcessor.h:66
EvtRangeProcessor::m_sharedRankQueue
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
Definition: EvtRangeProcessor.h:76
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:58
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
StatusCode startProcess ATLAS_NOT_THREAD_SAFE()
EvtRangeProcessor::m_inpFile
std::string m_inpFile
Cached name of the input file, kept to avoid reopening it.
Definition: EvtRangeProcessor.h:70
EvtRangeProcessor::m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSeek
Definition: EvtRangeProcessor.h:74
EvtRangeProcessor::m_rankId
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
Definition: EvtRangeProcessor.h:68
EvtRangeProcessor::PROC_STATE_EXEC
@ PROC_STATE_EXEC
Definition: EvtRangeProcessor.h:58
EvtRangeProcessor::m_finQueue
std::deque< pid_t > m_finQueue
Definition: EvtRangeProcessor.h:80
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
EvtRangeProcessor::m_procStates
std::map< pid_t, ProcessState > m_procStates
Definition: EvtRangeProcessor.h:81
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
EvtRangeProcessor::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: EvtRangeProcessor.h:79
EvtRangeProcessor::m_channel2EvtSel
Gaudi::Property< std::string > m_channel2EvtSel
Definition: EvtRangeProcessor.h:65
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
EvtRangeProcessor::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: EvtRangeProcessor.cxx:484
columnar::final
CM final
Definition: ColumnAccessor.h:106
EvtRangeProcessor::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: EvtRangeProcessor.cxx:350
EvtRangeProcessor::EvtRangeProcessor
EvtRangeProcessor(const std::string &type, const std::string &name, const IInterface *parent)
Definition: EvtRangeProcessor.cxx:34
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
EvtRangeProcessor::reportError
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Definition: EvtRangeProcessor.cxx:784
AthenaMPToolBase.h
EvtRangeProcessor::initialize
virtual StatusCode initialize() override
Definition: EvtRangeProcessor.cxx:48
EvtRangeProcessor::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: EvtRangeProcessor.h:72
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
yampl
Definition: EvtRangeProcessor.h:20
EvtRangeProcessor::m_nEventsBeforeFork
Gaudi::Property< int > m_nEventsBeforeFork
Definition: EvtRangeProcessor.h:63
EvtRangeProcessor::setNewInputFile
StatusCode setNewInputFile(const std::string &newFile)
Definition: EvtRangeProcessor.cxx:750
EvtRangeProcessor::m_channel2Scatterer
Gaudi::Property< std::string > m_channel2Scatterer
Definition: EvtRangeProcessor.h:64
EvtRangeProcessor::PROC_STATE_INIT
@ PROC_STATE_INIT
Definition: EvtRangeProcessor.h:57
EvtRangeProcessor
Definition: EvtRangeProcessor.h:26
EvtRangeProcessor::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EvtRangeProcessor.h:73
EvtRangeProcessor::m_sharedFailedPidQueue
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
Definition: EvtRangeProcessor.h:77
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:32
SharedQueue.h
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
merge.status
status
Definition: merge.py:16
EvtRangeProcessor::ProcessState
ProcessState
Definition: EvtRangeProcessor.h:56
EvtRangeProcessor::PROC_STATE_FIN
@ PROC_STATE_FIN
Definition: EvtRangeProcessor.h:59
EvtRangeProcessor::PROC_STATE_STOP
@ PROC_STATE_STOP
Definition: EvtRangeProcessor.h:60
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
ServiceHandle< IChronoStatSvc >