ATLAS Offline Software
MPIClusterSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 #include "MPIClusterSvc.h"
5 
6 #include "CxxUtils/XXH.h"
7 #include "GaudiKernel/FileIncident.h"
8 
9 #include <boost/serialization/variant.hpp>
10 
12  ATH_MSG_DEBUG("Initializing MPI");
13  m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
14  ATH_MSG_DEBUG("Created MPI environment");
15  m_world = m_env->world();
16  m_datacom =
17  m_world.duplicate(); // make a duplicate communicator for event data
18  ATH_MSG_DEBUG("Got MPI_COMM_WORLD");
19  m_rank = m_world.rank();
20  ATH_MSG_INFO("On MPI rank " << m_rank);
21 
22  ATH_CHECK(m_mpiLog.retrieve());
23  m_mpiLog->createStatement("PRAGMA foreign_keys = ON").run();
24 
25  m_mpiLog
26  ->createStatement(
27  "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
28  "node TEXT, start_time FLOAT, end_time FLOAT)")
29  .run();
30  m_mpiLog
31  ->createStatement(
32  "INSERT INTO ranks (rank, node, start_time) "
33  "VALUES(?1, ?2, julianday('now'))")
34  .run(m_rank, m_env->processor_name());
35  m_mpiLog->createStatement(
36  "CREATE TABLE files (fileId INTEGER PRIMARY KEY, fileName TEXT)")
37  .run();
38  m_mpiLog
39  ->createStatement(
40  "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
41  "inputFileId INTEGER,"
42  "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
43  "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
44  "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
45  "FOREIGN KEY (rank) REFERENCES ranks(rank),"
46  "FOREIGN KEY (inputFileId) REFERENCES files(fileId))")
47  .run();
48  m_mpiLog_addEvent = m_mpiLog->createStatement(
49  "INSERT INTO event_log(id, rank, inputFileId, runNumber, eventNumber, complete, "
50  "start_time, request_time_ns) "
51  "VALUES(?1, ?4, ?6, ?2, ?3, 0, julianday('now'), ?5)");
52  m_mpiLog_completeEvent = m_mpiLog->createStatement(
53  "UPDATE event_log SET complete = 1, status = ?3, end_time = "
54  "julianday('now') WHERE runNumber = ?1 "
55  "AND "
56  "eventNumber = ?2");
57  m_mpiLog_addFile = m_mpiLog->createStatement(
58  "INSERT INTO files (fileId, fileName) VALUES(?1, ?2)");
59 
60  // Set up incident listener
61  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
62  if (!incsvc.retrieve().isSuccess()) {
63  ATH_MSG_FATAL("Cannot get IncidentSvc.");
64  return(StatusCode::FAILURE);
65  }
66  incsvc->addListener(this, IncidentType::BeginInputFile, 100);
67  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
68 
69  return StatusCode::SUCCESS;
70 }
71 
73  m_mpiLog
74  ->createStatement(
75  "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
76  .run(m_rank);
77  m_env.reset(nullptr);
78  return StatusCode::SUCCESS;
79 }
80 
82 void MPIClusterSvc::handle(const Incident& inc) {
83  // Fill in slot map at start of every event
84  if (inc.type() == IncidentType::BeginProcessing) {
85  const std::size_t slot = Gaudi::Hive::currentContext().slot();
87  }
88 
89  // Cache new input filename on start of every file
90  if (inc.type() == IncidentType::BeginInputFile) {
91  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
92  if (fileInc == nullptr) {
93  ATH_MSG_ERROR("BeginInputFile does not have a file name attached");
94  return;
95  }
96 
97  const std::string fileName = fileInc->fileName();
98  // Convert the hash into a signed int64. Just a hash so this doesn't matter.
99  m_lastInputFileHash = static_cast<std::int64_t>(xxh3::hash64(fileName));
101  }
102  return;
103 }
104 
105 
107  return m_world.size();
108 }
109 
110 int MPIClusterSvc::rank() const {
111  return m_rank;
112 }
113 
115  ATH_MSG_DEBUG("Barrier on rank " << rank() << " of " << numRanks());
116  m_world.barrier();
117 }
118 
120  m_world.abort();
121 }
122 
124  ClusterComm communicator) {
125  ATH_MSG_DEBUG("Sending message from rank " << rank() << " to " << destRank);
126  // Don't send event request message if we're not the master *and* we have a
127  // message waiting.
128  // Probably an emergency stop message
129  if (m_rank != 0 && message.messageType == ClusterMessageType::RequestEvent &&
130  m_world.iprobe().has_value()) {
131  return;
132  }
133 
134  // Select correct communicator
135  mpi3::communicator& comm =
136  (communicator == ClusterComm::EventData) ? m_datacom : m_world;
137  if (message.messageType == ClusterMessageType::Data &&
138  communicator != ClusterComm::EventData) {
140  "Event data should be sent with EventData communicator. "
141  "Dropping message");
142  return;
143  }
144 
145  message.source = m_rank;
146  const auto& [header, body] = message.wire_msg();
147  comm.send_n(header.begin(), header.size(), destRank, 0);
148  if (body.has_value()) {
149  comm.send_n(body->begin(), body->size(), destRank, header[2]);
150  if (message.messageType == ClusterMessageType::Data) {
151  const ClusterMessage::WireMsgBody& bdy = *body;
152  // Decode the body to figure out what to send
153  char* ptr = reinterpret_cast<char*>((std::uint64_t(bdy[0]) << 32) +
154  std::uint64_t(bdy[1]));
155  std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
156 
157  // Offset the tag by 16384 to minimize chance of conflict
158  // (max tag in MPI spec is 32767)
159  constexpr int tag_offset = 16384;
160  comm.send_n(ptr, len, destRank, header[2] + tag_offset);
161  }
162  }
163 }
164 
166  // Same offset as line 114
167  constexpr int tag_offset = 16384;
168  constexpr std::uint64_t thirtytwo_ones = 0xFFFFFFFF;
169 
170  // Select correct communicator
171  mpi3::communicator& comm =
172  (communicator == ClusterComm::EventData) ? m_datacom : m_world;
174  auto&& [head, body] = msg;
175  comm.receive_n(head.begin(), head.size());
176  // Only time we need to figure out ourselves whether there's a body
181  comm.receive_n(body->begin(), body->size(), head[1], head[2]);
182  if (head[0] == int(ClusterMessageType::Data)) {
183  ClusterMessage::WireMsgBody& bdy = *body;
184  // Decode the body to figure out what to recieve
185  std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
186  std::size_t align = (std::uint64_t(bdy[4]) << 32) + std::uint64_t(bdy[5]);
187 
188  char* ptr = static_cast<char*>(std::aligned_alloc(align, len));
189  comm.receive_n(ptr, len, head[1], head[2] + tag_offset);
190 
191  // update the pointer in the WireMsgBody
192  bdy[0] = int(std::uint64_t(ptr) >> 32);
193  bdy[1] = int(std::uint64_t(ptr) & thirtytwo_ones);
194  }
195  }
197  ATH_MSG_DEBUG("Rank " << rank() << " received message from "
198  << message.source);
199  return message;
200 }
201 
202 void MPIClusterSvc::log_addEvent(int eventIdx, std::int64_t run_number,
203  std::int64_t event_number,
204  std::int64_t request_time_ns,
205  std::size_t slot) {
206  m_mpiLog_addEvent.run(eventIdx, run_number, event_number, m_rank,
207  request_time_ns,
208  m_inputFileSlotMap[slot]);
209 }
210 
212  std::int64_t event_number,
213  std::int64_t status) {
215 }
MPIClusterSvc::handle
virtual void handle(const Incident &inc) override
IIncidentListener handle.
Definition: MPIClusterSvc.cxx:82
ClusterMessageType::Data
@ Data
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
plotting.plot_kinematics.run_number
run_number
Definition: plot_kinematics.py:29
header
Definition: hcg.cxx:526
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
MPIClusterSvc::log_addEvent
virtual void log_addEvent(int eventIdx, std::int64_t run_number, std::int64_t event_number, std::int64_t request_time_ns, std::size_t slot) override final
Add (begin) an event in the log.
Definition: MPIClusterSvc.cxx:202
MPIClusterSvc::numRanks
virtual int numRanks() const override final
Return number of ranks.
Definition: MPIClusterSvc.cxx:106
ClusterMessageType::RequestEvent
@ RequestEvent
dbg::ptr
void * ptr(T *p)
Definition: SGImplSvc.cxx:74
MPIClusterSvc::sendMessage
virtual void sendMessage(int destRank, ClusterMessage message, ClusterComm communicator=ClusterComm::Default) override final
Send an MPI message.
Definition: MPIClusterSvc.cxx:123
ReweightUtils.message
message
Definition: ReweightUtils.py:15
MPIClusterSvc::m_mpiLog_addFile
SQLite::Statement m_mpiLog_addFile
Definition: MPIClusterSvc.h:90
MPIClusterSvc::abort
virtual void abort() override final
Abort the MPI run.
Definition: MPIClusterSvc.cxx:119
ClusterComm::EventData
@ EventData
ClusterMessage::WireMsgBody
std::array< int, 10 > WireMsgBody
Definition: ClusterMessage.h:37
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
MPIClusterSvc::finalize
virtual StatusCode finalize() override final
Finalize.
Definition: MPIClusterSvc.cxx:72
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
MPIClusterSvc.h
MPIClusterSvc::m_mpiLog_addEvent
SQLite::Statement m_mpiLog_addEvent
Definition: MPIClusterSvc.h:88
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
SQLite::Statement::run
ResultTypeWrapper< ReturnArgs... >::type run(ParamArgs... params)
Run the statement.
XXH.h
C++ native wrapper for the C xxhash API.
MPIClusterSvc::m_lastInputFileHash
std::int64_t m_lastInputFileHash
Definition: MPIClusterSvc.h:93
MPIClusterSvc::initialize
virtual StatusCode initialize() override final
Initialize.
Definition: MPIClusterSvc.cxx:11
ClusterMessageType::FinalWorkerStatus
@ FinalWorkerStatus
ClusterMessage
A class describing a message sent between nodes in a cluster.
Definition: ClusterMessage.h:30
head
std::string head(std::string s, const std::string &pattern)
head of a string
Definition: computils.cxx:312
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
MPIClusterSvc::waitReceiveMessage
virtual ClusterMessage waitReceiveMessage(ClusterComm communicator=ClusterComm::Default) override final
Block until we receive an MPI message.
Definition: MPIClusterSvc.cxx:165
ClusterMessageType::WorkerError
@ WorkerError
ClusterComm
ClusterComm
Definition: IMPIClusterSvc.h:18
MPIClusterSvc::rank
virtual int rank() const override final
Return our rank.
Definition: MPIClusterSvc.cxx:110
xxh3::hash64
std::uint64_t hash64(const void *data, std::size_t size)
Passthrough to XXH3_64bits.
Definition: XXH.cxx:9
MPIClusterSvc::m_mpiLog_completeEvent
SQLite::Statement m_mpiLog_completeEvent
Definition: MPIClusterSvc.h:89
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ReadCalibFromCool.comm
comm
Definition: ReadCalibFromCool.py:440
MPIClusterSvc::m_inputFileSlotMap
std::map< std::size_t, std::int64_t > m_inputFileSlotMap
Definition: MPIClusterSvc.h:94
MPIClusterSvc::m_mpiLog
ServiceHandle< ISQLiteDBSvc > m_mpiLog
Definition: MPIClusterSvc.h:86
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
MPIClusterSvc::m_datacom
mpi3::communicator m_datacom
Definition: MPIClusterSvc.h:82
MPIClusterSvc::log_completeEvent
virtual void log_completeEvent(std::int64_t run_number, std::int64_t event_number, std::int64_t status) override final
Complete an event in the log.
Definition: MPIClusterSvc.cxx:211
ClusterMessage::WireMsg
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
Definition: ClusterMessage.h:38
MPIClusterSvc::m_rank
int m_rank
Definition: MPIClusterSvc.h:83
MPIClusterSvc::m_world
mpi3::communicator m_world
Definition: MPIClusterSvc.h:80
merge.status
status
Definition: merge.py:16
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
makeTOC.header
header
Definition: makeTOC.py:28
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
MPIClusterSvc::barrier
virtual void barrier() override final
Insert a barrier. No rank will continue until all ranks reach this point.
Definition: MPIClusterSvc.cxx:114
ServiceHandle< IIncidentSvc >
MPIClusterSvc::m_env
std::unique_ptr< mpi3::environment > m_env
Definition: MPIClusterSvc.h:79