ATLAS Offline Software
EvtRangeProcessor.h
Source listing of this file, as rendered by Doxygen. The number fused to the start of each code line below is the listing line number, not part of the code.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#ifndef ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
6#define ATHENAMPTOOLS_EVTRANGEPROCESSOR_H
7
8#include "AthenaMPToolBase.h"
9
11#include "yampl/Exceptions.h"
12
13#include <deque>
14#include <map>
15#include <memory>
16
18class IChronoStatSvc;
19class IIncidentSvc;
20namespace yampl {
21 class ISocketFactory;
22 class ISocket;
23}
24
// NOTE(review): the class-head line (listing line 25) is missing from this extract.
// The members below belong to class EvtRangeProcessor; it presumably derives from
// AthenaMPToolBase (that header is included and the overrides refer to it) — confirm.
26{
27 public:
28 EvtRangeProcessor(const std::string& type // Constructor taking a type string, ...
29 , const std::string& name // ... an instance name, ...
30 , const IInterface* parent); // ... and a pointer to the parent interface.
31
32 virtual ~EvtRangeProcessor() override;
33
34 virtual StatusCode initialize() override;
35
36 // _________IAthenaMPTool_________
37 virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
38 virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
39 virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE (pid_t& pid) override; // pid is passed by non-const reference, presumably as an output — confirm
40
41 virtual void reportSubprocessStatuses() override;
42 virtual void subProcessLogs(std::vector<std::string>&) override; // vector is passed by non-const reference, presumably filled by the call — confirm
44
45 // _____ Actual working horses ________
// Each of the three functions below hands its result back as a std::unique_ptr<ScheduledWork>.
46 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
47 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
48 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
49
50 private:
51
52 StatusCode startProcess ATLAS_NOT_THREAD_SAFE();
53 StatusCode setNewInputFile(const std::string& newFile);
54 void reportError(yampl::ISocket* socket,AthenaMPToolBase::ESRange_Status status);
55
62
// Configurable properties; the quoted string is the property name, followed by its default value.
63 Gaudi::Property<int> m_nEventsBeforeFork{this, "EventsBeforeFork", 0, "Number of events before forking"};
64 Gaudi::Property<std::string> m_channel2Scatterer{this, "Channel2Scatterer", {}}; // empty by default
65 Gaudi::Property<std::string> m_channel2EvtSel{this, "Channel2EvtSel", {}}; // empty by default
66 Gaudi::Property<bool> m_debug{this, "Debug", false};
67
68 int m_rankId{-1}; // Each worker has its own unique RankID from the range (0,...,m_nprocs-1); initialised to -1
70 std::string m_inpFile; // Cached name of the input file, to avoid reopening
71
74 SmartIF<IEvtSelectorSeek> m_evtSeek; // IEvtSelectorSeek: abstract interface for seeking for an event selector
75
76 std::unique_ptr<AthenaInterprocess::SharedQueue> m_sharedRankQueue; // NOTE(review): presumably the queue through which rank IDs are shared — confirm
78
79 std::map<pid_t,int> m_nProcessedEvents; // Number of processed events by PID
80 std::deque<pid_t> m_finQueue; // PIDs of processes queued for finalization
81 std::map<pid_t,ProcessState> m_procStates; // Map for keeping track of states of the subprocesses
82};
83
84#endif
int32_t pid_t
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
StatusCode setNewInputFile(const std::string &newFile)
std::string m_inpFile
Cached name of the input file, kept to avoid reopening it.
std::deque< pid_t > m_finQueue
virtual StatusCode initialize() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
Gaudi::Property< bool > m_debug
ServiceHandle< IChronoStatSvc > m_chronoStatSvc
void reportError(yampl::ISocket *socket, AthenaMPToolBase::ESRange_Status status)
virtual void subProcessLogs(std::vector< std::string > &) override
virtual StatusCode wait_once ATLAS_NOT_THREAD_SAFE(pid_t &pid) override
virtual void reportSubprocessStatuses() override
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
ServiceHandle< IIncidentSvc > m_incidentSvc
Gaudi::Property< std::string > m_channel2EvtSel
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
std::unique_ptr< AthenaInterprocess::SharedQueue > m_sharedRankQueue
std::map< pid_t, int > m_nProcessedEvents
int m_rankId
Each worker has its own unique RankID from the range (0,...,m_nprocs-1)
EvtRangeProcessor(const std::string &type, const std::string &name, const IInterface *parent)
virtual ~EvtRangeProcessor() override
Gaudi::Property< std::string > m_channel2Scatterer
AthenaInterprocess::SharedQueue * m_sharedFailedPidQueue
StatusCode startProcess ATLAS_NOT_THREAD_SAFE()
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
Gaudi::Property< int > m_nEventsBeforeFork
std::map< pid_t, ProcessState > m_procStates
SmartIF< IEvtSelectorSeek > m_evtSeek
int m_activeWorkers
Keep track of the number of workers.
Abstract interface for seeking within an event selector.
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr