ATLAS Offline Software
SchedulerMonSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
3 */
4 
#include "SchedulerMonSvc.h"

#include <algorithm>
#include <array>
#include <chrono>
#include <string>
#include <string_view>
#include <vector>

using namespace std::string_literals;
using namespace std::literals::string_view_literals;
9 
namespace {
  /// Algorithm states whose per-slot counts are reported in the scheduler occupancy snapshot.
  /// NOTE(review): the numeric values are assumed to match the order of the counts in
  /// IScheduler::OccupancySnapshot::states (Gaudi AlgsExecutionStates) — confirm on Gaudi upgrades.
  enum class AlgState : size_t {
    INITIAL = 0,
    CONTROLREADY = 1,
    DATAREADY = 2,
    RESOURCELESS = 3,
    SCHEDULED = 4,
    EVTACCEPTED = 5,
    EVTREJECTED = 6,
    ERROR = 7,
    MAXVALUE = 8 // sentinel: number of real states
  };

  /// Number of real (non-sentinel) algorithm states; all lookup tables below are sized by it
  constexpr size_t s_numAlgStates = static_cast<size_t>(AlgState::MAXVALUE);

  /// Human-readable state names, indexed by the numeric AlgState value
  constexpr std::array<std::string_view, s_numAlgStates> s_algStateNames = {{
    "INITIAL", "CONTROLREADY", "DATAREADY", "RESOURCELESS", "SCHEDULED", "EVTACCEPTED", "EVTREJECTED", "ERROR"
  }};
  // Aggregate initialisation would silently leave trailing names empty if a state were added
  static_assert(!s_algStateNames.back().empty(), "s_algStateNames must provide a name for every AlgState");

  /// The numeric values 0 .. MAXVALUE-1, used both as loop indices and as a monitored collection
  constexpr std::array<size_t, s_numAlgStates> s_algStateNumbers = [] {
    std::array<size_t, s_numAlgStates> numbers{};
    for (size_t i = 0; i < numbers.size(); ++i) {
      numbers[i] = i;
    }
    return numbers;
  }();

  /// States counted as "active" when computing the *_Over_Active ratios
  constexpr std::array<size_t, 4> s_activeAlgStateNumbers = {
    static_cast<size_t>(AlgState::CONTROLREADY),
    static_cast<size_t>(AlgState::DATAREADY),
    static_cast<size_t>(AlgState::RESOURCELESS),
    static_cast<size_t>(AlgState::SCHEDULED)
  };

