A service managing communications within a cluster using MPI.
More...
#include <MPIClusterSvc.h>
A service managing communications within a cluster using MPI.
Definition at line 26 of file MPIClusterSvc.h.
◆ MPIClusterSvc()
| MPIClusterSvc::MPIClusterSvc |
( |
const std::string & |
name, |
|
|
ISvcLocator * |
svcLoc |
|
) |
| |
|
inline |
◆ abort()
| void MPIClusterSvc::abort |
( |
| ) |
|
|
finaloverridevirtual |
◆ barrier()
| void MPIClusterSvc::barrier |
( |
| ) |
|
|
finaloverridevirtual |
Insert a barrier No rank will continue until all ranks reach this point.
Definition at line 114 of file MPIClusterSvc.cxx.
◆ data_communicator()
| virtual mpi3::communicator& MPIClusterSvc::data_communicator |
( |
| ) |
|
|
inlinefinaloverridevirtual |
◆ finalize()
| StatusCode MPIClusterSvc::finalize |
( |
| ) |
|
|
finaloverridevirtual |
Finalize.
Definition at line 72 of file MPIClusterSvc.cxx.
75 "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
78 return StatusCode::SUCCESS;
◆ handle()
| void MPIClusterSvc::handle |
( |
const Incident & |
inc | ) |
|
|
overridevirtual |
IIncidentListener handle.
Handles BeginInputFile to keep track of which input file an event came from.
Definition at line 82 of file MPIClusterSvc.cxx.
84 if (inc.type() == IncidentType::BeginProcessing) {
85 const std::size_t slot = Gaudi::Hive::currentContext().slot();
90 if (inc.type() == IncidentType::BeginInputFile) {
91 const FileIncident* fileInc =
dynamic_cast<const FileIncident*
>(&inc);
92 if (fileInc ==
nullptr) {
93 ATH_MSG_ERROR(
"BeginInputFile does not have a file name attached");
97 const std::string
fileName = fileInc->fileName();
◆ initialize()
| StatusCode MPIClusterSvc::initialize |
( |
| ) |
|
|
finaloverridevirtual |
Initialize.
Definition at line 11 of file MPIClusterSvc.cxx.
13 m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
23 m_mpiLog->createStatement(
"PRAGMA foreign_keys = ON").run();
27 "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
28 "node TEXT, start_time FLOAT, end_time FLOAT)")
32 "INSERT INTO ranks (rank, node, start_time) "
33 "VALUES(?1, ?2, julianday('now'))")
36 "CREATE TABLE files (fileId INTEGER PRIMARY KEY, fileName TEXT)")
40 "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
41 "inputFileId INTEGER,"
42 "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
43 "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
44 "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
45 "FOREIGN KEY (rank) REFERENCES ranks(rank),"
46 "FOREIGN KEY (inputFileId) REFERENCES files(fileId))")
49 "INSERT INTO event_log(id, rank, inputFileId, runNumber, eventNumber, complete, "
50 "start_time, request_time_ns) "
51 "VALUES(?1, ?4, ?6, ?2, ?3, 0, julianday('now'), ?5)");
53 "UPDATE event_log SET complete = 1, status = ?3, end_time = "
54 "julianday('now') WHERE runNumber = ?1 "
58 "INSERT INTO files (fileId, fileName) VALUES(?1, ?2)");
62 if (!incsvc.retrieve().isSuccess()) {
64 return(StatusCode::FAILURE);
66 incsvc->addListener(
this, IncidentType::BeginInputFile, 100);
67 incsvc->addListener(
this, IncidentType::BeginProcessing, 100);
69 return StatusCode::SUCCESS;
◆ log_addEvent()
| void MPIClusterSvc::log_addEvent |
( |
int |
eventIdx, |
|
|
std::int64_t |
run_number, |
|
|
std::int64_t |
event_number, |
|
|
std::int64_t |
request_time_ns, |
|
|
std::size_t |
slot |
|
) |
| |
|
finaloverridevirtual |
◆ log_completeEvent()
| void MPIClusterSvc::log_completeEvent |
( |
std::int64_t |
run_number, |
|
|
std::int64_t |
event_number, |
|
|
std::int64_t |
status |
|
) |
| |
|
finaloverridevirtual |
◆ numRanks()
| int MPIClusterSvc::numRanks |
( |
| ) |
const |
|
finaloverridevirtual |
◆ rank()
| int MPIClusterSvc::rank |
( |
| ) |
const |
|
finaloverridevirtual |
◆ sendMessage()
Send an MPI message.
Definition at line 123 of file MPIClusterSvc.cxx.
130 m_world.iprobe().has_value()) {
135 mpi3::communicator&
comm =
140 "Event data should be sent with EventData communicator. "
148 if (body.has_value()) {
149 comm.send_n(body->begin(), body->size(), destRank,
header[2]);
159 constexpr
int tag_offset = 16384;
◆ waitReceiveMessage()
Block until we receive an MPI message.
Definition at line 165 of file MPIClusterSvc.cxx.
167 constexpr
int tag_offset = 16384;
171 mpi3::communicator&
comm =
181 comm.receive_n(body->begin(), body->size(),
head[1],
head[2]);
188 char*
ptr =
static_cast<char*
>(std::aligned_alloc(align, len));
◆ m_datacom
| mpi3::communicator MPIClusterSvc::m_datacom |
|
private |
◆ m_env
| std::unique_ptr<mpi3::environment> MPIClusterSvc::m_env |
|
private |
◆ m_inputFileSlotMap
| std::map<std::size_t, std::int64_t> MPIClusterSvc::m_inputFileSlotMap {} |
|
private |
◆ m_lastInputFileHash
| std::int64_t MPIClusterSvc::m_lastInputFileHash {} |
|
private |
◆ m_mpiLog
Initial value:{this, "LogDatabaseSvc", "",
"SQLiteDBSvc for the MPI event log"}
Definition at line 86 of file MPIClusterSvc.h.
◆ m_mpiLog_addEvent
◆ m_mpiLog_addFile
◆ m_mpiLog_completeEvent
◆ m_rank
| int MPIClusterSvc::m_rank = -1 |
|
private |
◆ m_world
| mpi3::communicator MPIClusterSvc::m_world |
|
private |
The documentation for this class was generated from the following files: