7 #include "GaudiKernel/AppReturnCode.h"
18 using Clock = std::chrono::high_resolution_clock;
47 return StatusCode::SUCCESS;
70 return StatusCode::FAILURE;
72 if (maxEvt < 0 || maxEvt >
evt) {
75 ATH_MSG_INFO(
"Will be processing " << maxEvt <<
" events");
79 std::vector<bool> workers_done(
m_clusterSvc->numRanks(),
false);
82 int num_workers_done = 1;
90 for (
int evt = skipEvts;
evt < skipEvts + maxEvt;) {
103 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
104 workers_done.at(
msg.source) =
108 if (!workers_done.at(
i)) {
112 workers_done[
i] =
true;
121 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
122 workers_done.at(
msg.source) =
true;
133 ATH_MSG_INFO(
"Provided all events to workers, waiting for them to complete.");
135 while (num_workers_done < m_clusterSvc->numRanks()) {
146 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
147 workers_done.at(
msg.source) =
151 if (!workers_done.at(
i)) {
155 workers_done[
i] =
true;
164 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
165 workers_done.at(
msg.source) =
true;
183 for (
const auto& worker_status :
statuses) {
184 if (worker_status.status.isFailure() &&
186 sc = worker_status.status;
188 n_created += worker_status.createdEvents;
189 n_skipped += worker_status.skippedEvents;
190 n_finished += worker_status.finishedEvents;
192 if ((worker_idx++) != 0) {
193 ATH_MSG_INFO(
"Worker " << worker_idx <<
": SC " << worker_status.status
194 <<
", created " << worker_status.createdEvents
195 <<
", skipped " << worker_status.skippedEvents
196 <<
", finished " << worker_status.finishedEvents);
200 ATH_MSG_INFO(
"Overall: SC " <<
sc <<
", created " << n_created <<
", skipped "
201 << n_skipped <<
", finished " << n_finished);
202 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
203 <<
" to provide all events.");
204 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_done)
205 <<
" to complete all events.");
211 bool end_of_stream =
false;
226 if (
sc.isFailure()) {
246 return StatusCode::FAILURE;
252 << std::chrono::hh_mm_ss(loop_time)
260 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
279 return StatusCode::FAILURE;
282 int evt = get<int>(
msg.payload);
286 std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
288 if (
sc.isFailure() && !
sc.isRecoverable()) {
301 << std::chrono::hh_mm_ss(loop_time)
308 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
325 std::int64_t requestTime_ns) {
331 Gaudi::Hive::setCurrentContext(ctx);
335 return StatusCode::FAILURE;
342 m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
345 if (
sc.isRecoverable()) {
347 }
else if (
sc.isSuccess()) {
360 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
362 EventContext* finishedEvtContext(
nullptr);
366 <<
"] Waiting for a context");
370 if (
sc.isSuccess()) {
371 ATH_MSG_DEBUG(
"drainLocalScheduler: scheduler not empty: Context "
372 << finishedEvtContext);
373 finishedEvtContexts.emplace_back(finishedEvtContext);
377 return StatusCode::SUCCESS;
381 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
382 finishedEvtContexts.emplace_back(finishedEvtContext);
387 for (
auto& thisFinishedEvtContext : finishedEvtContexts) {
388 if (!thisFinishedEvtContext) {
390 fail = StatusCode::FAILURE;
396 thisFinishedEvtContext->eventID().run_number(),
397 thisFinishedEvtContext->eventID().event_number(),
398 m_aess->eventStatus(*thisFinishedEvtContext));
400 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
402 << thisFinishedEvtContext <<
" w/ fail mode: "
403 <<
m_aess->eventStatus(*thisFinishedEvtContext));
404 thisFinishedEvtContext.reset();
405 fail = StatusCode::FAILURE;
412 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
413 n_run = thisFinishedEvtContext->eventID().run_number();
414 n_evt = thisFinishedEvtContext->eventID().event_number();
417 << thisFinishedEvtContext->slot());
418 thisFinishedEvtContext.reset();
419 fail = StatusCode::FAILURE;
425 Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
427 Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext));
430 << thisFinishedEvtContext->slot() <<
" (event "
431 << thisFinishedEvtContext->evt() <<
") of the whiteboard");
434 if (!
sc.isSuccess()) {
435 ATH_MSG_ERROR(
"Whiteboard slot " << thisFinishedEvtContext->slot()
436 <<
" could not be properly cleared");
437 if (
fail != StatusCode::FAILURE) {
440 thisFinishedEvtContext.reset();
452 << n_evt <<
", run #" << n_run <<
" on slot "
453 << thisFinishedEvtContext->slot() <<
", " <<
m_proc
454 <<
" events processed so far <<<===");
457 << n_evt <<
", run #" << n_run <<
" on slot "
458 << thisFinishedEvtContext->slot() <<
", " <<
m_nev
459 <<
" events read and " <<
m_proc
460 <<
" events processed so far <<<===");
462 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
465 fail = StatusCode::FAILURE;
466 thisFinishedEvtContext.reset();
469 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
470 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
475 << thisFinishedEvtContext);
477 thisFinishedEvtContext.reset();