ATLAS Offline Software
MPIHiveEventLoopMgr.cxx
/*
  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
*/
#include "MPIHiveEventLoopMgr.h"

// Gaudi includes
#include "GaudiKernel/AppReturnCode.h"

// Utilities

// Standard Library
#include <chrono>
#include <cstdint>
#include <format>
#include <fstream>
#include <memory>
#include <string>
#include <vector>

using Clock = std::chrono::high_resolution_clock;

// Standard Constructor
MPIHiveEventLoopMgr::MPIHiveEventLoopMgr(const std::string& name,
                                         ISvcLocator* svcLoc)
    : AthenaHiveEventLoopMgr(name, svcLoc) {}

// Standard Destructor
MPIHiveEventLoopMgr::~MPIHiveEventLoopMgr() {}

// implementation of IAppMgrUI::initialize
StatusCode MPIHiveEventLoopMgr::initialize() {
  ATH_CHECK(AthenaHiveEventLoopMgr::initialize());
  // Initialize cluster svc
  ATH_CHECK(m_clusterSvc.retrieve());
  return StatusCode::SUCCESS;
}

// implementation of IAppMgrUI::finalize
StatusCode MPIHiveEventLoopMgr::finalize() {
  return AthenaHiveEventLoopMgr::finalize();
}

// implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
StatusCode MPIHiveEventLoopMgr::nextEvent(int maxevt) {
  // make nextEvent(0) a dummy call
  if (0 == maxevt) {
    return StatusCode::SUCCESS;
  }

  // Reset the application return code.
  Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true)
      .ignore();
  ATH_MSG_INFO("Starting loop on events");

  if (m_clusterSvc->rank() == 0) {
    return masterEventLoop(maxevt);
  }
  return workerEventLoop();
}
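
// ---------------------------------------------------------------------------
// Message flow between the master (rank 0) and the workers, as implemented by
// masterEventLoop() / workerEventLoop() below:
//
//   worker -> master : RequestEvent       (worker has a free slot)
//   master -> worker : ProvideEvent(idx)  (index of the next event to run)
//   master -> worker : EventsDone         (no events left; worker drains and
//                                          reports FinalWorkerStatus)
//   worker -> master : FinalWorkerStatus  (worker finished cleanly)
//   worker -> master : WorkerError        (worker failed; master sends
//                                          EmergencyStop to the other workers)
//
// For orientation, a rough sketch of the ClusterMessage interface as used in
// this file; the real definition lives elsewhere in the repository, and the
// member layout here is only inferred from usage:
//
//   struct ClusterMessage {
//     struct WorkerStatus {
//       StatusCode status;
//       int createdEvents, skippedEvents, finishedEvents;
//     };
//     ClusterMessageType messageType;  // RequestEvent, ProvideEvent, ...
//     int source;                      // rank that sent the message
//     /* variant-like */ payload;      // int event index or WorkerStatus
//   };
// ---------------------------------------------------------------------------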

// Master event loop (runs on master, provides events over MPI)
StatusCode MPIHiveEventLoopMgr::masterEventLoop(int maxEvt) {
  ATH_MSG_INFO("Running with " << m_clusterSvc->numRanks() << " ranks");
  // Determine number of events to process
  int skipEvts = int(m_firstEventIndex.value());
  if (m_evtSelector != nullptr) {
    int evt = size();
    if (evt == -1) {
      m_clusterSvc->abort();
      return StatusCode::FAILURE;
    }
    if (maxEvt < 0 || maxEvt > evt) {
      maxEvt = evt;
    }
    ATH_MSG_INFO("Will be processing " << maxEvt << " events");
  }

  // Setup worker status DB (Spare one at start)
  std::vector<bool> workers_done(m_clusterSvc->numRanks(), false);
  workers_done[0] = true;  // Set 0 to true because it doesn't correspond to a worker
  int num_workers_done = 1;  // Init to 1 so we can compare to numRanks
  std::vector<ClusterMessage::WorkerStatus> statuses(m_clusterSvc->numRanks());

  // Entering event loop
  m_clusterSvc->barrier();
  // Note: no ++evt in the loop header. This is really a message loop, and we
  // don't want to increment evt if we haven't actually provided an event.
  auto start = Clock::now();
  for (int evt = skipEvts; evt < skipEvts + maxEvt;) {
    ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
    // Messages we can get are RequestEvent, FinalWorkerStatus, or WorkerError
    if (msg.messageType == ClusterMessageType::RequestEvent) {
      ATH_MSG_DEBUG("Starting event " << evt << " on " << msg.source);
      m_clusterSvc->sendMessage(
          msg.source, ClusterMessage(ClusterMessageType::ProvideEvent, evt));
      ++evt;
      continue;
    }

    if (msg.messageType == ClusterMessageType::WorkerError) {
      ATH_MSG_ERROR("Received WorkerError message from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) = true;  // If a worker hits an error, it's done
      ++num_workers_done;
      for (int i = 1; i < m_clusterSvc->numRanks(); ++i) {
        if (!workers_done.at(i)) {
          // Tell workers that aren't done to emergency stop
          m_clusterSvc->sendMessage(
              i, ClusterMessage(ClusterMessageType::EmergencyStop));
          workers_done[i] = true;
          ++num_workers_done;
        }
      }
      break;
    }

    if (msg.messageType == ClusterMessageType::FinalWorkerStatus) {
      ATH_MSG_INFO("Received FinalWorkerStatus from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) = true;  // Worker hit end of stream
      ++num_workers_done;
      continue;
    }

    // Other message types are an error
    ATH_MSG_ERROR("Received unexpected message "
                  << std::format("{}", msg.messageType) << " from "
                  << msg.source);
  }
  auto all_provided = Clock::now() - start;
  ATH_MSG_INFO("Provided all events to workers, waiting for them to complete.");
  // Event loop done, tell remaining workers
  while (num_workers_done < m_clusterSvc->numRanks()) {
    ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
    // Messages we can get are RequestEvent, FinalWorkerStatus, or WorkerError
    if (msg.messageType == ClusterMessageType::RequestEvent) {
      m_clusterSvc->sendMessage(msg.source,
                                ClusterMessage(ClusterMessageType::EventsDone));
      continue;
    }

    if (msg.messageType == ClusterMessageType::WorkerError) {
      ATH_MSG_ERROR("Received WorkerError message from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) = true;  // If a worker hits an error, it's done
      ++num_workers_done;
      for (int i = 1; i < m_clusterSvc->numRanks(); ++i) {
        if (!workers_done.at(i)) {
          // Tell workers that aren't done to emergency stop
          m_clusterSvc->sendMessage(
              i, ClusterMessage(ClusterMessageType::EmergencyStop));
          workers_done[i] = true;
          ++num_workers_done;
        }
      }
      break;
    }

    if (msg.messageType == ClusterMessageType::FinalWorkerStatus) {
      ATH_MSG_INFO("Received FinalWorkerStatus from " << msg.source);
      statuses.at(msg.source) = get<ClusterMessage::WorkerStatus>(msg.payload);
      workers_done.at(msg.source) = true;  // Told worker we're done
      ++num_workers_done;
      continue;
    }

    // Other message types are an error
    ATH_MSG_ERROR("Received unexpected message "
                  << std::format("{}", msg.messageType) << " from "
                  << msg.source);
  }
  auto all_done = Clock::now() - start;
  // Collate status
  int n_created = 0;
  int n_skipped = 0;
  int n_finished = 0;

  StatusCode sc = StatusCode::SUCCESS;
  int worker_idx = 0;
  for (const auto& worker_status : statuses) {
    if (worker_status.status.isFailure() &&
        worker_status.status != StatusCode(9999)) {
      sc = worker_status.status;
    }
    n_created += worker_status.createdEvents;
    n_skipped += worker_status.skippedEvents;
    n_finished += worker_status.finishedEvents;

    // Rank 0 is the master, so skip it in the per-worker printout
    if (worker_idx != 0) {
      ATH_MSG_INFO("Worker " << worker_idx << ": SC " << worker_status.status
                             << ", created " << worker_status.createdEvents
                             << ", skipped " << worker_status.skippedEvents
                             << ", finished " << worker_status.finishedEvents);
    }
    ++worker_idx;
  }

  ATH_MSG_INFO("Overall: SC " << sc << ", created " << n_created << ", skipped "
                              << n_skipped << ", finished " << n_finished);
  ATH_MSG_INFO("MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
                               << " to provide all events.");
  ATH_MSG_INFO("MASTER: Took " << std::chrono::hh_mm_ss(all_done)
                               << " to complete all events.");
  return sc;
}
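
// Aside: the std::format("{}", msg.messageType) calls in the loops above
// require a std::formatter specialization for ClusterMessageType to be
// provided alongside the enum, since enum types are not formattable by
// default. A minimal sketch of such a specialization, for illustration only;
// toString() is a hypothetical helper, not necessarily what the real headers
// provide:
//
//   template <>
//   struct std::formatter<ClusterMessageType>
//       : std::formatter<std::string_view> {
//     auto format(ClusterMessageType t, std::format_context& ctx) const {
//       return std::formatter<std::string_view>::format(toString(t), ctx);
//     }
//   };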

// Worker event loop (runs on worker, requests events over MPI)
StatusCode MPIHiveEventLoopMgr::workerEventLoop() {
  bool end_of_stream = false;
  // barrier so all ranks enter message loop together
  m_clusterSvc->barrier();
  auto start = Clock::now();
  while (true) {
    // Drain the scheduler (wait for at least one event to complete, then free
    // up completed slots) in two circumstances:
    // 1. We have created exactly one event, so the first event runs to
    //    completion before any more are scheduled
    // 2. There are no free slots left
    bool haveFreeSlots =
        m_schedulerSvc->freeSlots() > 0 && m_whiteboard->freeSlots() > 0;
    if (!haveFreeSlots || m_nLocalCreatedEvts == 1) {
      StatusCode sc = drainLocalScheduler();
      if (sc.isFailure()) {
        ClusterMessage::WorkerStatus status;
        status.status = sc;
        status.createdEvents = m_nLocalCreatedEvts;
        status.skippedEvents = m_nLocalSkippedEvts;
        status.finishedEvents = m_nLocalFinishedEvts;
        m_clusterSvc->sendMessage(
            0, ClusterMessage(ClusterMessageType::WorkerError, status));
        return sc;
      }
    }

    auto start_time = Clock::now();
    m_clusterSvc->sendMessage(0,
                              ClusterMessage(ClusterMessageType::RequestEvent));
    ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
    auto request_time = Clock::now() - start_time;
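    // request_time is how long this worker sat waiting for the master's
    // reply; it is forwarded to the cluster event log via insertEvent() below.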
    if (msg.messageType == ClusterMessageType::EmergencyStop) {
      // Emergency stop: return FAILURE after fully draining the scheduler, to
      // prevent a segfault
      std::size_t numSlots = m_whiteboard->getNumberOfStores();
      while (m_schedulerSvc->freeSlots() < numSlots) {
        // Ignore the StatusCode, we are going to return FAILURE anyway
        (void)drainLocalScheduler();
      }
      ATH_MSG_ERROR("Received EmergencyStop message!");
      return StatusCode::FAILURE;
    }

    if (msg.messageType == ClusterMessageType::EventsDone) {
      auto loop_time = Clock::now() - start;
      ATH_MSG_INFO("Worker " << m_clusterSvc->rank() << " DONE. Loop took "
                             << std::chrono::hh_mm_ss(loop_time)
                             << " to process " << m_nLocalCreatedEvts
                             << " events.");
      // Been told we've reached the end; provide status to master.
      ClusterMessage::WorkerStatus status;
      // At end of stream, we need to *fully* drain the scheduler
      StatusCode sc = StatusCode::SUCCESS;
      std::size_t numSlots = m_whiteboard->getNumberOfStores();
      while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
        sc = drainLocalScheduler();
      }
      status.status = sc;
      status.createdEvents = m_nLocalCreatedEvts;
      status.skippedEvents = m_nLocalSkippedEvts;
      status.finishedEvents = m_nLocalFinishedEvts;
      m_clusterSvc->sendMessage(
          0, ClusterMessage(ClusterMessageType::FinalWorkerStatus, status));
      return sc;
    }

    // Any message other than ProvideEvent from the master is now an error
    if (msg.messageType != ClusterMessageType::ProvideEvent ||
        msg.source != 0) {
      ATH_MSG_ERROR("Received unexpected message "
                    << std::format("{}", msg.messageType) << " from "
                    << msg.source);
      return StatusCode::FAILURE;
    }

    int evt = get<int>(msg.payload);
    ATH_MSG_INFO("Starting event " << evt);
    StatusCode sc = insertEvent(
        evt, end_of_stream,
        std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
            .count());
    if (sc.isFailure() && !sc.isRecoverable()) {
      ClusterMessage::WorkerStatus status;
      status.status = sc;
      status.createdEvents = m_nLocalCreatedEvts;
      status.skippedEvents = m_nLocalSkippedEvts;
      status.finishedEvents = m_nLocalFinishedEvts;
      m_clusterSvc->sendMessage(
          0, ClusterMessage(ClusterMessageType::WorkerError, status));
      return sc;
    }
    if (end_of_stream || m_terminateLoop) {
      auto loop_time = Clock::now() - start;
      ATH_MSG_INFO("Worker " << m_clusterSvc->rank() << " DONE. Loop took "
                             << std::chrono::hh_mm_ss(loop_time)
                             << " to process " << m_nLocalCreatedEvts
                             << " events.");
      // Reached end of stream, drain scheduler and report to master.
      ClusterMessage::WorkerStatus status;
      // At end of stream, we need to *fully* drain the scheduler
      StatusCode sc = StatusCode::SUCCESS;
      std::size_t numSlots = m_whiteboard->getNumberOfStores();
      while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
        sc = drainLocalScheduler();
      }
      status.status = sc;
      status.createdEvents = m_nLocalCreatedEvts;
      status.skippedEvents = m_nLocalSkippedEvts;
      status.finishedEvents = m_nLocalFinishedEvts;
      m_clusterSvc->sendMessage(
          0, ClusterMessage(ClusterMessageType::FinalWorkerStatus, status));
      return sc;
    }
  }
}
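
// Notes on insertEvent() below: endOfStream is an out-parameter flagging that
// no further events should be requested, and requestTime_ns is the time the
// worker spent waiting for the master's reply, recorded in the cluster event
// log alongside the event.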

// Insert an event into the local scheduler
StatusCode MPIHiveEventLoopMgr::insertEvent(int eventIdx, bool& endOfStream,
                                            std::int64_t requestTime_ns) {
  // fast-forward to event
  // Create the event context now so next writes into the next slot when
  // skipping, not the one that's being used
  endOfStream = false;
  auto ctx = createEventContext();
  Gaudi::Hive::setCurrentContext(ctx);
  ctx.setEvt(eventIdx);  // Make the event numbers in the log actually make sense
  if (!ctx.valid()) {
    endOfStream = true;  // BUG: Doesn't actually mean end of stream. Remove
                         // after making sure!
    return StatusCode::FAILURE;
  }

  const std::size_t slot = ctx.slot();  // Need this for later
  ATH_CHECK(seek(eventIdx));
  // execute event
  StatusCode sc = executeEvent(std::move(ctx));
  const auto evtID = m_lastEventContext.eventID();  // Set in AthenaHiveEventLoopMgr
  m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
                             requestTime_ns, slot);

  if (sc.isRecoverable()) {
    ++m_nLocalSkippedEvts;  // A recoverable failure counts as a skipped event
  } else if (sc.isSuccess()) {
    ++m_nLocalCreatedEvts;
  }
  return sc;
}

// Drain the local scheduler of completed events (at least one)
StatusCode MPIHiveEventLoopMgr::drainLocalScheduler() {

  StatusCode sc(StatusCode::SUCCESS);

  // maybe we can do better
  std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;

  EventContext* finishedEvtContext(nullptr);

  // Here we wait so as not to waste CPU resources
  ATH_MSG_DEBUG("drainLocalScheduler: [" << m_nLocalFinishedEvts
                                         << "] Waiting for a context");
  sc = m_schedulerSvc->popFinishedEvent(finishedEvtContext);

  // We got past it: cache the pointer
  if (sc.isSuccess()) {
    ATH_MSG_DEBUG("drainLocalScheduler: scheduler not empty: Context "
                  << finishedEvtContext);
    finishedEvtContexts.emplace_back(finishedEvtContext);
  } else {
    // no more events left in scheduler to be drained
    ATH_MSG_DEBUG("drainLocalScheduler: scheduler empty");
    return StatusCode::SUCCESS;
  }

  // Let's see if we can pop other event contexts
  while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
    finishedEvtContexts.emplace_back(finishedEvtContext);
  }

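  // popFinishedEvent() above blocks until at least one event has finished,
  // while tryPopFinishedEvent() is its non-blocking counterpart, so this loop
  // collects whatever else has already completed without further waiting.
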
  // Now we flush them
  StatusCode fail(StatusCode::SUCCESS);
  for (auto& thisFinishedEvtContext : finishedEvtContexts) {
    if (!thisFinishedEvtContext) {
      ATH_MSG_FATAL("Detected nullptr ctxt while clearing WB!");
      fail = StatusCode::FAILURE;
      continue;
    }

    // Update event log
    m_clusterSvc->log_completeEvent(
        thisFinishedEvtContext->eventID().run_number(),
        thisFinishedEvtContext->eventID().event_number(),
        m_aess->eventStatus(*thisFinishedEvtContext));

    if (m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
      ATH_MSG_ERROR("Failed event detected on "
                    << thisFinishedEvtContext << " w/ fail mode: "
                    << m_aess->eventStatus(*thisFinishedEvtContext));
      ++m_contiguousFailedEvts;
      ++m_totalFailedEvts;
      if (m_contiguousFailedEvts >= 3 || m_totalFailedEvts >= 10) {
        // If we have 3 contiguous failed events or 10 total, end the job
        thisFinishedEvtContext.reset();
        fail = StatusCode::FAILURE;
        continue;
      }
    } else {
      // Event succeeded, reset the contiguous failed events counter
      m_contiguousFailedEvts = 0;
    }

    EventID::number_type n_run(0);
    EventID::event_number_t n_evt(0);

    if (m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
      n_run = thisFinishedEvtContext->eventID().run_number();
      n_evt = thisFinishedEvtContext->eventID().event_number();
    } else {
      ATH_MSG_ERROR("DrainSched: unable to select store "
                    << thisFinishedEvtContext->slot());
      thisFinishedEvtContext.reset();
      fail = StatusCode::FAILURE;
      continue;
    }

    // Some code still needs the global context in addition to the one passed
    // in the incident
    Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
    m_incidentSvc->fireIncident(
        Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext));

    ATH_MSG_DEBUG("Clearing slot "
                  << thisFinishedEvtContext->slot() << " (event "
                  << thisFinishedEvtContext->evt() << ") of the whiteboard");

    StatusCode sc = clearWBSlot(thisFinishedEvtContext->slot());
    if (!sc.isSuccess()) {
      ATH_MSG_ERROR("Whiteboard slot " << thisFinishedEvtContext->slot()
                                       << " could not be properly cleared");
      if (fail != StatusCode::FAILURE) {
        fail = sc;
      }
      thisFinishedEvtContext.reset();
      continue;
    }
    ++m_nLocalFinishedEvts;

    writeHistograms().ignore();
    ++m_proc;

    if (m_doEvtHeartbeat) {
      if (!m_useTools) {
        ATH_MSG_INFO(" ===>>> done processing event #"
                     << n_evt << ", run #" << n_run << " on slot "
                     << thisFinishedEvtContext->slot() << ", " << m_proc
                     << " events processed so far <<<===");
      } else {
        ATH_MSG_INFO(" ===>>> done processing event #"
                     << n_evt << ", run #" << n_run << " on slot "
                     << thisFinishedEvtContext->slot() << ", " << m_nev
                     << " events read and " << m_proc
                     << " events processed so far <<<===");
      }
      std::ofstream outfile("eventLoopHeartBeat.txt");
      if (!outfile) {
        ATH_MSG_ERROR(" unable to open: eventLoopHeartBeat.txt");
        fail = StatusCode::FAILURE;
        thisFinishedEvtContext.reset();
        continue;
      }
      outfile << " done processing event #" << n_evt << ", run #" << n_run
              << " " << m_nev << " events read so far <<<===" << std::endl;
      outfile.close();
    }

    ATH_MSG_DEBUG("drainLocalScheduler thisFinishedEvtContext: "
                  << thisFinishedEvtContext);

    thisFinishedEvtContext.reset();
  }

  return fail;
}

StoreGateSvc* MPIHiveEventLoopMgr::eventStore() const {
  return m_eventStore.get();
}