ATLAS Offline Software
Loading...
Searching...
No Matches
EvtRangeScatterer.h
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#ifndef ATHENAMPTOOLS_EVTRANGESCATTERER_H
6#define ATHENAMPTOOLS_EVTRANGESCATTERER_H
7
8#include "AthenaMPToolBase.h"
9#include "yampl/Exceptions.h"
10#include <map>
11
12typedef std::map<pid_t,std::string> Pid2RangeID;
13
14namespace yampl {
15 class ISocket;
16}
17
19{
20 public:
21 EvtRangeScatterer(const std::string& type
22 , const std::string& name
23 , const IInterface* parent);
24
25 virtual ~EvtRangeScatterer() override;
26
27 virtual StatusCode initialize() override;
28 virtual StatusCode finalize() override;
29
30 // _________IAthenaMPTool_________
31 virtual int makePool ATLAS_NOT_THREAD_SAFE (int maxevt, int nprocs, const std::string& topdir) override;
32 virtual StatusCode exec ATLAS_NOT_THREAD_SAFE () override;
33
34 virtual void subProcessLogs(std::vector<std::string>&) override;
36
37 // _____ Actual working horses ________
38 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> bootstrap_func() override;
39 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> exec_func() override;
40 virtual std::unique_ptr<AthenaInterprocess::ScheduledWork> fin_func() override;
41
42 private:
46
47 // Get rid of
48 // 1. Leading and trailing spaces
49 // 2. Leading "u\'" and trailing "\'"
50 void trimRangeStrings(std::string&);
51
52 // Helper functuion for receiving new messages from the socket2Processor channel
53 // If this is an output file report, then it is forwarded to the pilot and an empty string is returned to the caller
54 std::string getNewRangeRequest (yampl::ISocket* socket2Processor
55 , yampl::ISocket* socket2Pilot
56 , int& procReportPending);
57
58 // Poll the failed PID queue to see if any of the Processors has failed
60 , yampl::ISocket* socket2Pilot
61 , int& procReportPending);
62
63 Gaudi::Property<std::string> m_processorChannel{this, "ProcessorChannel", ""};
64 Gaudi::Property<std::string> m_eventRangeChannel{this, "EventRangeChannel", ""};
65 Gaudi::Property<bool> m_doCaching{this, "DoCaching", false};
66
67 Pid2RangeID m_pid2RangeID; // Current RangeID-s by PIDs
68};
69
70#endif
std::map< pid_t, std::string > Pid2RangeID
int32_t pid_t
AthenaMPToolBase(const std::string &type, const std::string &name, const IInterface *parent)
void trimRangeStrings(std::string &)
EvtRangeScatterer(const EvtRangeScatterer &)
virtual AthenaMP::AllWorkerOutputs_ptr generateOutputReport() override
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > bootstrap_func() override
pid_t pollFailedPidQueue(AthenaInterprocess::SharedQueue *sharedFailedPidQueue, yampl::ISocket *socket2Pilot, int &procReportPending)
virtual StatusCode exec ATLAS_NOT_THREAD_SAFE() override
virtual StatusCode initialize() override
Pid2RangeID m_pid2RangeID
virtual void subProcessLogs(std::vector< std::string > &) override
Gaudi::Property< std::string > m_eventRangeChannel
EvtRangeScatterer(const std::string &type, const std::string &name, const IInterface *parent)
std::string getNewRangeRequest(yampl::ISocket *socket2Processor, yampl::ISocket *socket2Pilot, int &procReportPending)
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > fin_func() override
virtual ~EvtRangeScatterer() override
EvtRangeScatterer & operator=(const EvtRangeScatterer &)
virtual StatusCode finalize() override
virtual int makePool ATLAS_NOT_THREAD_SAFE(int maxevt, int nprocs, const std::string &topdir) override
Gaudi::Property< bool > m_doCaching
Gaudi::Property< std::string > m_processorChannel
virtual std::unique_ptr< AthenaInterprocess::ScheduledWork > exec_func() override
std::unique_ptr< AllWorkerOutputs > AllWorkerOutputs_ptr