ATLAS Offline Software
Loading...
Searching...
No Matches
SchedulerMonSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "SchedulerMonSvc.h"
6
7using namespace std::string_literals;
8using namespace std::literals::string_view_literals;
9
namespace {
  /// Algorithm execution states. The numeric values are used directly as indices
  /// into the per-slot state counts of IScheduler::OccupancySnapshot::states.
  /// NOTE(review): assumes the scheduler reports states in exactly this order —
  /// confirm whenever the scheduler's own state list changes.
  enum class AlgState : size_t {
    INITIAL = 0,
    CONTROLREADY = 1,
    DATAREADY = 2,
    RESOURCELESS = 3,
    SCHEDULED = 4,
    EVTACCEPTED = 5,
    EVTREJECTED = 6,
    ERROR = 7,
    MAXVALUE = 8
  };

  /// Number of real states (MAXVALUE is a sentinel, not a state). All tables below
  /// are sized from it so they cannot silently diverge from the enum.
  constexpr size_t s_numAlgStates = static_cast<size_t>(AlgState::MAXVALUE);

  /// State names indexed by AlgState value; used to build monitored variable names.
  constexpr std::array<std::string_view, s_numAlgStates> s_algStateNames = {{
    "INITIAL", "CONTROLREADY", "DATAREADY", "RESOURCELESS", "SCHEDULED", "EVTACCEPTED", "EVTREJECTED", "ERROR"
  }};

  /// @return true if every entry of s_algStateNames is non-empty, i.e. the name list
  ///         was not silently zero-padded by aggregate initialisation.
  constexpr bool allAlgStatesNamed() {
    for (std::string_view name : s_algStateNames) {
      if (name.empty()) return false;
    }
    return true;
  }
  static_assert(allAlgStatesNamed(), "s_algStateNames must provide a name for every AlgState below MAXVALUE");

  /// @return the index sequence 0, 1, ..., s_numAlgStates-1
  constexpr std::array<size_t, s_numAlgStates> makeAlgStateNumbers() {
    std::array<size_t, s_numAlgStates> numbers{};
    for (size_t i = 0; i < numbers.size(); ++i) numbers[i] = i;
    return numbers;
  }

  /// All state indices; monitored as the "AlgStates" collection next to the per-state totals.
  constexpr std::array<size_t, s_numAlgStates> s_algStateNumbers = makeAlgStateNumbers();

  /// Indices of the states counted as "active" when computing the *_Over_Active ratios.
  constexpr std::array<size_t, 4> s_activeAlgStateNumbers = {
    static_cast<size_t>(AlgState::CONTROLREADY),
    static_cast<size_t>(AlgState::DATAREADY),
    static_cast<size_t>(AlgState::RESOURCELESS),
    static_cast<size_t>(AlgState::SCHEDULED)
  };

