7#include "GaudiKernel/FileIncident.h"
9#include <boost/serialization/variant.hpp>
13 m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
23 m_mpiLog->createStatement(
"PRAGMA foreign_keys = ON").run();
27 "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
28 "node TEXT, start_time FLOAT, end_time FLOAT)")
32 "INSERT INTO ranks (rank, node, start_time) "
33 "VALUES(?1, ?2, julianday('now'))")
36 "CREATE TABLE files (fileId INTEGER PRIMARY KEY, fileName TEXT)")
40 "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
41 "inputFileId INTEGER,"
42 "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
43 "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
44 "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
45 "FOREIGN KEY (rank) REFERENCES ranks(rank),"
46 "FOREIGN KEY (inputFileId) REFERENCES files(fileId))")
49 "INSERT INTO event_log(id, rank, inputFileId, runNumber, eventNumber, complete, "
50 "start_time, request_time_ns) "
51 "VALUES(?1, ?4, ?6, ?2, ?3, 0, julianday('now'), ?5)");
53 "UPDATE event_log SET complete = 1, status = ?3, end_time = "
54 "julianday('now') WHERE runNumber = ?1 "
58 "INSERT INTO files (fileId, fileName) VALUES(?1, ?2)");
62 if (!incsvc.retrieve().isSuccess()) {
64 return(StatusCode::FAILURE);
66 incsvc->addListener(
this, IncidentType::BeginInputFile, 100);
67 incsvc->addListener(
this, IncidentType::BeginProcessing, 100);
69 return StatusCode::SUCCESS;
75 "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
78 return StatusCode::SUCCESS;
84 if (inc.type() == IncidentType::BeginProcessing) {
85 const std::size_t slot = Gaudi::Hive::currentContext().slot();
90 if (inc.type() == IncidentType::BeginInputFile) {
91 const FileIncident* fileInc =
dynamic_cast<const FileIncident*
>(&inc);
92 if (fileInc ==
nullptr) {
93 ATH_MSG_ERROR(
"BeginInputFile does not have a file name attached");
97 const std::string fileName = fileInc->fileName();
129 if (
m_rank != 0 && message.messageType == ClusterMessageType::RequestEvent &&
130 m_world.iprobe().has_value()) {
135 mpi3::communicator& comm =
137 if (message.messageType == ClusterMessageType::Data &&
140 "Event data should be sent with EventData communicator. "
146 const auto& [
header, body] = message.wire_msg();
147 comm.send_n(
header.begin(),
header.size(), destRank, 0);
148 if (body.has_value()) {
149 comm.send_n(body->begin(), body->size(), destRank,
header[2]);
150 if (message.messageType == ClusterMessageType::Data) {
153 char* ptr =
reinterpret_cast<char*
>((std::uint64_t(bdy[0]) << 32) +
154 std::uint64_t(bdy[1]));
155 std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
159 constexpr int tag_offset = 16384;
160 comm.send_n(ptr, len, destRank,
header[2] + tag_offset);
167 constexpr int tag_offset = 16384;
168 constexpr std::uint64_t thirtytwo_ones = 0xFFFFFFFF;
171 mpi3::communicator& comm =
175 comm.receive_n(
head.begin(),
head.size());
177 if (
head[0] ==
int(ClusterMessageType::FinalWorkerStatus) ||
178 head[0] ==
int(ClusterMessageType::WorkerError) ||
179 head[0] ==
int(ClusterMessageType::Data)) {
181 comm.receive_n(body->begin(), body->size(),
head[1],
head[2]);
182 if (
head[0] ==
int(ClusterMessageType::Data)) {
185 std::size_t len = (std::uint64_t(bdy[2]) << 32) + std::uint64_t(bdy[3]);
186 std::size_t align = (std::uint64_t(bdy[4]) << 32) + std::uint64_t(bdy[5]);
188 char* ptr =
static_cast<char*
>(std::aligned_alloc(align, len));
189 comm.receive_n(ptr, len,
head[1],
head[2] + tag_offset);
192 bdy[0] = int(std::uint64_t(ptr) >> 32);
193 bdy[1] = int(std::uint64_t(ptr) & thirtytwo_ones);
203 std::int64_t event_number,
204 std::int64_t request_time_ns,
212 std::int64_t event_number,
213 std::int64_t status) {
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
C++ native wrapper for the C xxhash API.
std::int64_t m_lastInputFileHash
virtual ClusterMessage waitReceiveMessage(ClusterComm communicator=ClusterComm::Default) override final
Block until we receive an MPI message.
virtual void barrier() override final
Insert a barrier. No rank will continue until all ranks reach this point.
virtual void log_addEvent(int eventIdx, std::int64_t run_number, std::int64_t event_number, std::int64_t request_time_ns, std::size_t slot) override final
Add (begin) an event in the log.
SQLite::Statement m_mpiLog_addFile
std::unique_ptr< mpi3::environment > m_env
SQLite::Statement m_mpiLog_addEvent
virtual void handle(const Incident &inc) override
IIncidentListener handle.
virtual void abort() override final
Abort the MPI run.
virtual int rank() const override final
Return our rank.
virtual void log_completeEvent(std::int64_t run_number, std::int64_t event_number, std::int64_t status) override final
Complete an event in the log.
mpi3::communicator m_world
virtual StatusCode initialize() override final
Initialize.
mpi3::communicator m_datacom
virtual StatusCode finalize() override final
Finalize.
SQLite::Statement m_mpiLog_completeEvent
virtual void sendMessage(int destRank, ClusterMessage message, ClusterComm communicator=ClusterComm::Default) override final
Send an MPI message.
ServiceHandle< ISQLiteDBSvc > m_mpiLog
virtual int numRanks() const override final
Return number of ranks.
std::map< std::size_t, std::int64_t > m_inputFileSlotMap
std::string head(std::string s, const std::string &pattern)
head of a string
std::uint64_t hash64(const void *data, std::size_t size)
Passthrough to XXH3_64bits.
A class describing a message sent between nodes in a cluster.
std::array< int, 10 > WireMsgBody
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg