ATLAS Offline Software
MPIHiveEventLoopMgr.cxx
/*
  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
*/
#include "MPIHiveEventLoopMgr.h"

// Gaudi includes
#include "GaudiKernel/AppReturnCode.h"

// Utilities
#include "AthCheckMacros.h"
#include "ClusterMessage.h"

// Standard Library
#include <chrono>
#include <cstdint>  // added: std::int64_t is used below
#include <format>   // added: std::format is used below
#include <fstream>
#include <string>
#include <vector>   // added: std::vector is used below

using Clock = std::chrono::high_resolution_clock;

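// Overview of the MPI event distribution implemented below: rank 0 runs
// masterEventLoop() and only hands out event indices; every other rank runs
// workerEventLoop(), requesting work with RequestEvent and scheduling each
// ProvideEvent reply on its local Hive scheduler via insertEvent(). Workers
// report back with FinalWorkerStatus (normal end) or WorkerError (failure);
// the master answers late requests with EventsDone and broadcasts
// EmergencyStop if any worker fails.
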
// Standard Constructor
MPIHiveEventLoopMgr::MPIHiveEventLoopMgr(const std::string& name,
                                         ISvcLocator* svcLoc)
    : AthenaHiveEventLoopMgr(name, svcLoc) {}

// Standard Destructor
MPIHiveEventLoopMgr::~MPIHiveEventLoopMgr() = default;  // (reconstructed)

// implementation of IAppMgrUI::initialize
StatusCode MPIHiveEventLoopMgr::initialize() {
  // Initialize cluster svc
  ATH_CHECK(m_clusterSvc.retrieve());
  // Then run the standard Hive initialization (reconstructed)
  return AthenaHiveEventLoopMgr::initialize();
}

// implementation of IAppMgrUI::finalize
StatusCode MPIHiveEventLoopMgr::finalize() {
  return AthenaHiveEventLoopMgr::finalize();  // (reconstructed)
}

// implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
StatusCode MPIHiveEventLoopMgr::nextEvent(int maxevt) {
  // make nextEvent(0) a dummy call
  if (0 == maxevt) {
    return StatusCode::SUCCESS;
  }

  // Reset the application return code.
  Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true)
      .ignore();
  ATH_MSG_INFO("Starting loop on events");

  if (m_clusterSvc->rank() == 0) {
    return masterEventLoop(maxevt);
  }
  return workerEventLoop();
}
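
// Note: rank 0 acts purely as the dispatcher and processes no events itself,
// so a job with N ranks has N - 1 event-processing workers (see the
// "spare one at the start" bookkeeping in masterEventLoop below).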

// Master event loop (runs on the master, provides events over MPI)
StatusCode MPIHiveEventLoopMgr::masterEventLoop(int maxEvt) {
  ATH_MSG_INFO("Running with " << m_clusterSvc->numRanks() << " ranks");
  // Determine number of events to process
  int skipEvts = int(m_firstEventIndex.value());
  if (m_evtSelector != nullptr) {
    int evt = size();
    if (evt == -1) {
      m_clusterSvc->abort();
      return StatusCode::FAILURE;
    }
    if (maxEvt < 0 || maxEvt > evt) {
      maxEvt = evt;
    }
    ATH_MSG_INFO("Will be processing " << maxEvt << " events");
  }

  // Setup worker status DB (spare one at the start)
  std::vector<bool> workers_done(m_clusterSvc->numRanks(), false);
  workers_done[0] = true;   // Rank 0 doesn't correspond to a worker
  int num_workers_done = 1; // Init to 1 so we can compare to numRanks
  std::vector<ClusterMessage::WorkerStatus> statuses(m_clusterSvc->numRanks());

  // Entering event loop
  m_clusterSvc->barrier();
  // Note: no ++evt in the for statement; this is really a message loop, and we
  // don't want to increment evt if we haven't provided an event.
  auto start = Clock::now();
  for (int evt = skipEvts; evt < skipEvts + maxEvt;) {
    ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
    // Messages we can get are RequestEvent, FinalWorkerStatus, or WorkerError
    if (msg.messageType == ClusterMessageType::RequestEvent) {
      ATH_MSG_DEBUG("Starting event " << evt << " on " << msg.source);
      // Reply with the next event index (reconstructed message construction)
      m_clusterSvc->sendMessage(
          msg.source, ClusterMessage{ClusterMessageType::ProvideEvent, evt});
      ++evt;
      continue;
    }

    if (msg.messageType == ClusterMessageType::WorkerError) {
      ATH_MSG_ERROR("Received WorkerError message from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) =
          true;  // If a worker hits an error, it's done
      ++num_workers_done;
      for (int i = 1; i < m_clusterSvc->numRanks(); ++i) {
        if (!workers_done.at(i)) {
          // Tell workers that aren't done to emergency stop
          // (reconstructed message construction)
          m_clusterSvc->sendMessage(
              i, ClusterMessage{ClusterMessageType::EmergencyStop});
          workers_done[i] = true;
          ++num_workers_done;
        }
      }
      break;
    }

    if (msg.messageType == ClusterMessageType::FinalWorkerStatus) {
      ATH_MSG_INFO("Received FinalWorkerStatus from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) = true;  // Worker hit end of stream
      ++num_workers_done;
      continue;
    }

    // Other message types are an error
    ATH_MSG_ERROR("Received unexpected message "
                  << std::format("{}", msg.messageType) << " from "
                  << msg.source);
  }
  auto all_provided = Clock::now() - start;
  ATH_MSG_INFO("Provided all events to workers, waiting for them to complete.");
  // Event loop done, tell remaining workers
  while (num_workers_done < m_clusterSvc->numRanks()) {
    ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
    // Messages we can get are RequestEvent, FinalWorkerStatus, or WorkerError
    if (msg.messageType == ClusterMessageType::RequestEvent) {
      // No more events: answer with EventsDone (reconstructed message
      // construction)
      m_clusterSvc->sendMessage(
          msg.source, ClusterMessage{ClusterMessageType::EventsDone});
      continue;
    }

    if (msg.messageType == ClusterMessageType::WorkerError) {
      ATH_MSG_ERROR("Received WorkerError message from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) =
          true;  // If a worker hits an error, it's done
      ++num_workers_done;
      for (int i = 1; i < m_clusterSvc->numRanks(); ++i) {
        if (!workers_done.at(i)) {
          // Tell workers that aren't done to emergency stop
          // (reconstructed message construction)
          m_clusterSvc->sendMessage(
              i, ClusterMessage{ClusterMessageType::EmergencyStop});
          workers_done[i] = true;
          ++num_workers_done;
        }
      }
      break;
    }

    if (msg.messageType == ClusterMessageType::FinalWorkerStatus) {
      ATH_MSG_INFO("Received FinalWorkerStatus from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) = true;  // Told worker we're done
      ++num_workers_done;
      continue;
    }

    // Other message types are an error
    ATH_MSG_ERROR("Received unexpected message "
                  << std::format("{}", msg.messageType) << " from "
                  << msg.source);
  }
  auto all_done = Clock::now() - start;
  // Collate status
  int n_created = 0;
  int n_skipped = 0;
  int n_finished = 0;

  StatusCode sc = StatusCode::SUCCESS;
  int worker_idx = 0;
  for (const auto& worker_status : statuses) {
    if (worker_status.status.isFailure() &&
        worker_status.status != StatusCode(9999)) {
      sc = worker_status.status;
    }
    n_created += worker_status.createdEvents;
    n_skipped += worker_status.skippedEvents;
    n_finished += worker_status.finishedEvents;

    // Increment after logging so the printed index matches the worker's rank
    // (the original post-increment printed rank N as "Worker N+1")
    if (worker_idx != 0) {
      ATH_MSG_INFO("Worker " << worker_idx << ": SC " << worker_status.status
                             << ", created " << worker_status.createdEvents
                             << ", skipped " << worker_status.skippedEvents
                             << ", finished " << worker_status.finishedEvents);
    }
    ++worker_idx;
  }

  ATH_MSG_INFO("Overall: SC " << sc << ", created " << n_created << ", skipped "
                              << n_skipped << ", finished " << n_finished);
  ATH_MSG_INFO("MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
                               << " to provide all events.");
  ATH_MSG_INFO("MASTER: Took " << std::chrono::hh_mm_ss(all_done)
                               << " to complete all events.");
  return sc;
}

// Worker event loop (runs on a worker, requests events over MPI)
StatusCode MPIHiveEventLoopMgr::workerEventLoop() {
  bool end_of_stream = false;
  // Barrier so all ranks enter the message loop together
  m_clusterSvc->barrier();
  auto start = Clock::now();

  while (true) {
    // Drain the scheduler (wait for at least one event to complete, then free
    // up completed slots) in two circumstances:
    // 1. We have created exactly one event, so the first event runs to
    //    completion before any more are scheduled
    // 2. There are no free slots left
    bool haveFreeSlots =
        m_schedulerSvc->freeSlots() > 0 && m_whiteboard->freeSlots() > 0;
    if (!haveFreeSlots || m_nLocalCreatedEvts == 1) {
      StatusCode sc = drainLocalScheduler();  // (reconstructed call)
      if (sc.isFailure()) {
        // Report the failure to the master (reconstructed status handling)
        ClusterMessage::WorkerStatus status;
        status.status = sc;
        status.createdEvents = m_nLocalCreatedEvts;
        status.skippedEvents = m_nLocalSkippedEvts;
        status.finishedEvents = m_nLocalFinishedEvts;
        m_clusterSvc->sendMessage(
            0, ClusterMessage{ClusterMessageType::WorkerError, status});
        return sc;
      }
    }

    auto start_time = Clock::now();
    // Ask the master for the next event (reconstructed message construction)
    m_clusterSvc->sendMessage(
        0, ClusterMessage{ClusterMessageType::RequestEvent});
    ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
    auto request_time = Clock::now() - start_time;
    if (msg.messageType == ClusterMessageType::EmergencyStop) {
      // Emergency stop: return FAILURE after fully draining the scheduler to
      // prevent a segfault
      std::size_t numSlots = m_whiteboard->getNumberOfStores();
      while (m_schedulerSvc->freeSlots() < numSlots) {
        // Ignore the StatusCode, we are going to return FAILURE anyway
        (void)(drainLocalScheduler());
      }
      ATH_MSG_ERROR("Received EmergencyStop message!");
      return StatusCode::FAILURE;
    }

    if (msg.messageType == ClusterMessageType::EventsDone) {
      auto loop_time = Clock::now() - start;
      ATH_MSG_INFO("Worker " << m_clusterSvc->rank() << " DONE. Loop took "
                             << std::chrono::hh_mm_ss(loop_time)
                             << " to process " << m_nLocalCreatedEvts
                             << " events.");
      // We've been told we've reached the end; provide our status to the master
      ClusterMessage::WorkerStatus status;  // (reconstructed status handling)
      // At end of stream, we need to *fully* drain the scheduler
      StatusCode sc = StatusCode::SUCCESS;
      std::size_t numSlots = m_whiteboard->getNumberOfStores();
      while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
        sc = drainLocalScheduler();  // (reconstructed call)
      }
      status.status = sc;
      status.createdEvents = m_nLocalCreatedEvts;
      status.skippedEvents = m_nLocalSkippedEvts;
      status.finishedEvents = m_nLocalFinishedEvts;
      m_clusterSvc->sendMessage(
          0, ClusterMessage{ClusterMessageType::FinalWorkerStatus, status});
      return sc;
    }

    // Any message other than ProvideEvent would now be an error
    if (msg.messageType != ClusterMessageType::ProvideEvent ||
        msg.source != 0) {
      ATH_MSG_ERROR("Received unexpected message "
                    << std::format("{}", msg.messageType) << " from "
                    << msg.source);
      return StatusCode::FAILURE;
    }

    int evt = get<int>(msg.payload);
    ATH_MSG_INFO("Starting event " << evt);
    // Schedule the event locally (reconstructed call to insertEvent)
    StatusCode sc = insertEvent(
        evt, end_of_stream,
        std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
            .count());
    if (sc.isFailure() && !sc.isRecoverable()) {
      ClusterMessage::WorkerStatus status;  // (reconstructed status handling)
      status.status = sc;
      status.createdEvents = m_nLocalCreatedEvts;
      status.skippedEvents = m_nLocalSkippedEvts;
      status.finishedEvents = m_nLocalFinishedEvts;
      m_clusterSvc->sendMessage(
          0, ClusterMessage{ClusterMessageType::WorkerError, status});
      return sc;
    }
    if (end_of_stream || m_terminateLoop) {
      auto loop_time = Clock::now() - start;
      ATH_MSG_INFO("Worker " << m_clusterSvc->rank() << " DONE. Loop took "
                             << std::chrono::hh_mm_ss(loop_time)
                             << " to process " << m_nLocalCreatedEvts
                             << " events.");
      // Reached end of stream, drain the scheduler
      ClusterMessage::WorkerStatus status;  // (reconstructed status handling)
      // At end of stream, we need to *fully* drain the scheduler
      StatusCode sc = StatusCode::SUCCESS;
      std::size_t numSlots = m_whiteboard->getNumberOfStores();
      while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
        sc = drainLocalScheduler();  // (reconstructed call)
      }
      status.status = sc;
      status.createdEvents = m_nLocalCreatedEvts;
      status.skippedEvents = m_nLocalSkippedEvts;
      status.finishedEvents = m_nLocalFinishedEvts;
      m_clusterSvc->sendMessage(
          0, ClusterMessage{ClusterMessageType::FinalWorkerStatus, status});
      return sc;
    }
  }
}
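
// A minimal sketch of the worker-side handshake implemented above, assuming
// the ClusterMessage construction reconstructed in this file (a message type
// plus a variant payload):
//
//   worker (rank > 0)                  master (rank 0)
//   -----------------                  ---------------
//   RequestEvent               ---->
//                              <----   ProvideEvent{evt}  -> insertEvent(evt, ...)
//   RequestEvent               ---->
//                              <----   EventsDone         -> drain fully,
//   FinalWorkerStatus{status}  ---->                         report and return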

// Insert an event into the local scheduler
StatusCode MPIHiveEventLoopMgr::insertEvent(int eventIdx, bool& endOfStream,
                                            std::int64_t requestTime_ns) {
  // Fast-forward to the event.
  // Create the event context now so that "next" writes into the next slot when
  // skipping, not the one that's being used.
  endOfStream = false;
  auto ctx = createEventContext();
  Gaudi::Hive::setCurrentContext(ctx);
  ctx.setEvt(eventIdx);  // Make the event numbers in the log actually make sense
  if (!ctx.valid()) {
    endOfStream = true;  // BUG: Doesn't actually mean end of stream. Remove
                         // after making sure!
    return StatusCode::FAILURE;
  }

  const std::size_t slot = ctx.slot();  // Need this for later
  ATH_CHECK(seek(eventIdx));
  // Execute the event
  StatusCode sc = executeEvent(std::move(ctx));
  const auto evtID = m_lastEventContext.eventID();  // Set in AthenaHiveEventLoopMgr
  m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
                             requestTime_ns, slot);

  // Update the local counters (reconstructed: a recoverable failure counts as
  // skipped, a success as created)
  if (sc.isRecoverable()) {
    ++m_nLocalSkippedEvts;
  } else if (sc.isSuccess()) {
    ++m_nLocalCreatedEvts;
  }
  return sc;
}

// Drain the local scheduler of any (at least one) completed events
StatusCode MPIHiveEventLoopMgr::drainLocalScheduler() {

  StatusCode sc(StatusCode::SUCCESS);

  // maybe we can do better
  std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;

  EventContext* finishedEvtContext(nullptr);

  // Block here so as not to waste CPU resources
  ATH_MSG_DEBUG("drainLocalScheduler: [" << m_nLocalFinishedEvts
                                         << "] Waiting for a context");
  sc = m_schedulerSvc->popFinishedEvent(finishedEvtContext);

  // We got past it: cache the pointer
  if (sc.isSuccess()) {
    ATH_MSG_DEBUG("drainLocalScheduler: scheduler not empty: Context "
                  << finishedEvtContext);
    finishedEvtContexts.emplace_back(finishedEvtContext);
  } else {
    // No more events left in the scheduler to be drained
    ATH_MSG_DEBUG("drainLocalScheduler: scheduler empty");
    return StatusCode::SUCCESS;
  }

  // Let's see if we can pop other event contexts
  while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
    finishedEvtContexts.emplace_back(finishedEvtContext);
  }

  // Now we flush them
  StatusCode fail(StatusCode::SUCCESS);
  for (auto& thisFinishedEvtContext : finishedEvtContexts) {
    if (!thisFinishedEvtContext) {
      ATH_MSG_FATAL("Detected nullptr ctxt while clearing WB!");
      fail = StatusCode::FAILURE;
      continue;
    }

    // Update event log
    m_clusterSvc->log_completeEvent(
        thisFinishedEvtContext->eventID().run_number(),
        thisFinishedEvtContext->eventID().event_number(),
        m_aess->eventStatus(*thisFinishedEvtContext));

    if (m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
      ATH_MSG_ERROR("Failed event detected on "
                    << thisFinishedEvtContext << " w/ fail mode: "
                    << m_aess->eventStatus(*thisFinishedEvtContext));
      ++m_contiguousFailedEvts;  // (reconstructed failure bookkeeping)
      ++m_totalFailedEvts;
      if (m_contiguousFailedEvts >= 3 || m_totalFailedEvts >= 10) {
        // If we have 3 contiguous failed events or 10 in total, end the job
        thisFinishedEvtContext.reset();
        fail = StatusCode::FAILURE;
        continue;
      }
    } else {
      // Event succeeded, reset the contiguous failed-events counter
      m_contiguousFailedEvts = 0;  // (reconstructed)
    }

    EventID::number_type n_run(0);
    EventID::event_number_t n_evt(0);

    if (m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
      n_run = thisFinishedEvtContext->eventID().run_number();
      n_evt = thisFinishedEvtContext->eventID().event_number();
    } else {
      ATH_MSG_ERROR("DrainSched: unable to select store "
                    << thisFinishedEvtContext->slot());
      thisFinishedEvtContext.reset();
      fail = StatusCode::FAILURE;
      continue;
    }

    // Some code still needs the global context in addition to the one passed
    // in the incident
    Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
    m_incidentSvc->fireIncident(
        Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext));

    ATH_MSG_DEBUG("Clearing slot "
                  << thisFinishedEvtContext->slot() << " (event "
                  << thisFinishedEvtContext->evt() << ") of the whiteboard");

    StatusCode sc = clearWBSlot(thisFinishedEvtContext->slot());
    if (!sc.isSuccess()) {
      ATH_MSG_ERROR("Whiteboard slot " << thisFinishedEvtContext->slot()
                                       << " could not be properly cleared");
      if (fail != StatusCode::FAILURE) {
        fail = sc;
      }
      thisFinishedEvtContext.reset();
      continue;
    }

    ++m_nLocalFinishedEvts;  // (reconstructed: count this drained event)

    writeHistograms().ignore();
    ++m_proc;

    if (m_doEvtHeartbeat) {
      if (!m_useTools) {
        ATH_MSG_INFO(" ===>>> done processing event #"
                     << n_evt << ", run #" << n_run << " on slot "
                     << thisFinishedEvtContext->slot() << ", " << m_proc
                     << " events processed so far <<<===");
      } else {
        ATH_MSG_INFO(" ===>>> done processing event #"
                     << n_evt << ", run #" << n_run << " on slot "
                     << thisFinishedEvtContext->slot() << ", " << m_nev
                     << " events read and " << m_proc
                     << " events processed so far <<<===");
      }
      std::ofstream outfile("eventLoopHeartBeat.txt");
      if (!outfile) {
        ATH_MSG_ERROR(" unable to open: eventLoopHeartBeat.txt");
        fail = StatusCode::FAILURE;
        thisFinishedEvtContext.reset();
        continue;
      }
      outfile << " done processing event #" << n_evt << ", run #" << n_run
              << " " << m_nev << " events read so far <<<===" << std::endl;
      outfile.close();
    }

    ATH_MSG_DEBUG("drainLocalScheduler thisFinishedEvtContext: "
                  << thisFinishedEvtContext);

    thisFinishedEvtContext.reset();
  }

  return fail;
}
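
// Note: a single call pops at least one finished event if any are pending;
// callers that need a *full* drain (end of stream, emergency stop) loop it
// until every whiteboard store is free again, as workerEventLoop() does:
//
//   std::size_t numSlots = m_whiteboard->getNumberOfStores();
//   while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
//     sc = drainLocalScheduler();
//   }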

StoreGateSvc* MPIHiveEventLoopMgr::eventStore() const {
  return m_eventStore.get();
}