ATLAS Offline Software
Public Member Functions | Private Attributes | List of all members
MPIClusterSvc Class Reference

A service managing communications within a cluster using MPI. More...

#include <MPIClusterSvc.h>

Inheritance diagram for MPIClusterSvc:
Collaboration diagram for MPIClusterSvc:

Public Member Functions

 MPIClusterSvc (const std::string &name, ISvcLocator *svcLoc)
 Constructor. More...
 
virtual StatusCode initialize () override final
 Initialize. More...
 
virtual StatusCode finalize () override final
 Finalize. More...
 
virtual int numRanks () const override final
 Return number of ranks. More...
 
virtual int rank () const override final
 Return our rank. More...
 
virtual void barrier () override final
 Insert a barrier: no rank will continue until all ranks reach this point. More...
 
virtual void abort () override final
 Abort the MPI run. More...
 
virtual void sendMessage (int destRank, ClusterMessage message, ClusterComm communicator=ClusterComm::Default) override final
 Send an MPI message. More...
 
virtual ClusterMessage waitReceiveMessage (ClusterComm communicator=ClusterComm::Default) override final
 Block until we receive an MPI message. More...
 
virtual mpi3::communicator & data_communicator () override final
 Return the data communicator. More...
 
virtual void log_addEvent (int eventIdx, std::int64_t run_number, std::int64_t event_number, std::int64_t request_time_ns) override final
 Add (begin) an event in the log. More...
 
virtual void log_completeEvent (std::int64_t run_number, std::int64_t event_number, std::int64_t status) override final
 Complete an event in the log. More...
 

Private Attributes

std::unique_ptr< mpi3::environment > m_env
 
mpi3::communicator m_world
 
mpi3::communicator m_datacom
 
int m_rank = -1
 
ServiceHandle< ISQLiteDBSvc > m_mpiLog
 
SQLite::Statement m_mpiLog_addEvent
 
SQLite::Statement m_mpiLog_completeEvent
 

Detailed Description

A service managing communications within a cluster using MPI.

Definition at line 23 of file MPIClusterSvc.h.

Constructor & Destructor Documentation

◆ MPIClusterSvc()

MPIClusterSvc::MPIClusterSvc ( const std::string &  name,
ISvcLocator *  svcLoc 
)
inline

Constructor.

Definition at line 26 of file MPIClusterSvc.h.

27  : extends(name, svcLoc) {}

Member Function Documentation

◆ abort()

void MPIClusterSvc::abort ( )
final override virtual

Abort the MPI run.

Definition at line 74 of file MPIClusterSvc.cxx.

74  {
75  m_world.abort();
76 }

◆ barrier()

void MPIClusterSvc::barrier ( )
final override virtual

Insert a barrier: no rank will continue until all ranks reach this point.

Definition at line 69 of file MPIClusterSvc.cxx.

69  {
70  ATH_MSG_DEBUG("Barrier on rank " << rank() << " of " << numRanks());
71  m_world.barrier();
72 }

◆ data_communicator()

virtual mpi3::communicator& MPIClusterSvc::data_communicator ( )
inline final override virtual

Return the data communicator.

Definition at line 58 of file MPIClusterSvc.h.

58  {
59  return m_datacom;
60  }

◆ finalize()

StatusCode MPIClusterSvc::finalize ( )
final override virtual

Finalize.

Definition at line 52 of file MPIClusterSvc.cxx.

