 |
ATLAS Offline Software
|
Go to the documentation of this file.
6 #include <boost/serialization/variant.hpp>
10 m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
20 m_mpiLog->createStatement(
"PRAGMA foreign_keys = ON").run();
24 "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
25 "node TEXT, start_time FLOAT, end_time FLOAT)")
29 "INSERT INTO ranks (rank, node, start_time) "
30 "VALUES(?1, ?2, julianday('now'))")
34 "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
35 "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
36 "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
37 "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
38 "FOREIGN KEY (rank) REFERENCES ranks(rank))")
41 "INSERT INTO event_log(id, rank, runNumber, eventNumber, complete, "
42 "start_time, request_time_ns) "
43 "VALUES(?1, ?4, ?2, ?3, 0, julianday('now'), ?5)");
45 "UPDATE event_log SET complete = 1, status = ?3, end_time = "
46 "julianday('now') WHERE runNumber = ?1 "
49 return StatusCode::SUCCESS;
55 "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
58 return StatusCode::SUCCESS;
90 mpi3::communicator&
comm =
95 "Event data should be sent with EventData communicator. "
103 if (body.has_value()) {
104 comm.send_n(body->begin(), body->size(), destRank,
header[2]);
114 constexpr
int tag_offset = 16384;
122 constexpr
int tag_offset = 16384;
126 mpi3::communicator&
comm =
136 comm.receive_n(body->begin(), body->size(),
head[1],
head[2]);
143 char*
ptr =
static_cast<char*
>(std::aligned_alloc(align, len));
158 std::int64_t event_number,
159 std::int64_t request_time_ns) {
165 std::int64_t event_number,
virtual void log_addEvent(int eventIdx, std::int64_t run_number, std::int64_t event_number, std::int64_t request_time_ns) override final
Add (begin) an event in the log.
virtual int numRanks() const override final
Return number of ranks.
virtual void sendMessage(int destRank, ClusterMessage message, ClusterComm communicator=ClusterComm::Default) override final
Send an MPI message.
virtual void abort() override final
Abort the MPI run.
std::array< int, 10 > WireMsgBody
virtual StatusCode finalize() override final
Finalize.
::StatusCode StatusCode
StatusCode definition for legacy code.
SQLite::Statement m_mpiLog_addEvent
ResultTypeWrapper< ReturnArgs... >::type run(ParamArgs... params)
Run the statement.
virtual StatusCode initialize() override final
Initialize.
A class describing a message sent between nodes in a cluster.
std::string head(std::string s, const std::string &pattern)
head of a string
virtual ClusterMessage waitReceiveMessage(ClusterComm communicator=ClusterComm::Default) override final
Block until we receive an MPI message.
virtual int rank() const override final
Return our rank.
SQLite::Statement m_mpiLog_completeEvent
ServiceHandle< ISQLiteDBSvc > m_mpiLog
#define ATH_MSG_WARNING(x)
mpi3::communicator m_datacom
virtual void log_completeEvent(std::int64_t run_number, std::int64_t event_number, std::int64_t status) override final
Complete an event in the log.
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
mpi3::communicator m_world
virtual void barrier() override final
Insert a barrier. No rank will continue until all ranks reach this point.
std::unique_ptr< mpi3::environment > m_env