Start querying and monitoring Scheduler status.
53 {
54
59 return StatusCode::FAILURE;
60 }
61 }
62
63
65 const int numSlots = std::stoi( serviceLocator()->service(
"EventDataSvc").as<IProperty>()->
getProperty(
"NSlots").
toString() );
66
67
68 if (
bool expected =
false; not
m_running.compare_exchange_strong(expected,
true)) {
69 ATH_MSG_ERROR(
"startMonitoring called but it is already running");
70 return StatusCode::FAILURE;
71 }
72
73
74
// Callback that receives an IScheduler::OccupancySnapshot by value and converts
// it into monitored variables. Captures this, numThreads and numSlots.
// NOTE(review): lastSnapCounter, mon_stateNumber and mon_stateTotalCounts are
// used below but not declared in the visible lines -- presumably they are
// defined on lines missing from this excerpt; confirm against the full file.
75 auto monCallback = [
this,
numThreads, numSlots](IScheduler::OccupancySnapshot snap) ->
void {
// Timer named "TIME_monCallback"; it is stopped explicitly at the end of the callback.
76 auto monTime = Monitored::Timer("TIME_monCallback");
77
// Time elapsed between m_startTime and the snapshot's timestamp.
78 const ClockType::duration wallTime = snap.time -
m_startTime;
// Index of the monitoring interval this snapshot falls into: elapsed whole
// milliseconds divided by the configured interval m_monIntervalMillisec.
79 const size_t thisSnapCounter = std::chrono::duration_cast<std::chrono::milliseconds>(wallTime).count() /
m_monIntervalMillisec.value();
// Number of intervals between lastSnapCounter and this snapshot. The unsigned
// difference is stored in an int, so the check below can see it as non-positive.
81 const int periodsSinceLastSnap = thisSnapCounter - lastSnapCounter;
82
83
// Drop snapshots that do not advance the interval counter.
84 if (periodsSinceLastSnap <= 0) {
85 ATH_MSG_DEBUG(
"Discarding snap because periodsSinceLastSnap=" << periodsSinceLastSnap <<
" is not positive");
86 return;
87 }
88
89
// Accumulate snap.states[slot][state] over all slots into one total per state
// index. The totals vector has AlgState::MAXVALUE entries, initialised to 0.
// NOTE(review): assumes snap.states[slot].size() never exceeds
// AlgState::MAXVALUE -- there is no bounds check here; confirm.
90 std::vector<int> stateTotalCounts(static_cast<size_t>(AlgState::MAXVALUE), 0);
91 for (size_t slot=0; slot < snap.states.size(); ++slot) {
92 for (size_t state=0; state < snap.states[slot].size(); ++state) {
93 stateTotalCounts[state] += snap.states[slot][state];
94 }
95 }
98
99
// Four vectors of monitored scalars, filled with one entry per element of
// s_algStateNumbers: the raw total, and the total divided by numThreads,
// by numSlots and by activeCount. Capacity is reserved up front.
100 std::vector<Monitored::Scalar<int>> mon_stateCounts;
101 std::vector<Monitored::Scalar<double>> mon_stateCountsOverThreads;
102 std::vector<Monitored::Scalar<double>> mon_stateCountsOverSlots;
103 std::vector<Monitored::Scalar<double>> mon_stateCountsOverActive;
104 mon_stateCounts.reserve(static_cast<size_t>(AlgState::MAXVALUE));
105 mon_stateCountsOverThreads.reserve(static_cast<size_t>(AlgState::MAXVALUE));
106 mon_stateCountsOverSlots.reserve(static_cast<size_t>(AlgState::MAXVALUE));
107 mon_stateCountsOverActive.reserve(static_cast<size_t>(AlgState::MAXVALUE));
// Sum of the totals for the state indices listed in s_activeAlgStateNumbers.
108 int activeCount = 0;
109 for (size_t i : s_activeAlgStateNumbers) {
110 activeCount += stateTotalCounts[
i];
111 }
// Variable names are built from s_algStateNames[i], with the suffixes
// "_Over_Threads", "_Over_Slots" and "_Over_Active" for the ratio variants.
112 for (size_t i : s_algStateNumbers) {
113 mon_stateCounts.emplace_back(s_algStateNames[i].
data(), stateTotalCounts[i]);
114 mon_stateCountsOverThreads.emplace_back(s_algStateNames[i].
data()+
"_Over_Threads"s, divAsDouble(stateTotalCounts[i], numThreads));
115 mon_stateCountsOverSlots.emplace_back(s_algStateNames[i].
data()+
"_Over_Slots"s, divAsDouble(stateTotalCounts[i], numSlots));
// Report 0 when activeCount is not positive, so divAsDouble is never called
// with a zero denominator here.
116 double toActive = (activeCount > 0) ? divAsDouble(stateTotalCounts[i], activeCount) : 0;
117 mon_stateCountsOverActive.emplace_back(s_algStateNames[i].
data()+
"_Over_Active"s, toActive);
118 }
119
120
// Free-slot count as returned by m_scheduler->freeSlots(), and that count
// divided by numSlots. freeSlots() is queried separately for each variable.
121 auto mon_freeSlots = Monitored::Scalar(
"FreeSlots",
m_scheduler->freeSlots());
122 auto mon_freeSlotsFrac = Monitored::Scalar(
"FreeSlotsFraction", divAsDouble(
m_scheduler->freeSlots(), numSlots));
123
124
// List of references to every monitored variable. The reserved capacity is the
// four per-state vectors plus 6, matching the six variables inserted
// individually in the loop below.
125 std::vector<std::reference_wrapper<Monitored::IMonitoredVariable>> allMonVars;
126 allMonVars.reserve(6 +
127 mon_stateCounts.size() +
128 mon_stateCountsOverThreads.size() +
129 mon_stateCountsOverSlots.size() +
130 mon_stateCountsOverActive.size());
131
132
// Visit every interval index from lastSnapCounter+1 up to and including
// thisSnapCounter, so intervals without their own snapshot reuse the values
// computed above. Only SnapNumber and WallTimeSeconds (interval index times
// the interval length, converted from ms to s) differ per iteration.
// NOTE(review): nothing visible consumes allMonVars after it is filled; the
// embedded numbering jumps from 142 to 144, so the consuming statement is
// presumably on the missing line -- confirm against the full file.
133 for (size_t snapNumber=lastSnapCounter+1; snapNumber<=thisSnapCounter; ++snapNumber) {
134 auto mon_snapNumber = Monitored::Scalar("SnapNumber", snapNumber);
135 auto mon_wallTimeSec = Monitored::Scalar(
"WallTimeSeconds", snapNumber*
m_monIntervalMillisec.value()*1e-3);
136 allMonVars.clear();
137 allMonVars.insert(allMonVars.end(), mon_stateCounts.begin(), mon_stateCounts.end());
138 allMonVars.insert(allMonVars.end(), mon_stateCountsOverThreads.begin(), mon_stateCountsOverThreads.end());
139 allMonVars.insert(allMonVars.end(), mon_stateCountsOverSlots.begin(), mon_stateCountsOverSlots.end());
140 allMonVars.insert(allMonVars.end(), mon_stateCountsOverActive.begin(), mon_stateCountsOverActive.end());
141 allMonVars.insert(allMonVars.end(), {mon_stateNumber, mon_stateTotalCounts, mon_freeSlots, mon_freeSlotsFrac,
142 mon_snapNumber, mon_wallTimeSec});
144 }
// Stop the timer started at the top of the callback.
145 monTime.stop();
147 };
148
149
152
154
155 return StatusCode::SUCCESS;
156}
char data[hepevt_bytes_allocation_ATLAS]
std::atomic_size_t m_lastSnapCounter
std::atomic_bool m_running
ClockType::time_point m_startTime
SmartIF< IScheduler > m_scheduler
Gaudi::Property< unsigned int > m_monIntervalMillisec
Gaudi::Property< std::string > m_schedulerName
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
ValuesCollection< T > Collection(std::string name, const T &collection)
Declare a monitored (double-convertible) collection.
T getProperty(const asg::IAsgTool *interface_tool, const std::string &prop_name)