ATLAS Offline Software
MPIClusterSvc Class Reference

A service managing communications within a cluster using MPI.

#include <MPIClusterSvc.h>


Public Member Functions

 MPIClusterSvc (const std::string &name, ISvcLocator *svcLoc)
 Constructor.

virtual StatusCode initialize () override final
 Initialize.

virtual StatusCode finalize () override final
 Finalize.

virtual void handle (const Incident &inc) override
 IIncidentListener handle.

virtual int numRanks () const override final
 Return number of ranks.

virtual int rank () const override final
 Return our rank.

virtual void barrier () override final
 Insert a barrier: no rank will continue until all ranks reach this point.

virtual void abort () override final
 Abort the MPI run.

virtual void sendMessage (int destRank, ClusterMessage message, ClusterComm communicator=ClusterComm::Default) override final
 Send an MPI message.

virtual ClusterMessage waitReceiveMessage (ClusterComm communicator=ClusterComm::Default) override final
 Block until we receive an MPI message.

virtual mpi3::communicator & data_communicator () override final
 Return the data communicator.

virtual void log_addEvent (int eventIdx, std::int64_t run_number, std::int64_t event_number, std::int64_t request_time_ns, std::size_t slot) override final
 Add (begin) an event in the log.

virtual void log_completeEvent (std::int64_t run_number, std::int64_t event_number, std::int64_t status) override final
 Complete an event in the log.
 

Private Attributes

std::unique_ptr< mpi3::environment > m_env
 
mpi3::communicator m_world
 
mpi3::communicator m_datacom
 
int m_rank = -1
 
ServiceHandle< ISQLiteDBSvc > m_mpiLog
 
SQLite::Statement m_mpiLog_addEvent
 
SQLite::Statement m_mpiLog_completeEvent
 
SQLite::Statement m_mpiLog_addFile
 
std::int64_t m_lastInputFileHash {}
 
std::map< std::size_t, std::int64_t > m_inputFileSlotMap {}
 

Detailed Description

A service managing communications within a cluster using MPI.

Definition at line 26 of file MPIClusterSvc.h.

Constructor & Destructor Documentation

◆ MPIClusterSvc()

MPIClusterSvc::MPIClusterSvc ( const std::string &  name,
ISvcLocator *  svcLoc 
)
inline

Constructor.

Definition at line 29 of file MPIClusterSvc.h.

30  : extends(name, svcLoc) {}

Member Function Documentation

◆ abort()

void MPIClusterSvc::abort ( )
final override virtual

Abort the MPI run.

Definition at line 119 of file MPIClusterSvc.cxx.

119 {
120   m_world.abort();
121 }

◆ barrier()

void MPIClusterSvc::barrier ( )
final override virtual

Insert a barrier: no rank will continue until all ranks reach this point.

Definition at line 114 of file MPIClusterSvc.cxx.

114 {
115   ATH_MSG_DEBUG("Barrier on rank " << rank() << " of " << numRanks());
116   m_world.barrier();
117 }

◆ data_communicator()

virtual mpi3::communicator& MPIClusterSvc::data_communicator ( )
inline final override virtual

Return the data communicator, a duplicate of MPI_COMM_WORLD reserved for event data.

Definition at line 64 of file MPIClusterSvc.h.

64 {
65   return m_datacom;
66 }

◆ finalize()

StatusCode MPIClusterSvc::finalize ( )
final override virtual

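Finalize: record this rank's end time in the ranks table of the log database, then release the MPI environment.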

Definition at line 72 of file MPIClusterSvc.cxx.

72 {
73   m_mpiLog
74       ->createStatement(
75           "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
76       .run(m_rank);
77   m_env.reset(nullptr);
78   return StatusCode::SUCCESS;
79 }

◆ handle()

void MPIClusterSvc::handle ( const Incident &  inc)
override virtual

IIncidentListener handle.

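Handles BeginInputFile (caches an id for the new input file and records it in the files table) and BeginProcessing (stores that id for the event's slot), so that each logged event can be traced back to the input file it came from.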

Definition at line 82 of file MPIClusterSvc.cxx.

82 {
83   // Fill in slot map at start of every event
84   if (inc.type() == IncidentType::BeginProcessing) {
85     const std::size_t slot = Gaudi::Hive::currentContext().slot();
86     m_inputFileSlotMap[slot] = m_lastInputFileHash;
87   }
88
89   // Cache new input filename on start of every file
90   if (inc.type() == IncidentType::BeginInputFile) {
91     const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
92     if (fileInc == nullptr) {
93       ATH_MSG_ERROR("BeginInputFile does not have a file name attached");
94       return;
95     }
96
97     const std::string fileName = fileInc->fileName();
98     // Convert the hash into a signed int64. Just a hash so this doesn't matter.
99     m_lastInputFileHash = static_cast<std::int64_t>(xxh3::hash64(fileName));
100     m_mpiLog_addFile.run(m_lastInputFileHash, fileName);
101   }
102   return;
103 }
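The slot bookkeeping in handle() and its use in log_addEvent() can be modelled without Gaudi. The following is a minimal sketch, not the service's code: InputFileTracker and its methods are hypothetical names, std::hash stands in for xxh3::hash64, and it assumes (as the listing does) that the BeginInputFile incident for a file arrives before the BeginProcessing incidents of the events read from it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <optional>
#include <string>

// Hypothetical stand-in for the slot bookkeeping in MPIClusterSvc.
class InputFileTracker {
 public:
  // Like the BeginInputFile branch: cache an id derived from the file name.
  // std::hash replaces xxh3::hash64; the result is cast to int64 as in the service.
  void onBeginInputFile(const std::string& fileName) {
    m_lastInputFileHash =
        static_cast<std::int64_t>(std::hash<std::string>{}(fileName));
  }

  // Like the BeginProcessing branch: record the current file for this slot.
  void onBeginProcessing(std::size_t slot) {
    m_inputFileSlotMap[slot] = m_lastInputFileHash;
  }

  // Lookup done when an event is logged; find() does not insert anything.
  std::optional<std::int64_t> fileIdForSlot(std::size_t slot) const {
    const auto it = m_inputFileSlotMap.find(slot);
    if (it == m_inputFileSlotMap.end()) return std::nullopt;
    return it->second;
  }

 private:
  std::int64_t m_lastInputFileHash{};
  std::map<std::size_t, std::int64_t> m_inputFileSlotMap{};
};
```

Because the id is copied into the slot map when an event starts, an event still in flight keeps its file id after the next BeginInputFile updates the cached hash. The sketch looks slots up with find(); operator[], as used in log_addEvent(), would instead insert 0 for a slot that was never seen.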

◆ initialize()

StatusCode MPIClusterSvc::initialize ( )
final override virtual

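Initialize: start MPI in single-threaded mode, duplicate MPI_COMM_WORLD into a separate communicator for event data, create the ranks, files and event_log tables in the log database, prepare the logging statements, and register for the BeginInputFile and BeginProcessing incidents.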

Definition at line 11 of file MPIClusterSvc.cxx.

11 {
12   ATH_MSG_DEBUG("Initializing MPI");
13   m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
14   ATH_MSG_DEBUG("Created MPI environment");
15   m_world = m_env->world();
16   m_datacom =
17       m_world.duplicate(); // make a duplicate communicator for event data
18   ATH_MSG_DEBUG("Got MPI_COMM_WORLD");
19   m_rank = m_world.rank();
20   ATH_MSG_INFO("On MPI rank " << m_rank);
21
22   ATH_CHECK(m_mpiLog.retrieve());
23   m_mpiLog->createStatement("PRAGMA foreign_keys = ON").run();
24
25   m_mpiLog
26       ->createStatement(
27           "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
28           "node TEXT, start_time FLOAT, end_time FLOAT)")
29       .run();
30   m_mpiLog
31       ->createStatement(
32           "INSERT INTO ranks (rank, node, start_time) "
33           "VALUES(?1, ?2, julianday('now'))")
34       .run(m_rank, m_env->processor_name());
35   m_mpiLog->createStatement(
36           "CREATE TABLE files (fileId INTEGER PRIMARY KEY, fileName TEXT)")
37       .run();
38   m_mpiLog
39       ->createStatement(
40           "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
41           "inputFileId INTEGER,"
42           "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
43           "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
44           "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
45           "FOREIGN KEY (rank) REFERENCES ranks(rank),"
46           "FOREIGN KEY (inputFileId) REFERENCES files(fileId))")
47       .run();
48   m_mpiLog_addEvent = m_mpiLog->createStatement(
49       "INSERT INTO event_log(id, rank, inputFileId, runNumber, eventNumber, complete, "
50       "start_time, request_time_ns) "
51       "VALUES(?1, ?4, ?6, ?2, ?3, 0, julianday('now'), ?5)");
52   m_mpiLog_completeEvent = m_mpiLog->createStatement(
53       "UPDATE event_log SET complete = 1, status = ?3, end_time = "
54       "julianday('now') WHERE runNumber = ?1 "
55       "AND "
56       "eventNumber = ?2");
57   m_mpiLog_addFile = m_mpiLog->createStatement(
58       "INSERT INTO files (fileId, fileName) VALUES(?1, ?2)");
59
60   // Set up incident listener
61   ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
62   if (!incsvc.retrieve().isSuccess()) {
63     ATH_MSG_FATAL("Cannot get IncidentSvc.");
64     return(StatusCode::FAILURE);
65   }
66   incsvc->addListener(this, IncidentType::BeginInputFile, 100);
67   incsvc->addListener(this, IncidentType::BeginProcessing, 100);
68
69   return StatusCode::SUCCESS;
70 }
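The ?N placeholders in the m_mpiLog_addEvent statement are not in column order. Assuming SQLite::Statement::run binds its Nth argument to ?N, which the argument order in log_addEvent() is consistent with, the sketch below spells out which argument ends up in which column. AddEventArgs, kAddEventColumns and boundValue are hypothetical names used only for illustration.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>

// Arguments in the order log_addEvent() passes them to run():
// eventIdx, run_number, event_number, m_rank, request_time_ns, input-file id.
// Under the assumption above, element N-1 is bound to placeholder ?N.
using AddEventArgs = std::array<std::int64_t, 6>;

// Columns named in the INSERT and the placeholder each one receives.
// complete and start_time are set by the SQL itself (0 and julianday('now')).
constexpr std::array<std::pair<const char*, int>, 6> kAddEventColumns{{
    {"id", 1},
    {"rank", 4},
    {"inputFileId", 6},
    {"runNumber", 2},
    {"eventNumber", 3},
    {"request_time_ns", 5},
}};

// Value that would be stored in `column`, or nullopt if the column is not bound.
std::optional<std::int64_t> boundValue(const AddEventArgs& args,
                                       const std::string& column) {
  for (const auto& [name, placeholder] : kAddEventColumns) {
    if (column == name) return args[placeholder - 1];  // placeholders are 1-based
  }
  return std::nullopt;
}
```

Numbered placeholders let the C++ call keep a natural argument order (event index first, file id last) while the INSERT lists columns in whatever order the table uses.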

◆ log_addEvent()

void MPIClusterSvc::log_addEvent ( int  eventIdx,
std::int64_t  run_number,
std::int64_t  event_number,
std::int64_t  request_time_ns,
std::size_t  slot 
)
final override virtual

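Add (begin) an event in the log: insert an event_log row for this rank with complete = 0, the current time as start_time, and the input-file id recorded for slot.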

Definition at line 202 of file MPIClusterSvc.cxx.

205 {
206   m_mpiLog_addEvent.run(eventIdx, run_number, event_number, m_rank,
207                         request_time_ns,
208                         m_inputFileSlotMap[slot]);
209 }

◆ log_completeEvent()

void MPIClusterSvc::log_completeEvent ( std::int64_t  run_number,
std::int64_t  event_number,
std::int64_t  status 
)
final override virtual

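Complete an event in the log: mark the event_log row matching run_number and event_number as complete, and store status and the end time.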

Definition at line 211 of file MPIClusterSvc.cxx.

213 {
214   m_mpiLog_completeEvent.run(run_number, event_number, status);
215 }
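log_addEvent() and log_completeEvent() together form a two-step record: a row is inserted with complete = 0 when an event starts, then updated through its (runNumber, eventNumber) primary key when it finishes. The in-memory model below is a hypothetical illustration of that protocol, not a replacement for the SQLite log; timestamps, the UNIQUE constraint on id and the foreign keys are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>

// One row of the modelled event_log table (timestamps omitted).
struct EventRow {
  int id;
  int rank;
  std::int64_t inputFileId;
  bool complete = false;
  std::optional<std::int64_t> status;  // stays empty (NULL) until completion
};

// Hypothetical in-memory model of event_log, keyed like its primary key.
class EventLogModel {
 public:
  using Key = std::pair<std::int64_t, std::int64_t>;  // (runNumber, eventNumber)

  // Counterpart of m_mpiLog_addEvent: a second row with the same key is
  // rejected, as the PRIMARY KEY constraint would reject it.
  bool addEvent(int id, std::int64_t run, std::int64_t event, int rank,
                std::int64_t fileId) {
    return m_rows.try_emplace(Key{run, event}, EventRow{id, rank, fileId}).second;
  }

  // Counterpart of m_mpiLog_completeEvent. Returns false when no row matches;
  // the SQL UPDATE would simply change zero rows in that case.
  bool completeEvent(std::int64_t run, std::int64_t event, std::int64_t status) {
    auto it = m_rows.find(Key{run, event});
    if (it == m_rows.end()) return false;
    it->second.complete = true;
    it->second.status = status;
    return true;
  }

  const EventRow* find(std::int64_t run, std::int64_t event) const {
    auto it = m_rows.find(Key{run, event});
    return it == m_rows.end() ? nullptr : &it->second;
  }

 private:
  std::map<Key, EventRow> m_rows;
};
```

Rows with complete = 0 left at the end of a job are events that were started but never finished, which is the main thing such a log makes easy to spot.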

◆ numRanks()

int MPIClusterSvc::numRanks ( ) const
final override virtual

Return number of ranks.

Definition at line 106 of file MPIClusterSvc.cxx.

106 {
107   return m_world.size();
108 }

◆ rank()

int MPIClusterSvc::rank ( ) const
final override virtual

Return our rank.

Definition at line 110 of file MPIClusterSvc.cxx.

110 {
111   return m_rank;
112 }

◆ sendMessage()

void MPIClusterSvc::sendMessage ( int  destRank,
ClusterMessage  message,
ClusterComm  communicator = ClusterComm::Default 
)
final override virtual

Send an MPI message.

Definition at line 123 of file MPIClusterSvc.cxx.

124 {
125   ATH_MSG_DEBUG("Sending message from rank " << rank() << " to " << destRank);
126   // Don't send event request message if we're not the master *and* we have a
127   // message waiting.
128   // Probably an emergency stop message
129   if (m_rank != 0 && message.messageType == ClusterMessageType::RequestEvent &&
130       m_world.iprobe().has_value()) {
131     return;
132   }
133
134   // Select correct communicator
135   mpi3::communicator& comm =
136       (communicator == ClusterComm::EventData) ? m_datacom : m_world;
137   if (message.messageType == ClusterMessageType::Data &&
138       communicator != ClusterComm::EventData) {
139     ATH_MSG_WARNING(
140         "Event data should be sent with EventData communicator. "
141         "Dropping message");
142     return;
143   }
144
145   message.source = m_rank;
146   const auto& [header, body] = message.wire_msg();
147   comm.send_n(header.begin(), header.size(), destRank, 0);
148   if (body.has_value()) {
149     comm.send_n(body->begin(), body->size(), destRank, header[2]);
150     if (message.messageType == ClusterMessageType::Data) {
151       const ClusterMessage::WireMsgBody& bdy = *body;
152       // Decode the body to figure out what to send
153       char* ptr = reinterpret_cast<char*>((std::uint64_t(bdy[0]) << 32) +
154                                           std::uint64_t(bdy[1]));
155       std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
156
157       // Offset the tag by 16384 to minimize chance of conflict
158       // (the MPI standard only guarantees tags up to 32767)
159       constexpr int tag_offset = 16384;
160       comm.send_n(ptr, len, destRank, header[2] + tag_offset);
161     }
162   }
163 }
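The routing rules in sendMessage() can be restated as a pure function that lists the point-to-point sends a call would issue. This is a sketch under stated assumptions: planSend, Send, MsgType and Comm are hypothetical stand-ins for the service's types, the body tag is passed in rather than read from header[2], and the pending-message check is a flag instead of a call to iprobe().

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for ClusterMessageType and ClusterComm.
enum class MsgType { RequestEvent, Data, Other };
enum class Comm { Default, EventData };  // Default -> m_world, EventData -> m_datacom

// One point-to-point send: communicator, tag, and which part of the message.
struct Send {
  enum class Part { Header, Body, Payload };
  Comm comm;
  int tag;
  Part part;
};

// Same value as tag_offset in the service.
constexpr int kTagOffset = 16384;

// Lists the sends a call with these arguments would issue (empty = dropped).
// `messagePending` stands in for m_world.iprobe().has_value().
std::vector<Send> planSend(int myRank, MsgType type, Comm requested,
                           bool hasBody, int bodyTag, bool messagePending) {
  // Workers skip event requests while a message (e.g. a stop) is waiting.
  if (myRank != 0 && type == MsgType::RequestEvent && messagePending) return {};
  // Event data must use the dedicated communicator; otherwise it is dropped.
  if (type == MsgType::Data && requested != Comm::EventData) return {};

  std::vector<Send> sends;
  sends.push_back({requested, 0, Send::Part::Header});  // header always on tag 0
  if (hasBody) {
    sends.push_back({requested, bodyTag, Send::Part::Body});
    if (type == MsgType::Data) {
      // The raw payload follows on the offset tag.
      sends.push_back({requested, bodyTag + kTagOffset, Send::Part::Payload});
    }
  }
  return sends;
}
```

Keeping the payload on bodyTag + 16384 separates it from body traffic only while body tags stay below 16384; the MPI standard guarantees that tags up to 32767 are valid, and the actual limit is given by the MPI_TAG_UB attribute.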

◆ waitReceiveMessage()

ClusterMessage MPIClusterSvc::waitReceiveMessage ( ClusterComm  communicator = ClusterComm::Default)
final override virtual

Block until we receive an MPI message.

Definition at line 165 of file MPIClusterSvc.cxx.

165 {
166   // Same offset as in sendMessage() (line 159)
167   constexpr int tag_offset = 16384;
168   constexpr std::uint64_t thirtytwo_ones = 0xFFFFFFFF;
169
170   // Select correct communicator
171   mpi3::communicator& comm =
172       (communicator == ClusterComm::EventData) ? m_datacom : m_world;
173   ClusterMessage::WireMsg msg;
174   auto&& [head, body] = msg;
175   comm.receive_n(head.begin(), head.size());
176   // Only time we need to figure out ourselves whether there's a body
181     comm.receive_n(body->begin(), body->size(), head[1], head[2]);
182     if (head[0] == int(ClusterMessageType::Data)) {
183       ClusterMessage::WireMsgBody& bdy = *body;
184       // Decode the body to figure out what to receive
185       std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
186       std::size_t align = (std::uint64_t(bdy[4]) << 32) + std::uint64_t(bdy[5]);
187
188       char* ptr = static_cast<char*>(std::aligned_alloc(align, len));
189       comm.receive_n(ptr, len, head[1], head[2] + tag_offset);
190
191       // update the pointer in the WireMsgBody
192       bdy[0] = int(std::uint64_t(ptr) >> 32);
193       bdy[1] = int(std::uint64_t(ptr) & thirtytwo_ones);
194     }
195   }
197   ATH_MSG_DEBUG("Rank " << rank() << " received message from "
198                 << message.source);
199   return message;
200 }
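On the Data path, a 64-bit pointer and a 64-bit length each travel as two int fields of the WireMsgBody (high half first), and the receiver allocates an aligned buffer for the payload. The sketch below isolates those two steps; splitU64, joinU64 and allocAligned are hypothetical helpers. It converts each half through std::uint32_t before widening, because widening a negative int directly to std::uint64_t, as in std::uint64_t(bdy[1]), sign-extends it whenever the low half has its top bit set. It also rounds the allocation size up to a multiple of the alignment, which the C11 specification of aligned_alloc originally required.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>

// Split a 64-bit value into the two int fields of the wire format (high, low).
inline void splitU64(std::uint64_t value, int& hi, int& lo) {
  hi = static_cast<int>(static_cast<std::uint32_t>(value >> 32));
  lo = static_cast<int>(static_cast<std::uint32_t>(value & 0xFFFFFFFFu));
}

// Reassemble: go through uint32_t so a negative int is not sign-extended.
inline std::uint64_t joinU64(int hi, int lo) {
  return (std::uint64_t(std::uint32_t(hi)) << 32) | std::uint32_t(lo);
}

// Allocate `len` bytes aligned to `align` (a power of two), rounding the size
// up to a multiple of the alignment. Release the buffer with std::free().
inline char* allocAligned(std::size_t len, std::size_t align) {
  const std::size_t rounded = (len + align - 1) / align * align;
  return static_cast<char*>(std::aligned_alloc(align, rounded));
}
```

For example, splitting 0x00007FFF80001234 stores a negative low half; joinU64 restores the value exactly, while (std::uint64_t(hi) << 32) + std::uint64_t(lo) yields 0x00007FFE80001234.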

Member Data Documentation

◆ m_datacom

mpi3::communicator MPIClusterSvc::m_datacom
private

Definition at line 82 of file MPIClusterSvc.h.

◆ m_env

std::unique_ptr<mpi3::environment> MPIClusterSvc::m_env
private

Definition at line 79 of file MPIClusterSvc.h.

◆ m_inputFileSlotMap

std::map<std::size_t, std::int64_t> MPIClusterSvc::m_inputFileSlotMap {}
private

Definition at line 94 of file MPIClusterSvc.h.

◆ m_lastInputFileHash

std::int64_t MPIClusterSvc::m_lastInputFileHash {}
private

Definition at line 93 of file MPIClusterSvc.h.

◆ m_mpiLog

ServiceHandle<ISQLiteDBSvc> MPIClusterSvc::m_mpiLog
private
Initial value:
{this, "LogDatabaseSvc", "",
"SQLiteDBSvc for the MPI event log"}

Definition at line 86 of file MPIClusterSvc.h.

◆ m_mpiLog_addEvent

SQLite::Statement MPIClusterSvc::m_mpiLog_addEvent
private

Definition at line 88 of file MPIClusterSvc.h.

◆ m_mpiLog_addFile

SQLite::Statement MPIClusterSvc::m_mpiLog_addFile
private

Definition at line 90 of file MPIClusterSvc.h.

◆ m_mpiLog_completeEvent

SQLite::Statement MPIClusterSvc::m_mpiLog_completeEvent
private

Definition at line 89 of file MPIClusterSvc.h.

◆ m_rank

int MPIClusterSvc::m_rank = -1
private

Definition at line 83 of file MPIClusterSvc.h.

◆ m_world

mpi3::communicator MPIClusterSvc::m_world
private

Definition at line 80 of file MPIClusterSvc.h.


The documentation for this class was generated from the following files:
MPIClusterSvc.h
MPIClusterSvc.cxx