7 #include "GaudiKernel/AppReturnCode.h"
18 using Clock = std::chrono::high_resolution_clock;
47 return StatusCode::SUCCESS;
70 return StatusCode::FAILURE;
72 if (maxEvt < 0 || maxEvt >
evt) {
75 ATH_MSG_INFO(
"Will be processing " << maxEvt <<
" events");
79 std::vector<bool> workers_done(
m_clusterSvc->numRanks(),
false);
82 int num_workers_done = 1;
90 for (
int evt = skipEvts;
evt < skipEvts + maxEvt;) {
103 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
104 workers_done.at(
msg.source) =
108 if (!workers_done.at(
i)) {
112 workers_done[
i] =
true;
121 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
122 workers_done.at(
msg.source) =
true;
133 ATH_MSG_INFO(
"Provided all events to workers, waiting for them to complete.");
135 while (num_workers_done < m_clusterSvc->numRanks()) {
146 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
147 workers_done.at(
msg.source) =
151 if (!workers_done.at(
i)) {
155 workers_done[
i] =
true;
164 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
165 workers_done.at(
msg.source) =
true;
183 for (
const auto& worker_status :
statuses) {
184 if (worker_status.status.isFailure() &&
186 sc = worker_status.status;
188 n_created += worker_status.createdEvents;
189 n_skipped += worker_status.skippedEvents;
190 n_finished += worker_status.finishedEvents;
192 if ((worker_idx++) != 0) {
193 ATH_MSG_INFO(
"Worker " << worker_idx <<
": SC " << worker_status.status
194 <<
", created " << worker_status.createdEvents
195 <<
", skipped " << worker_status.skippedEvents
196 <<
", finished " << worker_status.finishedEvents);
200 ATH_MSG_INFO(
"Overall: SC " <<
sc <<
", created " << n_created <<
", skipped "
201 << n_skipped <<
", finished " << n_finished);
202 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
203 <<
" to provide all events.");
204 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_done)
205 <<
" to complete all events.");
211 bool end_of_stream =
false;
226 if (
sc.isFailure()) {
245 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
251 return StatusCode::FAILURE;
257 << std::chrono::hh_mm_ss(loop_time)
265 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
284 return StatusCode::FAILURE;
287 int evt = get<int>(
msg.payload);
291 std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
293 if (
sc.isFailure() && !
sc.isRecoverable()) {
306 << std::chrono::hh_mm_ss(loop_time)
313 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
330 std::int64_t requestTime_ns) {
336 Gaudi::Hive::setCurrentContext(ctx);
337 ctx.setEvt(eventIdx);
341 return StatusCode::FAILURE;
344 const std::size_t slot = ctx.slot();
349 m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
350 requestTime_ns, slot);
352 if (
sc.isRecoverable()) {
354 }
else if (
sc.isSuccess()) {
367 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
369 EventContext* finishedEvtContext(
nullptr);
373 <<
"] Waiting for a context");
377 if (
sc.isSuccess()) {
378 ATH_MSG_DEBUG(
"drainLocalScheduler: scheduler not empty: Context "
379 << finishedEvtContext);
380 finishedEvtContexts.emplace_back(finishedEvtContext);
384 return StatusCode::SUCCESS;
388 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
389 finishedEvtContexts.emplace_back(finishedEvtContext);
394 for (
auto& thisFinishedEvtContext : finishedEvtContexts) {
395 if (!thisFinishedEvtContext) {
397 fail = StatusCode::FAILURE;
403 thisFinishedEvtContext->eventID().run_number(),
404 thisFinishedEvtContext->eventID().event_number(),
405 m_aess->eventStatus(*thisFinishedEvtContext));
407 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
409 << thisFinishedEvtContext <<
" w/ fail mode: "
410 <<
m_aess->eventStatus(*thisFinishedEvtContext));
415 thisFinishedEvtContext.reset();
416 fail = StatusCode::FAILURE;
428 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
429 n_run = thisFinishedEvtContext->eventID().run_number();
430 n_evt = thisFinishedEvtContext->eventID().event_number();
433 << thisFinishedEvtContext->slot());
434 thisFinishedEvtContext.reset();
435 fail = StatusCode::FAILURE;
441 Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
443 Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext));
446 << thisFinishedEvtContext->slot() <<
" (event "
447 << thisFinishedEvtContext->evt() <<
") of the whiteboard");
450 if (!
sc.isSuccess()) {
451 ATH_MSG_ERROR(
"Whiteboard slot " << thisFinishedEvtContext->slot()
452 <<
" could not be properly cleared");
453 if (
fail != StatusCode::FAILURE) {
456 thisFinishedEvtContext.reset();
468 << n_evt <<
", run #" << n_run <<
" on slot "
469 << thisFinishedEvtContext->slot() <<
", " <<
m_proc
470 <<
" events processed so far <<<===");
473 << n_evt <<
", run #" << n_run <<
" on slot "
474 << thisFinishedEvtContext->slot() <<
", " <<
m_nev
475 <<
" events read and " <<
m_proc
476 <<
" events processed so far <<<===");
478 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
481 fail = StatusCode::FAILURE;
482 thisFinishedEvtContext.reset();
485 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
486 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
491 << thisFinishedEvtContext);
493 thisFinishedEvtContext.reset();