52  {
53  m_mpiLog
54  ->createStatement(
55  "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
56  .run(m_rank);
57  m_env.reset(nullptr);
58  return StatusCode::SUCCESS;
59 }

◆ initialize()

StatusCode MPIClusterSvc::initialize ( )
final override virtual

Initialize.

Definition at line 8 of file MPIClusterSvc.cxx.

8  {
9  ATH_MSG_DEBUG("Initializing MPI");
10  m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
11  ATH_MSG_DEBUG("Created MPI environment");
12  m_world = m_env->world();
13  m_datacom =
14  m_world.duplicate(); // make a duplicate communicator for event data
15  ATH_MSG_DEBUG("Got MPI_COMM_WORLD");
16  m_rank = m_world.rank();
17  ATH_MSG_INFO("On MPI rank " << m_rank);
18 
19  ATH_CHECK(m_mpiLog.retrieve());
20  m_mpiLog->createStatement("PRAGMA foreign_keys = ON").run();
21 
22  m_mpiLog
23  ->createStatement(
24  "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
25  "node TEXT, start_time FLOAT, end_time FLOAT)")
26  .run();
27  m_mpiLog
28  ->createStatement(
29  "INSERT INTO ranks (rank, node, start_time) "
30  "VALUES(?1, ?2, julianday('now'))")
31  .run(m_rank, m_env->processor_name());
32  m_mpiLog
33  ->createStatement(
34  "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
35  "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
36  "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
37  "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
38  "FOREIGN KEY (rank) REFERENCES ranks(rank))")
39  .run();
40  m_mpiLog_addEvent = m_mpiLog->createStatement(
41  "INSERT INTO event_log(id, rank, runNumber, eventNumber, complete, "
42  "start_time, request_time_ns) "
43  "VALUES(?1, ?4, ?2, ?3, 0, julianday('now'), ?5)");
44  m_mpiLog_completeEvent = m_mpiLog->createStatement(
45  "UPDATE event_log SET complete = 1, status = ?3, end_time = "
46  "julianday('now') WHERE runNumber = ?1 "
47  "AND "
48  "eventNumber = ?2");
49  return StatusCode::SUCCESS;
50 }

◆ log_addEvent()

void MPIClusterSvc::log_addEvent ( int  eventIdx,
std::int64_t  run_number,
std::int64_t  event_number,
std::int64_t  request_time_ns 
)
final override virtual

Add (begin) an event in the log.

Definition at line 157 of file MPIClusterSvc.cxx.

159  {
160  m_mpiLog_addEvent.run(eventIdx, run_number, event_number, m_rank,
161  request_time_ns);
162 }

◆ log_completeEvent()

void MPIClusterSvc::log_completeEvent ( std::int64_t  run_number,
std::int64_t  event_number,
std::int64_t  status 
)
final override virtual

Complete an event in the log.

Definition at line 164 of file MPIClusterSvc.cxx.

166  {
168 }

◆ numRanks()

int MPIClusterSvc::numRanks ( ) const
final override virtual

Return number of ranks.

Definition at line 61 of file MPIClusterSvc.cxx.

61  {
62  return m_world.size();
63 }

◆ rank()

int MPIClusterSvc::rank ( ) const
final override virtual

Return our rank.

Definition at line 65 of file MPIClusterSvc.cxx.

65  {
66  return m_rank;
67 }

◆ sendMessage()

void MPIClusterSvc::sendMessage ( int  destRank,
ClusterMessage  message,
ClusterComm  communicator = ClusterComm::Default 
)
final override virtual

Send an MPI message.

Definition at line 78 of file MPIClusterSvc.cxx.

79  {
80  ATH_MSG_DEBUG("Sending message from rank " << rank() << " to " << destRank);
81  // Don't send event request message if we're not the master *and* we have a
82  // message waiting.
83  // Probably an emergency stop message
84  if (m_rank != 0 && message.messageType == ClusterMessageType::RequestEvent &&
85  m_world.iprobe().has_value()) {
86  return;
87  }
88 
89  // Select correct communicator
90  mpi3::communicator& comm =
91  (communicator == ClusterComm::EventData) ? m_datacom : m_world;
92  if (message.messageType == ClusterMessageType::Data &&
93  communicator != ClusterComm::EventData) {
95  "Event data should be sent with EventData communicator. "
96  "Dropping message");
97  return;
98  }
99 
100  message.source = m_rank;
101  const auto& [header, body] = message.wire_msg();
102  comm.send_n(header.begin(), header.size(), destRank, 0);
103  if (body.has_value()) {
104  comm.send_n(body->begin(), body->size(), destRank, header[2]);
105  if (message.messageType == ClusterMessageType::Data) {
106  const ClusterMessage::WireMsgBody& bdy = *body;
107  // Decode the body to figure out what to send
108  char* ptr = reinterpret_cast<char*>((std::uint64_t(bdy[0]) << 32) +
109  std::uint64_t(bdy[1]));
110  std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
111 
112  // Offset the tag by 16384 to minimize chance of conflict
113  // (max tag in MPI spec is 32767)
114  constexpr int tag_offset = 16384;
115  comm.send_n(ptr, len, destRank, header[2] + tag_offset);
116  }
117  }
118 }

◆ waitReceiveMessage()

ClusterMessage MPIClusterSvc::waitReceiveMessage ( ClusterComm  communicator = ClusterComm::Default)
final override virtual

Block until we receive an MPI message.

Definition at line 120 of file MPIClusterSvc.cxx.