  /// Divide two arithmetic values in double precision (avoids integer division).
  template<typename Ta, typename Tb> constexpr double divAsDouble(const Ta& a, const Tb& b) {
    return static_cast<double>(a) / static_cast<double>(b);
  }
}
41
42// =============================================================================
43SchedulerMonSvc::SchedulerMonSvc(const std::string& name, ISvcLocator* svcLoc)
44: base_class(name, svcLoc) {}
45
46// =============================================================================
48 if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
49 return StatusCode::SUCCESS;
50}
51
52// =============================================================================
54 // Get a handle to the scheduler
55 if (!m_scheduler.isValid()) {
56 m_scheduler = serviceLocator()->service<IScheduler>(m_schedulerName, false);
57 if (!m_scheduler.isValid()) {
58 ATH_MSG_ERROR("Failed to retrieve the Scheduler service with name " << m_schedulerName);
59 return StatusCode::FAILURE;
60 }
61 }
62
63 // Get the number of threads and slots
64 const int numThreads = std::stoi( SmartIF<IProperty>(m_scheduler)->getProperty("ThreadPoolSize").toString() );
65 const int numSlots = std::stoi( serviceLocator()->service("EventDataSvc").as<IProperty>()->getProperty("NSlots").toString() );
66
67 // Flag the monitoring as running (prevents going past this point twice)
68 if (bool expected = false; not m_running.compare_exchange_strong(expected, true)) {
69 ATH_MSG_ERROR("startMonitoring called but it is already running");
70 return StatusCode::FAILURE;
71 }
72
73 // Construct the callback and pass it to the scheduler monitoring API
74 // Note: capture by value as this lambda is called from outside the scope of this method
75 auto monCallback = [this, numThreads, numSlots](IScheduler::OccupancySnapshot snap) -> void {
76 auto monTime = Monitored::Timer("TIME_monCallback");
77 // Calculate and update snap counters
78 const ClockType::duration wallTime = snap.time - m_startTime;
79 const size_t thisSnapCounter = std::chrono::duration_cast<std::chrono::milliseconds>(wallTime).count() / m_monIntervalMillisec.value();
80 const size_t lastSnapCounter = m_lastSnapCounter.exchange(thisSnapCounter);
81 const int periodsSinceLastSnap = thisSnapCounter - lastSnapCounter;
82
83 // If new snap comes before next sampling point, discard it
84 if (periodsSinceLastSnap <= 0) {
85 ATH_MSG_DEBUG("Discarding snap because periodsSinceLastSnap=" << periodsSinceLastSnap << " is not positive");
86 return;
87 }
88
89 // Monitor total state counts across all slots
90 std::vector<int> stateTotalCounts(static_cast<size_t>(AlgState::MAXVALUE), 0);
91 for (size_t slot=0; slot < snap.states.size(); ++slot) {
92 for (size_t state=0; state < snap.states[slot].size(); ++state) {
93 stateTotalCounts[state] += snap.states[slot][state];
94 }
95 }
96 auto mon_stateNumber = Monitored::Collection("AlgStates", s_algStateNumbers);
97 auto mon_stateTotalCounts = Monitored::Collection("StateTotalCounts", stateTotalCounts);
98
99 // Monitor alg state counts absolute numbers and ratios to N threads and N active states
100 std::vector<Monitored::Scalar<int>> mon_stateCounts;
101 std::vector<Monitored::Scalar<double>> mon_stateCountsOverThreads;
102 std::vector<Monitored::Scalar<double>> mon_stateCountsOverSlots;
103 std::vector<Monitored::Scalar<double>> mon_stateCountsOverActive;
104 mon_stateCounts.reserve(static_cast<size_t>(AlgState::MAXVALUE));
105 mon_stateCountsOverThreads.reserve(static_cast<size_t>(AlgState::MAXVALUE));
106 mon_stateCountsOverSlots.reserve(static_cast<size_t>(AlgState::MAXVALUE));
107 mon_stateCountsOverActive.reserve(static_cast<size_t>(AlgState::MAXVALUE));
108 int activeCount = 0;
109 for (size_t i : s_activeAlgStateNumbers) {
110 activeCount += stateTotalCounts[i];
111 }
112 for (size_t i : s_algStateNumbers) {
113 mon_stateCounts.emplace_back(s_algStateNames[i].data(), stateTotalCounts[i]);
114 mon_stateCountsOverThreads.emplace_back(s_algStateNames[i].data()+"_Over_Threads"s, divAsDouble(stateTotalCounts[i], numThreads));
115 mon_stateCountsOverSlots.emplace_back(s_algStateNames[i].data()+"_Over_Slots"s, divAsDouble(stateTotalCounts[i], numSlots));
116 double toActive = (activeCount > 0) ? divAsDouble(stateTotalCounts[i], activeCount) : 0;
117 mon_stateCountsOverActive.emplace_back(s_algStateNames[i].data()+"_Over_Active"s, toActive);
118 }
119
120 // Monitor number of free slots
121 auto mon_freeSlots = Monitored::Scalar("FreeSlots", m_scheduler->freeSlots());
122 auto mon_freeSlotsFrac = Monitored::Scalar("FreeSlotsFraction", divAsDouble(m_scheduler->freeSlots(), numSlots));
123
124 // Reserve vector of references with size equal to the number of variables added into the vector in the loop below
125 std::vector<std::reference_wrapper<Monitored::IMonitoredVariable>> allMonVars;
126 allMonVars.reserve(6 +
127 mon_stateCounts.size() +
128 mon_stateCountsOverThreads.size() +
129 mon_stateCountsOverSlots.size() +
130 mon_stateCountsOverActive.size());
131 // Fill monitoring histograms once for each sampling period passed since the last fill
132 // If multiple sampling periods passed, it means the scheduler state didn't change during that time
133 for (size_t snapNumber=lastSnapCounter+1; snapNumber<=thisSnapCounter; ++snapNumber) {
134 auto mon_snapNumber = Monitored::Scalar("SnapNumber", snapNumber);
135 auto mon_wallTimeSec = Monitored::Scalar("WallTimeSeconds", snapNumber*m_monIntervalMillisec.value()*1e-3);
136 allMonVars.clear();
137 allMonVars.insert(allMonVars.end(), mon_stateCounts.begin(), mon_stateCounts.end());
138 allMonVars.insert(allMonVars.end(), mon_stateCountsOverThreads.begin(), mon_stateCountsOverThreads.end());
139 allMonVars.insert(allMonVars.end(), mon_stateCountsOverSlots.begin(), mon_stateCountsOverSlots.end());
140 allMonVars.insert(allMonVars.end(), mon_stateCountsOverActive.begin(), mon_stateCountsOverActive.end());
141 allMonVars.insert(allMonVars.end(), {mon_stateNumber, mon_stateTotalCounts, mon_freeSlots, mon_freeSlotsFrac,
142 mon_snapNumber, mon_wallTimeSec});
143 Monitored::Group(m_monTool, allMonVars);
144 }
145 monTime.stop();
146 Monitored::Group(m_monTool, monTime);
147 };
148
149 // Start monitoring
150 m_startTime = ClockType::now();
151 m_scheduler->recordOccupancy(m_monIntervalMillisec.value(), std::move(monCallback));
152
153 ATH_MSG_INFO("Scheduler monitoring started");
154
155 return StatusCode::SUCCESS;
156}
157
158// =============================================================================
160 if (bool expected = true; not m_running.compare_exchange_strong(expected, false)) {
161 ATH_MSG_WARNING("stopMonitoring called but it was not running");
162 return StatusCode::SUCCESS;
163 }
164 m_scheduler->recordOccupancy(-1, {});
165 ATH_MSG_INFO("Scheduler monitoring stopped");
166 return StatusCode::SUCCESS;
167}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
static Double_t a
Group of local monitoring quantities and retain correlation when filling histograms
Declare a monitored scalar variable.
A monitored timer.
std::atomic_size_t m_lastSnapCounter
std::atomic_bool m_running
virtual StatusCode initialize() override
virtual StatusCode stopMonitoring() override
Stop querying and monitoring Scheduler status.
ClockType::time_point m_startTime
SmartIF< IScheduler > m_scheduler
SchedulerMonSvc(const std::string &name, ISvcLocator *svcLoc)
virtual StatusCode startMonitoring() override
Start querying and monitoring Scheduler status.
Gaudi::Property< unsigned int > m_monIntervalMillisec
ToolHandle< GenericMonitoringTool > m_monTool
Gaudi::Property< std::string > m_schedulerName
ValuesCollection< T > Collection(std::string name, const T &collection)
Declare a monitored (double-convertible) collection.