7 #include "GaudiKernel/AppReturnCode.h" 
   18 using Clock = std::chrono::high_resolution_clock;
 
   47     return StatusCode::SUCCESS;
 
   70       return StatusCode::FAILURE;
 
   72     if (maxEvt < 0 || maxEvt > 
evt) {
 
   75     ATH_MSG_INFO(
"Will be processing " << maxEvt << 
" events");
 
   79   std::vector<bool> workers_done(
m_clusterSvc->numRanks(), 
false);
 
   82   int num_workers_done = 1;  
 
   90   for (
int evt = skipEvts; 
evt < skipEvts + maxEvt;) {
 
  103       statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
 
  104       workers_done.at(
msg.source) =
 
  108         if (!workers_done.at(
i)) {
 
  112           workers_done[
i] = 
true;
 
  121       statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
 
  122       workers_done.at(
msg.source) = 
true;  
 
  133   ATH_MSG_INFO(
"Provided all events to workers, waiting for them to complete.");
 
  135   while (num_workers_done < m_clusterSvc->numRanks()) {
 
  146       statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
 
  147       workers_done.at(
msg.source) =
 
  151         if (!workers_done.at(
i)) {
 
  155           workers_done[
i] = 
true;
 
  164       statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
 
  165       workers_done.at(
msg.source) = 
true;  
 
  183   for (
const auto& worker_status : 
statuses) {
 
  184     if (worker_status.status.isFailure() &&
 
  186       sc = worker_status.status;
 
  188     n_created += worker_status.createdEvents;
 
  189     n_skipped += worker_status.skippedEvents;
 
  190     n_finished += worker_status.finishedEvents;
 
  192     if ((worker_idx++) != 0) {
 
  193       ATH_MSG_INFO(
"Worker " << worker_idx << 
": SC " << worker_status.status
 
  194                              << 
", created " << worker_status.createdEvents
 
  195                              << 
", skipped " << worker_status.skippedEvents
 
  196                              << 
", finished " << worker_status.finishedEvents);
 
  200   ATH_MSG_INFO(
"Overall: SC " << 
sc << 
", created " << n_created << 
", skipped " 
  201                               << n_skipped << 
", finished " << n_finished);
 
  202   ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
 
  203                                << 
" to provide all events.");
 
  204   ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_done)
 
  205                                << 
" to complete all events.");
 
  211   bool end_of_stream = 
false;
 
  226       if (
sc.isFailure()) {
 
  245       std::size_t numSlots = 
m_whiteboard->getNumberOfStores();
 
  251       return StatusCode::FAILURE;
 
  257                              << std::chrono::hh_mm_ss(loop_time)
 
  265       std::size_t numSlots = 
m_whiteboard->getNumberOfStores();
 
  284       return StatusCode::FAILURE;
 
  287     int evt = get<int>(
msg.payload);
 
  291         std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
 
  293     if (
sc.isFailure() && !
sc.isRecoverable()) {
 
  306                              << std::chrono::hh_mm_ss(loop_time)
 
  313       std::size_t numSlots = 
m_whiteboard->getNumberOfStores();
 
  330                                             std::int64_t requestTime_ns) {
 
  336   Gaudi::Hive::setCurrentContext(ctx);
 
  337   ctx.setEvt(eventIdx); 
 
  341     return StatusCode::FAILURE;
 
  344   const std::size_t slot = ctx.slot(); 
 
  349   m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
 
  350                              requestTime_ns, slot);
 
  352   if (
sc.isRecoverable()) {
 
  354   } 
else if (
sc.isSuccess()) {
 
  367   std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
 
  369   EventContext* finishedEvtContext(
nullptr);
 
  373                                          << 
"] Waiting for a context");
 
  377   if (
sc.isSuccess()) {
 
  378     ATH_MSG_DEBUG(
"drainLocalScheduler: scheduler not empty: Context " 
  379                   << finishedEvtContext);
 
  380     finishedEvtContexts.emplace_back(finishedEvtContext);
 
  384     return StatusCode::SUCCESS;
 
  388   while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
 
  389     finishedEvtContexts.emplace_back(finishedEvtContext);
 
  394   for (
auto& thisFinishedEvtContext : finishedEvtContexts) {
 
  395     if (!thisFinishedEvtContext) {
 
  397       fail = StatusCode::FAILURE;
 
  403         thisFinishedEvtContext->eventID().run_number(),
 
  404         thisFinishedEvtContext->eventID().event_number(),
 
  405         m_aess->eventStatus(*thisFinishedEvtContext));
 
  407     if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
 
  409                     << thisFinishedEvtContext << 
" w/ fail mode: " 
  410                     << 
m_aess->eventStatus(*thisFinishedEvtContext));
 
  415         thisFinishedEvtContext.reset();
 
  416         fail = StatusCode::FAILURE;
 
  428     if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
 
  429       n_run = thisFinishedEvtContext->eventID().run_number();
 
  430       n_evt = thisFinishedEvtContext->eventID().event_number();
 
  433                     << thisFinishedEvtContext->slot());
 
  434       thisFinishedEvtContext.reset();
 
  435       fail = StatusCode::FAILURE;
 
  441     Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
 
  443         Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext));
 
  446                   << thisFinishedEvtContext->slot() << 
" (event " 
  447                   << thisFinishedEvtContext->evt() << 
") of the whiteboard");
 
  450     if (!
sc.isSuccess()) {
 
  451       ATH_MSG_ERROR(
"Whiteboard slot " << thisFinishedEvtContext->slot()
 
  452                                        << 
" could not be properly cleared");
 
  453       if (
fail != StatusCode::FAILURE) {
 
  456       thisFinishedEvtContext.reset();
 
  468                      << n_evt << 
", run #" << n_run << 
" on slot " 
  469                      << thisFinishedEvtContext->slot() << 
",  " << 
m_proc 
  470                      << 
" events processed so far <<<===");
 
  473                      << n_evt << 
", run #" << n_run << 
" on slot " 
  474                      << thisFinishedEvtContext->slot() << 
",  " << 
m_nev 
  475                      << 
" events read and " << 
m_proc 
  476                      << 
" events processed so far <<<===");
 
  478       std::ofstream 
outfile(
"eventLoopHeartBeat.txt");
 
  481         fail = StatusCode::FAILURE;
 
  482         thisFinishedEvtContext.reset();
 
  485       outfile << 
"  done processing event #" << n_evt << 
", run #" << n_run
 
  486               << 
" " << 
m_nev << 
" events read so far <<<===" << std::endl;
 
  491                   << thisFinishedEvtContext);
 
  493     thisFinishedEvtContext.reset();