ATLAS Offline Software
Loading...
Searching...
No Matches
TimeoutAlg.cxx
Go to the documentation of this file.
1/*
2 * Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4
10
#include "TimeoutAlg.h"

#include "GaudiKernel/IScheduler.h"
#include "GaudiKernel/ServiceHandle.h"

#include <format>
#include <thread>
#include "valgrind/valgrind.h"
19
20
22{
23 if (RUNNING_ON_VALGRIND) {
24 ATH_MSG_INFO("Detected running on valgrind. Disabling algorithm timeout");
26 return StatusCode::SUCCESS;
27 }
28 m_timeout = std::chrono::nanoseconds(m_timeoutProp);
29
30 // Subscribe to EndAlgorithms (includes output sequence)
31 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc/IncidentSvc", name());
32 ATH_CHECK(incSvc.retrieve());
33 incSvc->addListener(this, "EndAlgorithms", /*priority*/ 0);
34
35 return StatusCode::SUCCESS;
36}
37
38
39StatusCode TimeoutAlg::execute (const EventContext& ctx) const
40{
41 // Timeout thread is started on first event to make sure this also works
42 // in athenaMP (threads usually don't survive forking).
43 [[maybe_unused]] static const bool initThread = [&](){
44 if (m_timeoutProp > 0) {
45 const auto nc_this ATLAS_THREAD_SAFE = const_cast<TimeoutAlg*>(this);
46 m_thread = std::thread(&TimeoutAlg::timeoutThread, nc_this);
47 }
48 return true;
49 }();
50
51 // Set event start time for current slot
52 *m_eventStartTime.get(ctx) = clock_t::now();
53
54 return StatusCode::SUCCESS;
55}
56
57
58void TimeoutAlg::handle(const Incident& inc)
59{
60 if (inc.type() == "EndAlgorithms") {
61 ATH_MSG_DEBUG("Resetting event timeout for slot " << inc.context().slot());
62 // Reset start time for slot to zero
63 *m_eventStartTime.get(inc.context()) = {};
64 }
65}
66
67
68StatusCode TimeoutAlg::stop()
69{
70 if (m_thread.joinable() && !m_stopped.exchange(true)) {
71 // Signal timeout thread to stop
72 ATH_MSG_DEBUG("Stopping timeout thread");
73 m_stop_thread.set_value();
74 m_thread.join();
75 }
76
77 return StatusCode::SUCCESS;
78}
79
80
/**
 * Body of the watchdog thread.
 *
 * Wakes up periodically until the stop promise is fulfilled and, on each
 * wake-up, calls handleTimeout() for every slot whose event has been
 * running for longer than m_timeout.
 */
void TimeoutAlg::timeoutThread()
{
  ATH_MSG_INFO(std::format("Setting per-event timeout of {}",
                           std::chrono::duration<double>(m_timeout)));

  // Wakeup at regular intervals (with a minimum frequency, useful for long timeouts)
  const std::chrono::nanoseconds wakeup_interval =
    std::min(m_timeout, std::chrono::nanoseconds(m_checkInterval));

  // Loop until we have received stop signal: wait_for() doubles as the
  // sleep between checks and returns something other than `timeout` once
  // stop() has set the promise.
  auto stop_signal = m_stop_thread.get_future();
  while ( stop_signal.wait_for(wakeup_interval) == std::future_status::timeout ) {

    // Loop over all slots and check if event has reached timeout
    const auto now = clock_t::now();
    for (EventContext::ContextID_t slot = 0;
         const auto& startTime : m_eventStartTime) {

      // A zero start time means no event is currently being timed in this slot
      if (startTime.time_since_epoch().count() > 0 && now > startTime + m_timeout) {
        handleTimeout(slot);
      }

      ++slot;
    }
  }
}
107
108
/**
 * React to an event timeout in the given slot.
 *
 * Logs an ERROR, sets the timeout flag for the slot, optionally dumps the
 * scheduler state (m_dumpState) and optionally aborts the job (m_abort).
 * Does nothing if the timeout flag for the slot is already set.
 *
 * @param slot  Slot number whose event exceeded the timeout.
 */
void TimeoutAlg::handleTimeout(EventContext::ContextID_t slot)
{
  // To avoid getting another timeout while handling this one
  std::scoped_lock lock(m_handleMutex);

  // Create minimal context with slot number
  const EventContext ctx(0, slot);

  // Don't duplicate the actions if the timeout was already reached for this slot
  if (Athena::Timeout::instance(ctx).reached()) return;

  // Print ERROR message
  const std::string msg = std::format("Event timeout ({}) in slot {} reached",
                                      std::chrono::duration<double>(m_timeout), slot);
  ATH_MSG_ERROR(msg);

  // Set timeout flag; without this the reached() guard above can never
  // suppress repeated handling of the same slot.
  setTimeout(Athena::Timeout::instance(ctx));

  // Dump scheduler state if requested
  if (m_dumpState) {
    ServiceHandle<IScheduler> schedulerSvc("AvalancheSchedulerSvc", name());
    if (schedulerSvc.retrieve().isSuccess()) {
      schedulerSvc->dumpState();
    }
  }

  // Abort job if requested
  if (m_abort) {
    // Stop the timeout thread to avoid additional triggers
    stop().ignore();

    // Tell CoreDumpSvc about the reason for the abort
    ServiceHandle<ICoreDumpSvc> coreDumpSvc("CoreDumpSvc", name());
    if ( coreDumpSvc.retrieve().isSuccess() ) {
      coreDumpSvc->setCoreDumpInfo(ctx, "Reason", msg);
    }
    else {
      // Fall back to stderr so the reason is not lost; std::endl flushes
      // before the process is aborted.
      std::cerr << msg << std::endl;
    }
    // Abort job (and let CoreDumpSvc handle SIGABRT)
    std::abort();
  }

}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
Interface of a core dump service.
void setTimeout(Timeout &instance)
Set timeout.
Definition Timeout.h:80
static Timeout & instance()
Get reference to Timeout singleton.
Definition Timeout.h:64
virtual void setCoreDumpInfo(const std::string &name, const std::string &value) override
Set a name/value pair in the core dump record.
Algorithm to monitor event timeouts.
Definition TimeoutAlg.h:37
std::promise< void > m_stop_thread
Signal to stop watchdog thread.
Definition TimeoutAlg.h:82
void handleTimeout(EventContext::ContextID_t slot)
Handle timeout.
Gaudi::Property< bool > m_dumpState
Definition TimeoutAlg.h:64
virtual void handle(const Incident &inc) override
std::mutex m_handleMutex
Mutex for handleTimeout.
Definition TimeoutAlg.h:88
virtual StatusCode stop() override
Gaudi::Property< bool > m_abort
Definition TimeoutAlg.h:67
std::atomic< bool > m_stopped
Has watchdog thread already been stopped? (to avoid setting future twice)
Definition TimeoutAlg.h:85
Gaudi::Property< unsigned long long > m_timeoutProp
Definition TimeoutAlg.h:58
void timeoutThread()
Watchdog thread.
virtual StatusCode execute(const EventContext &ctx) const override
std::chrono::nanoseconds m_timeout
Timeout property as duration.
Definition TimeoutAlg.h:73
SG::SlotSpecificObj< clock_t::time_point > m_eventStartTime ATLAS_THREAD_SAFE
Start time of each event per slot.
Definition TimeoutAlg.h:76
Gaudi::Property< unsigned long long > m_checkInterval
Definition TimeoutAlg.h:61
virtual StatusCode initialize() override
Algorithm to monitor event timeouts.
MsgStream & msg
Definition testRead.cxx:32