120  {
121  // Same offset as line 114
122  constexpr int tag_offset = 16384;
123  constexpr std::uint64_t thirtytwo_ones = 0xFFFFFFFF;
124 
125  // Select correct communicator
126  mpi3::communicator& comm =
127  (communicator == ClusterComm::EventData) ? m_datacom : m_world;
129  auto&& [head, body] = msg;
130  comm.receive_n(head.begin(), head.size());
131  // Only time we need to figure out ourselves whether there's a body
136  comm.receive_n(body->begin(), body->size(), head[1], head[2]);
137  if (head[0] == int(ClusterMessageType::Data)) {
138  ClusterMessage::WireMsgBody& bdy = *body;
139  // Decode the body to figure out what to recieve
140  std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
141  std::size_t align = (std::uint64_t(bdy[4]) << 32) + std::uint64_t(bdy[5]);
142 
143  char* ptr = static_cast<char*>(std::aligned_alloc(align, len));
144  comm.receive_n(ptr, len, head[1], head[2] + tag_offset);
145 
146  // update the pointer in the WireMsgBody
147  bdy[0] = int(std::uint64_t(ptr) >> 32);
148  bdy[1] = int(std::uint64_t(ptr) & thirtytwo_ones);
149  }
150  }
152  ATH_MSG_DEBUG("Rank " << rank() << " received message from "
153  << message.source);
154  return message;
155 }

Member Data Documentation

◆ m_datacom

mpi3::communicator MPIClusterSvc::m_datacom
private

Definition at line 75 of file MPIClusterSvc.h.

◆ m_env

std::unique_ptr<mpi3::environment> MPIClusterSvc::m_env
private

Definition at line 72 of file MPIClusterSvc.h.

◆ m_mpiLog

ServiceHandle<ISQLiteDBSvc> MPIClusterSvc::m_mpiLog
private
Initial value:
{this, "LogDatabaseSvc", "",
"SQLiteDBSvc for the MPI event log"}

Definition at line 79 of file MPIClusterSvc.h.

◆ m_mpiLog_addEvent

SQLite::Statement MPIClusterSvc::m_mpiLog_addEvent
private

Definition at line 81 of file MPIClusterSvc.h.

◆ m_mpiLog_completeEvent

SQLite::Statement MPIClusterSvc::m_mpiLog_completeEvent
private

Definition at line 82 of file MPIClusterSvc.h.

◆ m_rank

int MPIClusterSvc::m_rank = -1
private

Definition at line 76 of file MPIClusterSvc.h.

◆ m_world

mpi3::communicator MPIClusterSvc::m_world
private

Definition at line 73 of file MPIClusterSvc.h.


The documentation for this class was generated from the following files: MPIClusterSvc.h, MPIClusterSvc.cxx
ClusterMessageType::Data
@ Data
plotting.plot_kinematics.run_number
run_number
Definition: plot_kinematics.py:29
header
Definition: hcg.cxx:526
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
MPIClusterSvc::numRanks
virtual int numRanks() const override final
Return number of ranks.
Definition: MPIClusterSvc.cxx:61
ClusterMessageType::RequestEvent
@ RequestEvent
dbg::ptr
void * ptr(T *p)
Definition: SGImplSvc.cxx:74
ReweightUtils.message
message
Definition: ReweightUtils.py:15
ClusterComm::EventData
@ EventData
ClusterMessage::WireMsgBody
std::array< int, 10 > WireMsgBody
Definition: ClusterMessage.h:37
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
MPIClusterSvc::m_mpiLog_addEvent
SQLite::Statement m_mpiLog_addEvent
Definition: MPIClusterSvc.h:81
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
SQLite::Statement::run
ResultTypeWrapper< ReturnArgs... >::type run(ParamArgs... params)
Run the statement.
ClusterMessageType::FinalWorkerStatus
@ FinalWorkerStatus
ClusterMessage
A class describing a message sent between nodes in a cluster.
Definition: ClusterMessage.h:30
head
std::string head(std::string s, const std::string &pattern)
head of a string
Definition: computils.cxx:312
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
ClusterMessageType::WorkerError
@ WorkerError
MPIClusterSvc::rank
virtual int rank() const override final
Return our rank.
Definition: MPIClusterSvc.cxx:65
MPIClusterSvc::m_mpiLog_completeEvent
SQLite::Statement m_mpiLog_completeEvent
Definition: MPIClusterSvc.h:82
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ReadCalibFromCool.comm
comm
Definition: ReadCalibFromCool.py:440
MPIClusterSvc::m_mpiLog
ServiceHandle< ISQLiteDBSvc > m_mpiLog
Definition: MPIClusterSvc.h:79
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
MPIClusterSvc::m_datacom
mpi3::communicator m_datacom
Definition: MPIClusterSvc.h:75
ClusterMessage::WireMsg
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
Definition: ClusterMessage.h:38
MPIClusterSvc::m_rank
int m_rank
Definition: MPIClusterSvc.h:76
MPIClusterSvc::m_world
mpi3::communicator m_world
Definition: MPIClusterSvc.h:73
merge.status
status
Definition: merge.py:16
makeTOC.header
header
Definition: makeTOC.py:28
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
MPIClusterSvc::m_env
std::unique_ptr< mpi3::environment > m_env
Definition: MPIClusterSvc.h:72