ATLAS Offline Software
MPIClusterSvc.cxx
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 #include "MPIClusterSvc.h"
5 
6 #include <boost/serialization/variant.hpp>
7 
8 StatusCode MPIClusterSvc::initialize() {
9  ATH_MSG_DEBUG("Initializing MPI");
10  m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
11  ATH_MSG_DEBUG("Created MPI environment");
12  m_world = m_env->world();
13  m_datacom =
14  m_world.duplicate(); // make a duplicate communicator for event data
15  ATH_MSG_DEBUG("Got MPI_COMM_WORLD");
16  m_rank = m_world.rank();
17  ATH_MSG_INFO("On MPI rank " << m_rank);
18 
19  ATH_CHECK(m_mpiLog.retrieve());
20  m_mpiLog->createStatement("PRAGMA foreign_keys = ON").run();
21 
22  m_mpiLog
23  ->createStatement(
24  "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
25  "node TEXT, start_time FLOAT, end_time FLOAT)")
26  .run();
27  m_mpiLog
28  ->createStatement(
29  "INSERT INTO ranks (rank, node, start_time) "
30  "VALUES(?1, ?2, julianday('now'))")
31  .run(m_rank, m_env->processor_name());
32  m_mpiLog
33  ->createStatement(
34  "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
35  "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
36  "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
37  "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
38  "FOREIGN KEY (rank) REFERENCES ranks(rank))")
39  .run();
40  m_mpiLog_addEvent = m_mpiLog->createStatement(
41  "INSERT INTO event_log(id, rank, runNumber, eventNumber, complete, "
42  "start_time, request_time_ns) "
43  "VALUES(?1, ?4, ?2, ?3, 0, julianday('now'), ?5)");
44  m_mpiLog_completeEvent = m_mpiLog->createStatement(
45  "UPDATE event_log SET complete = 1, status = ?3, end_time = "
46  "julianday('now') WHERE runNumber = ?1 "
47  "AND "
48  "eventNumber = ?2");
49  return StatusCode::SUCCESS;
50 }
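// Bookkeeping schema created above (timestamps are SQLite Julian day numbers
// from julianday('now')):
//   ranks:     one row per MPI rank -- rank, node (processor name),
//              start_time, and end_time (filled in by finalize()).
//   event_log: one row per event, keyed by (runNumber, eventNumber) -- a
//              unique id, the logging rank, request_time_ns, and start/end
//              times; log_completeEvent() later sets complete = 1, status
//              and end_time.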
51 
52 StatusCode MPIClusterSvc::finalize() {
53  m_mpiLog
54  ->createStatement(
55  "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
56  .run(m_rank);
57  m_env.reset(nullptr);
58  return StatusCode::SUCCESS;
59 }
60 
61 int MPIClusterSvc::numRanks() const {
62  return m_world.size();
63 }
64 
65 int MPIClusterSvc::rank() const {
66  return m_rank;
67 }
68 
69 void MPIClusterSvc::barrier() {
70  ATH_MSG_DEBUG("Barrier on rank " << rank() << " of " << numRanks());
71  m_world.barrier();
72 }
73 
74 void MPIClusterSvc::abort() {
75  m_world.abort();
76 }
77 
78 void MPIClusterSvc::sendMessage(int destRank, ClusterMessage message,
79  ClusterComm communicator) {
80  ATH_MSG_DEBUG("Sending message from rank " << rank() << " to " << destRank);
81  // Don't send an event request if we're not the master *and* a message is
82  // already waiting for us.
83  // It's probably an emergency stop message.
84  if (m_rank != 0 && message.messageType == ClusterMessageType::RequestEvent &&
85  m_world.iprobe().has_value()) {
86  return;
87  }
88 
89  // Select correct communicator
90  mpi3::communicator& comm =
91  (communicator == ClusterComm::EventData) ? m_datacom : m_world;
92  if (message.messageType == ClusterMessageType::Data &&
93  communicator != ClusterComm::EventData) {
94  ATH_MSG_WARNING(
95  "Event data should be sent with EventData communicator. "
96  "Dropping message");
97  return;
98  }
99 
100  message.source = m_rank;
101  const auto& [header, body] = message.wire_msg();
102  comm.send_n(header.begin(), header.size(), destRank, 0);
103  if (body.has_value()) {
104  comm.send_n(body->begin(), body->size(), destRank, header[2]);
105  if (message.messageType == ClusterMessageType::Data) {
106  const ClusterMessage::WireMsgBody& bdy = *body;
107  // Decode the body to figure out what to send
108  char* ptr = reinterpret_cast<char*>((std::uint64_t(bdy[0]) << 32) +
109  std::uint64_t(bdy[1]));
110  std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
111 
112  // Offset the tag by 16384 to minimize chance of conflict
113  // (max tag in MPI spec is 32767)
114  constexpr int tag_offset = 16384;
115  comm.send_n(ptr, len, destRank, header[2] + tag_offset);
116  }
117  }
118 }
119 
120 ClusterMessage MPIClusterSvc::waitReceiveMessage(ClusterComm communicator) {
121  // Same offset as line 114
122  constexpr int tag_offset = 16384;
123  constexpr std::uint64_t thirtytwo_ones = 0xFFFFFFFF;
124 
125  // Select correct communicator
126  mpi3::communicator& comm =
127  (communicator == ClusterComm::EventData) ? m_datacom : m_world;
128  ClusterMessage::WireMsg msg;
129  auto&& [head, body] = msg;
130  comm.receive_n(head.begin(), head.size());
131  // Only time we need to figure out ourselves whether there's a body
132  if (head[0] == int(ClusterMessageType::Data) ||
133      head[0] == int(ClusterMessageType::FinalWorkerStatus) ||
134      head[0] == int(ClusterMessageType::WorkerError)) {
135    body.emplace();
136  comm.receive_n(body->begin(), body->size(), head[1], head[2]);
137  if (head[0] == int(ClusterMessageType::Data)) {
138  ClusterMessage::WireMsgBody& bdy = *body;
139  // Decode the body to figure out what to receive
140  std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
141  std::size_t align = (std::uint64_t(bdy[4]) << 32) + std::uint64_t(bdy[5]);
142 
143  char* ptr = static_cast<char*>(std::aligned_alloc(align, len));
144  comm.receive_n(ptr, len, head[1], head[2] + tag_offset);
145 
146  // update the pointer in the WireMsgBody
147  bdy[0] = int(std::uint64_t(ptr) >> 32);
148  bdy[1] = int(std::uint64_t(ptr) & thirtytwo_ones);
149  }
150  }
152  ATH_MSG_DEBUG("Rank " << rank() << " received message from "
153  << message.source);
154  return message;
155 }
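// Wire format used by sendMessage() / waitReceiveMessage():
//   - header (WireMsgHdr), always sent with tag 0: [0] = message type,
//     [1] = source rank, [2] = tag under which the optional body follows;
//   - body (WireMsgBody, std::array<int, 10>), sent with tag header[2];
//   - for Data messages the body packs the event-data pointer ([0]/[1]),
//     length ([2]/[3]) and alignment ([4]/[5]) as high/low 32-bit halves,
//     and the raw buffer itself is sent as a third message with tag
//     header[2] + 16384; such messages must use the EventData communicator.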
156 
157 void MPIClusterSvc::log_addEvent(int eventIdx, std::int64_t run_number,
158  std::int64_t event_number,
159  std::int64_t request_time_ns) {
160  m_mpiLog_addEvent.run(eventIdx, run_number, event_number, m_rank,
161  request_time_ns);
162 }
163 
164 void MPIClusterSvc::log_completeEvent(std::int64_t run_number,
165  std::int64_t event_number,
166  std::int64_t status) {
167  m_mpiLog_completeEvent.run(run_number, event_number, status);
168 }
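For orientation, a minimal sketch of how a worker rank might drive this service, assuming only what is visible above and in the interface: sendMessage() and waitReceiveMessage() with their ClusterComm::Default arguments, and ClusterMessageType::RequestEvent. The include paths, the default construction of ClusterMessage and the helper name requestNextEvent are illustrative assumptions, not part of this file.

#include "ClusterMessage.h"              // include paths assumed
#include "IMPIClusterSvc.h"
#include "GaudiKernel/ServiceHandle.h"

// Hypothetical worker-side helper: ask the master (rank 0) for the next event
// and block until a reply arrives on the default communicator.
ClusterMessage requestNextEvent(ServiceHandle<IMPIClusterSvc>& cluster) {
  ClusterMessage request;                                  // assumed default-constructible
  request.messageType = ClusterMessageType::RequestEvent;  // same field as set above
  cluster->sendMessage(/*destRank=*/0, request);           // uses ClusterComm::Default
  return cluster->waitReceiveMessage();
}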