ATLAS Offline Software
Loading...
Searching...
No Matches
TimeoutAlg.cxx
Go to the documentation of this file.
1/*
2 * Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4
10
11#include "TimeoutAlg.h"
12
14#include "GaudiKernel/IScheduler.h"
15#include "GaudiKernel/ServiceHandle.h"
16
17#include <format>
18
19
21{
22 m_timeout = std::chrono::nanoseconds(m_timeoutProp);
23
24 // Subscribe to EndAlgorithms (includes output sequence)
25 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc/IncidentSvc", name());
26 ATH_CHECK(incSvc.retrieve());
27 incSvc->addListener(this, "EndAlgorithms", /*priority*/ 0);
28
29 return StatusCode::SUCCESS;
30}
31
32
33StatusCode TimeoutAlg::execute (const EventContext& ctx) const
34{
35 // Timeout thread is started on first event to make sure this also works
36 // in athenaMP (threads usually don't survive forking).
37 [[maybe_unused]] static const bool initThread = [&](){
38 if (m_timeoutProp > 0) {
39 const auto nc_this ATLAS_THREAD_SAFE = const_cast<TimeoutAlg*>(this);
40 m_thread = std::thread(&TimeoutAlg::timeoutThread, nc_this);
41 }
42 return true;
43 }();
44
45 // Set event start time for current slot
46 *m_eventStartTime.get(ctx) = clock_t::now();
47
48 return StatusCode::SUCCESS;
49}
50
51
52void TimeoutAlg::handle(const Incident& inc)
53{
54 if (inc.type() == "EndAlgorithms") {
55 ATH_MSG_DEBUG("Resetting event timeout for slot " << inc.context().slot());
56 // Reset start time for slot to zero
57 *m_eventStartTime.get(inc.context()) = {};
58 }
59}
60
61
62StatusCode TimeoutAlg::stop()
63{
64 if (m_thread.joinable() && !m_stopped.exchange(true)) {
65 // Signal timeout thread to stop
66 ATH_MSG_DEBUG("Stopping timeout thread");
67 m_stop_thread.set_value();
68 m_thread.join();
69 }
70
71 return StatusCode::SUCCESS;
72}
73
74
void TimeoutAlg::timeoutThread()
{
  // Entry point of the watchdog thread (launched from execute()).
  // Periodically scans every slot's event start time and calls handleTimeout()
  // for each slot whose current event has been running longer than m_timeout.
  // Returns once stop() has fulfilled m_stop_thread.
  ATH_MSG_INFO(std::format("Setting per-event timeout of {}",
                           std::chrono::duration<double>(m_timeout)));

  // Poll period: the timeout itself, capped by m_checkInterval (interpreted as
  // nanoseconds) so that long timeouts are still checked at a minimum frequency.
  const std::chrono::nanoseconds checkInterval(m_checkInterval);
  const std::chrono::nanoseconds pollPeriod =
    (checkInterval < m_timeout) ? checkInterval : m_timeout;

  std::future<void> stopRequest = m_stop_thread.get_future();
  for (;;) {
    // Sleep for one poll period; leave as soon as a stop has been signalled.
    if (stopRequest.wait_for(pollPeriod) != std::future_status::timeout) break;

    const auto checkTime = clock_t::now();
    EventContext::ContextID_t slot = 0;
    for (const auto& eventStart : m_eventStartTime) {
      // A zero (epoch) start time means the slot is idle: never started, or
      // reset by handle() at the end of its last event.
      const bool eventInFlight = eventStart.time_since_epoch().count() > 0;
      if (eventInFlight && checkTime > eventStart + m_timeout) {
        handleTimeout(slot);
      }
      ++slot;
    }
  }
}
101
102
// Handle a timeout detected by the watchdog loop for event slot `slot`.
// Serialized by m_handleMutex. Returns early if Athena::Timeout for this slot
// already reports reached(). Otherwise it builds an error message and, if
// m_dumpState is set, dumps the AvalancheSchedulerSvc state. If m_abort is set,
// it stops the watchdog, records the message as the core-dump "Reason" (or
// prints it to std::cerr when CoreDumpSvc cannot be retrieved) and calls
// std::abort().
// NOTE(review): this is invoked from timeoutThread(), i.e. on the watchdog
// thread itself, so the stop() call below must not try to join that thread.
103void TimeoutAlg::handleTimeout(EventContext::ContextID_t slot)
104{
105 // To avoid getting another timeout while handling this one
106 std::scoped_lock lock(m_handleMutex);
107
108 // Create minimal context with slot number
// NOTE(review): the first constructor argument (presumably the event number)
// is a dummy 0; only the slot is meaningful here.
109 const EventContext ctx(0, slot);
110
111 // Don't duplicate the actions if the timeout was already reached for this slot
112 if (Athena::Timeout::instance(ctx).reached()) return;
113
114 // Print ERROR message
115 const std::string msg = std::format("Event timeout ({}) in slot {} reached",
116 std::chrono::duration<double>(m_timeout), slot);
// NOTE(review): file line 117 is missing from this listing; judging by the
// ATH_MSG_ERROR cross-reference it is presumably `ATH_MSG_ERROR(msg);` -- confirm.
118
119 // Set timeout flag
// NOTE(review): file line 120 is missing from this listing; judging by the
// setTimeout / Timeout::instance cross-references it presumably marks
// Athena::Timeout::instance(ctx) as reached -- confirm.
121
122 // Dump scheduler state if requested
123 if (m_dumpState) {
124 ServiceHandle<IScheduler> schedulerSvc("AvalancheSchedulerSvc", name());
125 if (schedulerSvc.retrieve().isSuccess()) {
126 schedulerSvc->dumpState();
127 }
128 }
129
130 // Abort job if requested
131 if (m_abort) {
132 // Stop the timeout thread to avoid additional triggers
133 stop().ignore();
134
135 // Tell CoreDumpSvc about the reason for the abort
136 ServiceHandle<ICoreDumpSvc> coreDumpSvc("CoreDumpSvc", name());
137 if ( coreDumpSvc.retrieve().isSuccess() ) {
138 coreDumpSvc->setCoreDumpInfo(ctx, "Reason", msg);
139 }
140 else {
// Fallback so the reason is not lost when CoreDumpSvc is unavailable.
141 std::cerr << msg << std::endl;
142 }
143 // Abort job (and let CoreDumpSvc handle SIGABRT)
144 std::abort();
145 }
146
147}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
Interface of a core dump service.
void setTimeout(Timeout &instance)
Set timeout.
Definition Timeout.h:80
static Timeout & instance()
Get reference to Timeout singleton.
Definition Timeout.h:64
virtual void setCoreDumpInfo(const std::string &name, const std::string &value) override
Set a name/value pair in the core dump record.
Algorithm to monitor event timeouts.
Definition TimeoutAlg.h:37
std::promise< void > m_stop_thread
Signal to stop watchdog thread.
Definition TimeoutAlg.h:82
void handleTimeout(EventContext::ContextID_t slot)
Handle timeout.
Gaudi::Property< bool > m_dumpState
Definition TimeoutAlg.h:64
virtual void handle(const Incident &inc) override
std::mutex m_handleMutex
Mutex for handleTimeout.
Definition TimeoutAlg.h:88
virtual StatusCode stop() override
Gaudi::Property< bool > m_abort
Definition TimeoutAlg.h:67
std::atomic< bool > m_stopped
Has watchdog thread already been stopped? (to avoid setting future twice)
Definition TimeoutAlg.h:85
Gaudi::Property< unsigned long long > m_timeoutProp
Definition TimeoutAlg.h:58
void timeoutThread()
Watchdog thread.
virtual StatusCode execute(const EventContext &ctx) const override
std::chrono::nanoseconds m_timeout
Timeout property as duration.
Definition TimeoutAlg.h:73
SG::SlotSpecificObj< clock_t::time_point > m_eventStartTime ATLAS_THREAD_SAFE
Start time of each event per slot.
Definition TimeoutAlg.h:76
Gaudi::Property< unsigned long long > m_checkInterval
Definition TimeoutAlg.h:61
virtual StatusCode initialize() override
Algorithm to monitor event timeouts.
MsgStream & msg
Definition testRead.cxx:32