ATLAS Offline Software
EvtRangeProcessor.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
6 #define ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
7 
8 #include "AthenaMPToolBase.h"
9 
11 #include "yampl/Exceptions.h"
12 
13 #include <deque>
14 #include <map>
15 
// Forward declarations; the uses visible in this listing are noted per line.
16 class IEvtSelectorSeek; // Used as SmartIF<IEvtSelectorSeek> (member m_evtSeek)
17 class IChronoStatSvc; // Used as ServiceHandle<IChronoStatSvc> (member m_chronoStatSvc, header line 71, elided from this listing)
18 class IIncidentSvc; // Used as ServiceHandle<IIncidentSvc> (member m_incidentSvc, header line 72, elided from this listing)
19 namespace yampl {
20  class ISocketFactory;
21  class ISocket; // Used as the pointer parameter of reportError()
22 }
23 
// EvtRangeProcessor: a final tool class derived from AthenaMPToolBase.
// It overrides the initialize/finalize hooks, the IAthenaMPTool entry points
// (makePool, exec, wait_once, reporting) and the three *_func() callbacks that
// return AthenaInterprocess::ScheduledWork objects.
// NOTE(review): several header lines (e.g. 43, 51-53, 55, 60-63, 67, 71-72, 78-79)
// are elided from this listing; the declarations on them are not documented here.
24 class EvtRangeProcessor final : public AthenaMPToolBase
25 {
26  public:
 // Constructor taking the tool type, the instance name and the parent interface.
27  EvtRangeProcessor(const std::string& type
28  , const std::string& name
29  , const IInterface* parent);
30 
31  virtual ~EvtRangeProcessor() override;
32 
 // Overrides of the base-class initialize/finalize hooks; both return a StatusCode.
33  virtual StatusCode initialize() override;
34  virtual StatusCode finalize() override;
35 
36  // _________IAthenaMPTool_________
 // The three methods below are explicitly marked ATLAS_NOT_THREAD_SAFE.
37  virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
38  virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
39  virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override;
40 
41  virtual void reportSubprocessStatuses() override;
 // Takes a vector of strings by non-const reference.
 // NOTE(review): presumably filled with sub-process log file names - confirm against the .cxx.
42  virtual void subProcessLogs(std::vector<std::string>&) override;
44 
45  // _____ Actual workhorses ________
 // Each callback returns ownership of an AthenaInterprocess::ScheduledWork via std::unique_ptr.
46  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
47  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
48  virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
49 
50  private:
54 
 // Takes the name of a new input file and returns a StatusCode.
 // NOTE(review): presumably switches event reading over to newFile - confirm against the .cxx.
56  StatusCode setNewInputFile(const std::string& newFile);
 // Takes a yampl socket and an AthenaMPToolBase::ESRange_Status value.
 // NOTE(review): presumably reports the given status over the socket - confirm against the .cxx.
57  void reportError(yampl::ISocket* socket,AthenaMPToolBase::ESRange_Status status);
58 
 // State of a subprocess; used as the mapped type of m_procStates below.
 // The enumerators (PROC_STATE_INIT, PROC_STATE_EXEC, PROC_STATE_FIN, PROC_STATE_STOP;
 // header lines 60-63) are elided from this listing.
59  enum ProcessState {
64  };
65 
66  int m_rankId; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
68  int m_activeWorkers; // Keep track of the number of workers
69  std::string m_inpFile; // Cached name of the input file, kept to avoid reopening it
70 
73  SmartIF<IEvtSelectorSeek> m_evtSeek; // Handle to the event selector's seek interface (IEvtSelectorSeek)
74 
75  StringProperty m_channel2Scatterer; // NOTE(review): presumably the name of the channel to the scatterer - confirm
76  StringProperty m_channel2EvtSel; // NOTE(review): presumably the name of the channel to the event selector - confirm
77 
80 
81  std::map<pid_t,int> m_nProcessedEvents; // Number of processed events by PID
82  std::deque<pid_t> m_finQueue; // PIDs of processes queued for finalization
83  std::map<pid_t,ProcessState> m_procStates; // Map for keeping track of states of the subprocesses
84 
85  bool m_debug; // NOTE(review): presumably enables extra debug output - confirm against the .cxx
86 };
87 
88 #endif
pid_t
int32_t pid_t
Definition: FPGATrackSimTypes.h:19
EvtRangeProcessor::~EvtRangeProcessor
virtual ~EvtRangeProcessor() override
Definition: EvtRangeProcessor.cxx:60
EvtRangeProcessor::fin_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Definition: EvtRangeProcessor.cxx:707
EvtRangeProcessor::EvtRangeProcessor
EvtRangeProcessor()
EvtRangeProcessor::bootstrap_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Definition: EvtRangeProcessor.cxx:380
EvtRangeProcessor::m_activeWorkers
int m_activeWorkers
Definition: EvtRangeProcessor.h:68
EvtRangeProcessor::generateOutputReport
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
Definition: EvtRangeProcessor.cxx:374
EvtRangeProcessor::EvtRangeProcessor
EvtRangeProcessor(const EvtRangeProcessor &)
EvtRangeProcessor::reportSubprocessStatuses
virtual void reportSubprocessStatuses() override
Definition: EvtRangeProcessor.cxx:344
AthenaMPToolBase::ESRange_Status
ESRange_Status
Definition: AthenaMPToolBase.h:56
AthenaMPToolBase
Definition: AthenaMPToolBase.h:25
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
StatusCode startProcess ATLAS_NOT_THREAD_SAFE()
EvtRangeProcessor::m_inpFile
std::string m_inpFile
Definition: EvtRangeProcessor.h:69
EvtRangeProcessor::m_evtSeek
SmartIF< IEvtSelectorSeek > m_evtSeek
Definition: EvtRangeProcessor.h:73
EvtRangeProcessor::m_rankId
int m_rankId
Definition: EvtRangeProcessor.h:66
EvtRangeProcessor::PROC_STATE_EXEC
@ PROC_STATE_EXEC
Definition: EvtRangeProcessor.h:61
EvtRangeProcessor::m_finQueue
std::deque< pid_t > m_finQueue
Definition: EvtRangeProcessor.h:82
EvtRangeProcessor::m_channel2EvtSel
StringProperty m_channel2EvtSel
Definition: EvtRangeProcessor.h:76
LHEonly.nprocs
nprocs
Definition: LHEonly.py:17
EvtRangeProcessor::m_procStates
std::map< pid_t, ProcessState > m_procStates
Definition: EvtRangeProcessor.h:83
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
EvtRangeProcessor::m_nProcessedEvents
std::map< pid_t, int > m_nProcessedEvents
Definition: EvtRangeProcessor.h:81
EvtRangeProcessor::m_nEventsBeforeFork
int m_nEventsBeforeFork
Definition: EvtRangeProcessor.h:67
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
EvtRangeProcessor::m_channel2Scatterer
StringProperty m_channel2Scatterer
Definition: EvtRangeProcessor.h:75
ParticleGun_EoverP_Config.pid
pid
Definition: ParticleGun_EoverP_Config.py:62
test_pyathena.parent
parent
Definition: test_pyathena.py:15
EvtRangeProcessor::m_debug
bool m_debug
Definition: EvtRangeProcessor.h:85
EvtRangeProcessor::exec_func
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
Definition: EvtRangeProcessor.cxx:496
EvtRangeProcessor::m_sharedRankQueue
AthenaInterprocess::SharedQueue * m_sharedRankQueue
Definition: EvtRangeProcessor.h:78
EvtRangeProcessor::subProcessLogs
virtual void subProcessLogs(std::vector< std::string > &) override
Definition: EvtRangeProcessor.cxx:362
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
EvtRangeProcessor::reportError
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
Definition: EvtRangeProcessor.cxx:796
AthenaMPToolBase.h
EvtRangeProcessor::initialize
virtual StatusCode initialize() override
Definition: EvtRangeProcessor.cxx:64
EvtRangeProcessor::m_chronoStatSvc
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
Definition: EvtRangeProcessor.h:71
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
yampl
Definition: EvtRangeProcessor.h:19
EvtRangeProcessor::setNewInputFile
StatusCode setNewInputFile(const std::string &newFile)
Definition: EvtRangeProcessor.cxx:762
EvtRangeProcessor::PROC_STATE_INIT
@ PROC_STATE_INIT
Definition: EvtRangeProcessor.h:60
EvtRangeProcessor
Definition: EvtRangeProcessor.h:25
EvtRangeProcessor::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EvtRangeProcessor.h:72
EvtRangeProcessor::m_sharedFailedPidQueue
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
Definition: EvtRangeProcessor.h:79
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaMP::AllWorkerOutputs_ptr
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr
Definition: IAthenaMPTool.h:28
SharedQueue.h
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
EvtRangeProcessor::operator=
EvtRangeProcessor & operator=(const EvtRangeProcessor &)
merge.status
status
Definition: merge.py:17
EvtRangeProcessor::finalize
virtual StatusCode finalize() override
Definition: EvtRangeProcessor.cxx:77
EvtRangeProcessor::ProcessState
ProcessState
Definition: EvtRangeProcessor.h:59
EvtRangeProcessor::PROC_STATE_FIN
@ PROC_STATE_FIN
Definition: EvtRangeProcessor.h:62
EvtRangeProcessor::PROC_STATE_STOP
@ PROC_STATE_STOP
Definition: EvtRangeProcessor.h:63
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
EvtRangeProcessor::ATLAS_NOT_THREAD_SAFE
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
ServiceHandle< IChronoStatSvc >