 |
ATLAS Offline Software
|
Go to the documentation of this file.
7 #include "GaudiKernel/FileIncident.h"
9 #include <boost/serialization/variant.hpp>
13 m_env = std::make_unique<mpi3::environment>(mpi3::thread_level::single);
23 m_mpiLog->createStatement(
"PRAGMA foreign_keys = ON").run();
27 "CREATE TABLE ranks (rank INTEGER PRIMARY KEY, "
28 "node TEXT, start_time FLOAT, end_time FLOAT)")
32 "INSERT INTO ranks (rank, node, start_time) "
33 "VALUES(?1, ?2, julianday('now'))")
36 "CREATE TABLE files (fileId INTEGER PRIMARY KEY, fileName TEXT)")
40 "CREATE TABLE event_log (rank INTEGER, id INTEGER UNIQUE,"
41 "inputFileId INTEGER,"
42 "runNumber INTEGER, eventNumber INTEGER, complete INTEGER,"
43 "status INTEGER, request_time_ns INTEGER, start_time FLOAT,"
44 "end_time FLOAT, PRIMARY KEY (runNumber, eventNumber), "
45 "FOREIGN KEY (rank) REFERENCES ranks(rank),"
46 "FOREIGN KEY (inputFileId) REFERENCES files(fileId))")
49 "INSERT INTO event_log(id, rank, inputFileId, runNumber, eventNumber, complete, "
50 "start_time, request_time_ns) "
51 "VALUES(?1, ?4, ?6, ?2, ?3, 0, julianday('now'), ?5)");
53 "UPDATE event_log SET complete = 1, status = ?3, end_time = "
54 "julianday('now') WHERE runNumber = ?1 "
58 "INSERT INTO files (fileId, fileName) VALUES(?1, ?2)");
62 if (!incsvc.retrieve().isSuccess()) {
64 return(StatusCode::FAILURE);
66 incsvc->addListener(
this, IncidentType::BeginInputFile, 100);
67 incsvc->addListener(
this, IncidentType::BeginProcessing, 100);
69 return StatusCode::SUCCESS;
75 "UPDATE ranks SET end_time = julianday('now') WHERE rank = ?1")
78 return StatusCode::SUCCESS;
84 if (inc.type() == IncidentType::BeginProcessing) {
85 const std::size_t slot = Gaudi::Hive::currentContext().slot();
90 if (inc.type() == IncidentType::BeginInputFile) {
91 const FileIncident* fileInc =
dynamic_cast<const FileIncident*
>(&inc);
92 if (fileInc ==
nullptr) {
93 ATH_MSG_ERROR(
"BeginInputFile does not have a file name attached");
97 const std::string
fileName = fileInc->fileName();
130 m_world.iprobe().has_value()) {
135 mpi3::communicator&
comm =
140 "Event data should be sent with EventData communicator. "
148 if (body.has_value()) {
149 comm.send_n(body->begin(), body->size(), destRank,
header[2]);
159 constexpr
int tag_offset = 16384;
167 constexpr
int tag_offset = 16384;
171 mpi3::communicator&
comm =
181 comm.receive_n(body->begin(), body->size(),
head[1],
head[2]);
188 char*
ptr =
static_cast<char*
>(std::aligned_alloc(align, len));
203 std::int64_t event_number,
204 std::int64_t request_time_ns,
212 std::int64_t event_number,
virtual void handle(const Incident &inc) override
IIncidentListener handle.
virtual void log_addEvent(int eventIdx, std::int64_t run_number, std::int64_t event_number, std::int64_t request_time_ns, std::size_t slot) override final
Add (begin) an event in the log.
virtual int numRanks() const override final
Return number of ranks.
virtual void sendMessage(int destRank, ClusterMessage message, ClusterComm communicator=ClusterComm::Default) override final
Send an MPI message.
SQLite::Statement m_mpiLog_addFile
virtual void abort() override final
Abort the MPI run.
std::array< int, 10 > WireMsgBody
virtual StatusCode finalize() override final
Finalize.
::StatusCode StatusCode
StatusCode definition for legacy code.
SQLite::Statement m_mpiLog_addEvent
ResultTypeWrapper< ReturnArgs... >::type run(ParamArgs... params)
Run the statement.
C++ native wrapper for the C xxhash API.
std::int64_t m_lastInputFileHash
virtual StatusCode initialize() override final
Initialize.
A class describing a message sent between nodes in a cluster.
std::string head(std::string s, const std::string &pattern)
Head of a string.
virtual ClusterMessage waitReceiveMessage(ClusterComm communicator=ClusterComm::Default) override final
Block until we receive an MPI message.
virtual int rank() const override final
Return our rank.
std::uint64_t hash64(const void *data, std::size_t size)
Passthrough to XXH3_64bits.
SQLite::Statement m_mpiLog_completeEvent
std::map< std::size_t, std::int64_t > m_inputFileSlotMap
ServiceHandle< ISQLiteDBSvc > m_mpiLog
#define ATH_MSG_WARNING(x)
mpi3::communicator m_datacom
virtual void log_completeEvent(std::int64_t run_number, std::int64_t event_number, std::int64_t status) override final
Complete an event in the log.
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
mpi3::communicator m_world
virtual void barrier() override final
Insert a barrier. No rank will continue until all ranks reach this point.
std::unique_ptr< mpi3::environment > m_env