  /// Divide two arithmetic values in double precision (avoids integer division)
  template<typename Ta, typename Tb> constexpr double divAsDouble(const Ta& a, const Tb& b) {
    return static_cast<double>(a) / static_cast<double>(b);
  }
}
41 
42 // =============================================================================
43 SchedulerMonSvc::SchedulerMonSvc(const std::string& name, ISvcLocator* svcLoc)
44 : base_class(name, svcLoc) {}
45 
46 // =============================================================================
48  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
49  return StatusCode::SUCCESS;
50 }
51 
52 // =============================================================================
54  // Get a handle to the scheduler
55  if (!m_scheduler.isValid()) {
56  m_scheduler = serviceLocator()->service<IScheduler>(m_schedulerName, false);
57  if (!m_scheduler.isValid()) {
58  ATH_MSG_ERROR("Failed to retrieve the Scheduler service with name " << m_schedulerName);
59  return StatusCode::FAILURE;
60  }
61  }
62 
63  // Get the number of threads and slots
64  const int numThreads = std::stoi( SmartIF<IProperty>(m_scheduler)->getProperty("ThreadPoolSize").toString() );
65  const int numSlots = std::stoi( serviceLocator()->service("EventDataSvc").as<IProperty>()->getProperty("NSlots").toString() );
66 
67  // Flag the monitoring as running (prevents going past this point twice)
68  if (bool expected = false; not m_running.compare_exchange_strong(expected, true)) {
69  ATH_MSG_ERROR("startMonitoring called but it is already running");
70  return StatusCode::FAILURE;
71  }
72 
73  // Construct the callback and pass it to the scheduler monitoring API
74  // Note: capture by value as this lambda is called from outside the scope of this method
75  auto monCallback = [this, numThreads, numSlots](IScheduler::OccupancySnapshot snap) -> void {
76  auto monTime = Monitored::Timer("TIME_monCallback");
77  // Calculate and update snap counters
78  const ClockType::duration wallTime = snap.time - m_startTime;
79  const size_t thisSnapCounter = std::chrono::duration_cast<std::chrono::milliseconds>(wallTime).count() / m_monIntervalMillisec.value();
80  const size_t lastSnapCounter = m_lastSnapCounter.exchange(thisSnapCounter);
81  const int periodsSinceLastSnap = thisSnapCounter - lastSnapCounter;
82 
83  // If new snap comes before next sampling point, discard it
84  if (periodsSinceLastSnap <= 0) {
85  ATH_MSG_DEBUG("Discarding snap because periodsSinceLastSnap=" << periodsSinceLastSnap << " is not positive");
86  return;
87  }
88 
89  // Monitor total state counts across all slots
90  std::vector<int> stateTotalCounts(static_cast<size_t>(AlgState::MAXVALUE), 0);
91  for (size_t slot=0; slot < snap.states.size(); ++slot) {
92  for (size_t state=0; state < snap.states[slot].size(); ++state) {
93  stateTotalCounts[state] += snap.states[slot][state];
94  }
95  }
96  auto mon_stateNumber = Monitored::Collection("AlgStates", s_algStateNumbers);
97  auto mon_stateTotalCounts = Monitored::Collection("StateTotalCounts", stateTotalCounts);
98 
99  // Monitor alg state counts absolute numbers and ratios to N threads and N active states
100  std::vector<Monitored::Scalar<int>> mon_stateCounts;
101  std::vector<Monitored::Scalar<double>> mon_stateCountsOverThreads;
102  std::vector<Monitored::Scalar<double>> mon_stateCountsOverSlots;
103  std::vector<Monitored::Scalar<double>> mon_stateCountsOverActive;
104  mon_stateCounts.reserve(static_cast<size_t>(AlgState::MAXVALUE));
105  mon_stateCountsOverThreads.reserve(static_cast<size_t>(AlgState::MAXVALUE));
106  mon_stateCountsOverSlots.reserve(static_cast<size_t>(AlgState::MAXVALUE));
107  mon_stateCountsOverActive.reserve(static_cast<size_t>(AlgState::MAXVALUE));
108  int activeCount = 0;
109  for (size_t i : s_activeAlgStateNumbers) {
110  activeCount += stateTotalCounts[i];
111  }
112  for (size_t i : s_algStateNumbers) {
113  mon_stateCounts.emplace_back(s_algStateNames[i].data(), stateTotalCounts[i]);
114  mon_stateCountsOverThreads.emplace_back(s_algStateNames[i].data()+"_Over_Threads"s, divAsDouble(stateTotalCounts[i], numThreads));
115  mon_stateCountsOverSlots.emplace_back(s_algStateNames[i].data()+"_Over_Slots"s, divAsDouble(stateTotalCounts[i], numSlots));
116  double toActive = (activeCount > 0) ? divAsDouble(stateTotalCounts[i], activeCount) : 0;
117  mon_stateCountsOverActive.emplace_back(s_algStateNames[i].data()+"_Over_Active"s, toActive);
118  }
119 
120  // Monitor number of free slots
121  auto mon_freeSlots = Monitored::Scalar("FreeSlots", m_scheduler->freeSlots());
122  auto mon_freeSlotsFrac = Monitored::Scalar("FreeSlotsFraction", divAsDouble(m_scheduler->freeSlots(), numSlots));
123 
124  // Reserve vector of references with size equal to the number of variables added into the vector in the loop below
125  std::vector<std::reference_wrapper<Monitored::IMonitoredVariable>> allMonVars;
126  allMonVars.reserve(6 +
127  mon_stateCounts.size() +
128  mon_stateCountsOverThreads.size() +
129  mon_stateCountsOverSlots.size() +
130  mon_stateCountsOverActive.size());
131  // Fill monitoring histograms once for each sampling period passed since the last fill
132  // If multiple sampling periods passed, it means the scheduler state didn't change during that time
133  for (size_t snapNumber=lastSnapCounter+1; snapNumber<=thisSnapCounter; ++snapNumber) {
134  auto mon_snapNumber = Monitored::Scalar("SnapNumber", snapNumber);
135  auto mon_wallTimeSec = Monitored::Scalar("WallTimeSeconds", snapNumber*m_monIntervalMillisec.value()*1e-3);
136  allMonVars.clear();
137  allMonVars.insert(allMonVars.end(), mon_stateCounts.begin(), mon_stateCounts.end());
138  allMonVars.insert(allMonVars.end(), mon_stateCountsOverThreads.begin(), mon_stateCountsOverThreads.end());
139  allMonVars.insert(allMonVars.end(), mon_stateCountsOverSlots.begin(), mon_stateCountsOverSlots.end());
140  allMonVars.insert(allMonVars.end(), mon_stateCountsOverActive.begin(), mon_stateCountsOverActive.end());
141  allMonVars.insert(allMonVars.end(), {mon_stateNumber, mon_stateTotalCounts, mon_freeSlots, mon_freeSlotsFrac,
142  mon_snapNumber, mon_wallTimeSec});
143  Monitored::Group(m_monTool, allMonVars);
144  }
145  monTime.stop();
146  Monitored::Group(m_monTool, monTime);
147  };
148 
149  // Start monitoring
151  m_scheduler->recordOccupancy(m_monIntervalMillisec.value(), std::move(monCallback));
152 
153  ATH_MSG_INFO("Scheduler monitoring started");
154 
155  return StatusCode::SUCCESS;
156 }
157 
158 // =============================================================================
160  if (bool expected = true; not m_running.compare_exchange_strong(expected, false)) {
161  ATH_MSG_WARNING("stopMonitoring called but it was not running");
162  return StatusCode::SUCCESS;
163  }
164  m_scheduler->recordOccupancy(-1, {});
165  ATH_MSG_INFO("Scheduler monitoring stopped");
166  return StatusCode::SUCCESS;
167 }
SchedulerMonSvc::m_scheduler
SmartIF< IScheduler > m_scheduler
Definition: SchedulerMonSvc.h:54
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
TrigDefs::Group
Group
Properties of a chain group.
Definition: GroupProperties.h:13
python.SystemOfUnits.s
int s
Definition: SystemOfUnits.py:131
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
SchedulerMonSvc::SchedulerMonSvc
SchedulerMonSvc(const std::string &name, ISvcLocator *svcLoc)
Definition: SchedulerMonSvc.cxx:43
python.Constants.ERROR
int ERROR
Definition: Control/AthenaCommon/python/Constants.py:18
SchedulerMonSvc::stopMonitoring
virtual StatusCode stopMonitoring() override
Stop querying and monitoring Scheduler status.
Definition: SchedulerMonSvc.cxx:159
Monitored::Collection
ValuesCollection< T > Collection(std::string name, const T &collection)
Declare a monitored (double-convertible) collection.
Definition: MonitoredCollection.h:38
python.handimod.now
now
Definition: handimod.py:675
Amg::toString
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
Definition: GeoPrimitivesToStringConverter.h:40
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:92
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
SchedulerMonSvc::m_schedulerName
Gaudi::Property< std::string > m_schedulerName
Definition: SchedulerMonSvc.h:46
TestMuonSF::getProperty
T getProperty(const asg::IAsgTool *interface_tool, const std::string &prop_name)
Definition: MuonSFTestHelper.cxx:17
SchedulerMonSvc.h
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
SchedulerMonSvc::m_monTool
ToolHandle< GenericMonitoringTool > m_monTool
Definition: SchedulerMonSvc.h:50
SchedulerMonSvc::m_lastSnapCounter
std::atomic_size_t m_lastSnapCounter
Definition: SchedulerMonSvc.h:57
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
plotBeamSpotMon.b
b
Definition: plotBeamSpotMon.py:77
SchedulerMonSvc::m_startTime
ClockType::time_point m_startTime
Definition: SchedulerMonSvc.h:56
DiTauMassTools::MaxHistStrategyV2::e
e
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:26
a
TList * a
Definition: liststreamerinfos.cxx:10
python.BackTrackingConfig.numThreads
int numThreads
Definition: BackTrackingConfig.py:61
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
LArCellConditions.sv
bool sv
Definition: LArCellConditions.py:45
SchedulerMonSvc::initialize
virtual StatusCode initialize() override
Definition: SchedulerMonSvc.cxx:47
SchedulerMonSvc::m_running
std::atomic_bool m_running
Definition: SchedulerMonSvc.h:55
Monitored::Scalar
Declare a monitored scalar variable.
Definition: MonitoredScalar.h:34
SchedulerMonSvc::m_monIntervalMillisec
Gaudi::Property< unsigned int > m_monIntervalMillisec
Definition: SchedulerMonSvc.h:48
Monitored::Timer
A monitored timer.
Definition: MonitoredTimer.h:32
SchedulerMonSvc::startMonitoring
virtual StatusCode startMonitoring() override
Start querying and monitoring Scheduler status.
Definition: SchedulerMonSvc.cxx:53