7#include "GaudiKernel/AppReturnCode.h"
18using Clock = std::chrono::high_resolution_clock;
47 return StatusCode::SUCCESS;
70 return StatusCode::FAILURE;
72 if (maxEvt < 0 || maxEvt > evt) {
75 ATH_MSG_INFO(
"Will be processing " << maxEvt <<
" events");
79 std::vector<bool> workers_done(
m_clusterSvc->numRanks(),
false);
82 int num_workers_done = 1;
83 std::vector<ClusterMessage::WorkerStatus> statuses(
m_clusterSvc->numRanks());
89 auto start = Clock::now();
90 for (
int evt = skipEvts; evt < skipEvts + maxEvt;) {
93 if (
msg.messageType == ClusterMessageType::RequestEvent) {
101 if (
msg.messageType == ClusterMessageType::WorkerError) {
104 workers_done.at(
msg.source) =
108 if (!workers_done.at(i)) {
112 workers_done[i] =
true;
119 if (
msg.messageType == ClusterMessageType::FinalWorkerStatus) {
122 workers_done.at(
msg.source) =
true;
129 << std::format(
"{}",
msg.messageType) <<
" from "
132 auto all_provided = Clock::now() - start;
133 ATH_MSG_INFO(
"Provided all events to workers, waiting for them to complete.");
135 while (num_workers_done < m_clusterSvc->numRanks()) {
138 if (
msg.messageType == ClusterMessageType::RequestEvent) {
144 if (
msg.messageType == ClusterMessageType::WorkerError) {
147 workers_done.at(
msg.source) =
151 if (!workers_done.at(i)) {
155 workers_done[i] =
true;
162 if (
msg.messageType == ClusterMessageType::FinalWorkerStatus) {
165 workers_done.at(
msg.source) =
true;
172 << std::format(
"{}",
msg.messageType) <<
" from "
175 auto all_done = Clock::now() - start;
181 StatusCode
sc = StatusCode::SUCCESS;
183 for (
const auto& worker_status : statuses) {
184 if (worker_status.status.isFailure() &&
185 worker_status.status != StatusCode(9999)) {
186 sc = worker_status.status;
188 n_created += worker_status.createdEvents;
189 n_skipped += worker_status.skippedEvents;
190 n_finished += worker_status.finishedEvents;
192 if ((worker_idx++) != 0) {
193 ATH_MSG_INFO(
"Worker " << worker_idx <<
": SC " << worker_status.status
194 <<
", created " << worker_status.createdEvents
195 <<
", skipped " << worker_status.skippedEvents
196 <<
", finished " << worker_status.finishedEvents);
200 ATH_MSG_INFO(
"Overall: SC " <<
sc <<
", created " << n_created <<
", skipped "
201 << n_skipped <<
", finished " << n_finished);
202 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
203 <<
" to provide all events.");
204 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_done)
205 <<
" to complete all events.");
211 bool end_of_stream =
false;
214 auto start = Clock::now();
226 if (
sc.isFailure()) {
238 auto start_time = Clock::now();
242 auto request_time = Clock::now() - start_time;
243 if (
msg.messageType == ClusterMessageType::EmergencyStop) {
245 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
251 return StatusCode::FAILURE;
254 if (
msg.messageType == ClusterMessageType::EventsDone) {
255 auto loop_time = Clock::now() - start;
257 << std::chrono::hh_mm_ss(loop_time)
264 StatusCode
sc = StatusCode::SUCCESS;
265 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
274 0,
ClusterMessage(ClusterMessageType::FinalWorkerStatus, status));
279 if (
msg.messageType != ClusterMessageType::ProvideEvent ||
282 << std::format(
"{}",
msg.messageType) <<
" from "
284 return StatusCode::FAILURE;
291 std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
293 if (
sc.isFailure() && !
sc.isRecoverable()) {
304 auto loop_time = Clock::now() - start;
306 << std::chrono::hh_mm_ss(loop_time)
312 StatusCode
sc = StatusCode::SUCCESS;
313 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
322 0,
ClusterMessage(ClusterMessageType::FinalWorkerStatus, status));
330 std::int64_t requestTime_ns) {
336 Gaudi::Hive::setCurrentContext(ctx);
337 ctx.setEvt(eventIdx);
341 return StatusCode::FAILURE;
344 const std::size_t slot = ctx.slot();
349 m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
350 requestTime_ns, slot);
352 if (
sc.isRecoverable()) {
354 }
else if (
sc.isSuccess()) {
364 StatusCode
sc(StatusCode::SUCCESS);
367 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
369 EventContext* finishedEvtContext(
nullptr);
373 <<
"] Waiting for a context");
377 if (
sc.isSuccess()) {
378 ATH_MSG_DEBUG(
"drainLocalScheduler: scheduler not empty: Context "
379 << finishedEvtContext);
380 finishedEvtContexts.emplace_back(finishedEvtContext);
384 return StatusCode::SUCCESS;
388 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
389 finishedEvtContexts.emplace_back(finishedEvtContext);
393 StatusCode fail(StatusCode::SUCCESS);
394 for (
auto& thisFinishedEvtContext : finishedEvtContexts) {
395 if (!thisFinishedEvtContext) {
397 fail = StatusCode::FAILURE;
403 thisFinishedEvtContext->eventID().run_number(),
404 thisFinishedEvtContext->eventID().event_number(),
405 m_aess->eventStatus(*thisFinishedEvtContext));
407 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
409 << thisFinishedEvtContext <<
" w/ fail mode: "
410 <<
m_aess->eventStatus(*thisFinishedEvtContext));
415 thisFinishedEvtContext.reset();
416 fail = StatusCode::FAILURE;
426 EventID::event_number_t n_evt(0);
428 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
429 n_run = thisFinishedEvtContext->eventID().run_number();
430 n_evt = thisFinishedEvtContext->eventID().event_number();
433 << thisFinishedEvtContext->slot());
434 thisFinishedEvtContext.reset();
435 fail = StatusCode::FAILURE;
441 Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
443 Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext));
446 << thisFinishedEvtContext->slot() <<
" (event "
447 << thisFinishedEvtContext->evt() <<
") of the whiteboard");
450 if (!
sc.isSuccess()) {
451 ATH_MSG_ERROR(
"Whiteboard slot " << thisFinishedEvtContext->slot()
452 <<
" could not be properly cleared");
453 if (fail != StatusCode::FAILURE) {
456 thisFinishedEvtContext.reset();
468 << n_evt <<
", run #" << n_run <<
" on slot "
469 << thisFinishedEvtContext->slot() <<
", " <<
m_proc
470 <<
" events processed so far <<<===");
473 << n_evt <<
", run #" << n_run <<
" on slot "
474 << thisFinishedEvtContext->slot() <<
", " <<
m_nev
475 <<
" events read and " <<
m_proc
476 <<
" events processed so far <<<===");
478 std::ofstream outfile(
"eventLoopHeartBeat.txt");
481 fail = StatusCode::FAILURE;
482 thisFinishedEvtContext.reset();
485 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
486 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
491 << thisFinishedEvtContext);
493 thisFinishedEvtContext.reset();
#define ATH_CHECK
Evaluate an expression and check for errors.
std::chrono::high_resolution_clock Clock
The MPI event loop manager.
IEvtSelector * m_evtSelector
Reference to the Event Selector.
virtual StatusCode initialize() override
implementation of IAppMgrUI::initialize
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.
virtual int size() override
Return the size of the collection.
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
virtual const std::string & name() const override
unsigned int m_nev
events processed
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc.
MsgStream & msg() const
The standard message stream.
virtual StatusCode seek(int evt) override
Seek to a given event.
AthenaHiveEventLoopMgr(const std::string &nam, ISvcLocator *svcLoc)
Standard Constructor.
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(void* par)
EventContext m_lastEventContext
virtual EventContext createEventContext() override
Create event context.
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
EventIDBase::number_type number_type
StatusCode workerEventLoop()
Worker event loop (runs on worker, requests events over MPI)
StatusCode drainLocalScheduler()
Drain the local scheduler of any (at least one) completed events.
int m_contiguousFailedEvts
ServiceHandle< IMPIClusterSvc > m_clusterSvc
Reference to the MPIClusterSvc.
MPIHiveEventLoopMgr(const std::string &name, ISvcLocator *svcLoc)
Standard Constructor.
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
virtual ~MPIHiveEventLoopMgr()
Standard Destructor.
StoreGateSvc * eventStore() const
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
StatusCode masterEventLoop(int maxEvt)
Master event loop (runs on master, provides events over MPI)
virtual StatusCode initialize() override
implementation of IAppMgrUI::initialize
StatusCode insertEvent(int eventIdx, bool &endOfStream, std::int64_t requestTime_ns)
Insert an event into the local scheduler.
UnsignedIntegerProperty m_firstEventIndex
int m_evtSelectorCurrentPos
The Athena Transient Store API.
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
int count(std::string s, const std::string &regx)
count how many occurrences of a regx are in a string
A class describing a message sent between nodes in a cluster.