ATLAS Offline Software
Public Types | Public Member Functions | Private Attributes | List of all members
SchedulerMonSvc Class Reference

Service monitoring the Scheduler status and producing relevant online histograms. More...

#include <SchedulerMonSvc.h>

Inheritance diagram for SchedulerMonSvc:
Collaboration diagram for SchedulerMonSvc:

Public Types

using ClockType = decltype(IScheduler::OccupancySnapshot::time)::clock
 

Public Member Functions

 SchedulerMonSvc (const std::string &name, ISvcLocator *svcLoc)
 
virtual ~SchedulerMonSvc () override=default
 
virtual StatusCode initialize () override
 
virtual StatusCode startMonitoring () override
 Start querying and monitoring Scheduler status. More...
 
virtual StatusCode stopMonitoring () override
 Stop querying and monitoring Scheduler status. More...
 

Private Attributes

Gaudi::Property< std::string > m_schedulerName
 
Gaudi::Property< unsigned int > m_monIntervalMillisec
 
ToolHandle< GenericMonitoringTool > m_monTool
 
SmartIF< IScheduler > m_scheduler {nullptr}
 
std::atomic_bool m_running {false}
 
ClockType::time_point m_startTime {}
 
std::atomic_size_t m_lastSnapCounter {0}
 

Detailed Description

Service monitoring the Scheduler status and producing relevant online histograms.

Definition at line 26 of file SchedulerMonSvc.h.

Member Typedef Documentation

◆ ClockType

using SchedulerMonSvc::ClockType = decltype(IScheduler::OccupancySnapshot::time)::clock

Definition at line 30 of file SchedulerMonSvc.h.

Constructor & Destructor Documentation

◆ SchedulerMonSvc()

SchedulerMonSvc::SchedulerMonSvc ( const std::string &  name,
ISvcLocator *  svcLoc 
)

Definition at line 43 of file SchedulerMonSvc.cxx.

44 : base_class(name, svcLoc) {}

◆ ~SchedulerMonSvc()

virtual SchedulerMonSvc::~SchedulerMonSvc ( )
override virtual default

Member Function Documentation

◆ initialize()

StatusCode SchedulerMonSvc::initialize ( )
override virtual

Definition at line 47 of file SchedulerMonSvc.cxx.

47  {
48  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
49  return StatusCode::SUCCESS;
50 }

◆ startMonitoring()

StatusCode SchedulerMonSvc::startMonitoring ( )
overridevirtual

Start querying and monitoring Scheduler status.

Definition at line 53 of file SchedulerMonSvc.cxx.

53  {
54  // Get a handle to the scheduler
55  if (!m_scheduler.isValid()) {
56  m_scheduler = serviceLocator()->service<IScheduler>(m_schedulerName, false);
57  if (!m_scheduler.isValid()) {
58  ATH_MSG_ERROR("Failed to retrieve the Scheduler service with name " << m_schedulerName);
59  return StatusCode::FAILURE;
60  }
61  }
62 
63  // Get the number of threads and slots
64  const int numThreads = std::stoi( SmartIF<IProperty>(m_scheduler)->getProperty("ThreadPoolSize").toString() );
65  const int numSlots = std::stoi( serviceLocator()->service("EventDataSvc").as<IProperty>()->getProperty("NSlots").toString() );
66 
67  // Flag the monitoring as running (prevents going past this point twice)
68  if (bool expected = false; not m_running.compare_exchange_strong(expected, true)) {
69  ATH_MSG_ERROR("startMonitoring called but it is already running");
70  return StatusCode::FAILURE;
71  }
72 
73  // Construct the callback and pass it to the scheduler monitoring API
74  // Note: capture by value as this lambda is called from outside the scope of this method
75  auto monCallback = [this, numThreads, numSlots](IScheduler::OccupancySnapshot snap) -> void {
76  auto monTime = Monitored::Timer("TIME_monCallback");
77  // Calculate and update snap counters
78  const ClockType::duration wallTime = snap.time - m_startTime;
79  const size_t thisSnapCounter = std::chrono::duration_cast<std::chrono::milliseconds>(wallTime).count() / m_monIntervalMillisec.value();
80  const size_t lastSnapCounter = m_lastSnapCounter.exchange(thisSnapCounter);
81  const int periodsSinceLastSnap = thisSnapCounter - lastSnapCounter;
82 
83  // If new snap comes before next sampling point, discard it
84  if (periodsSinceLastSnap <= 0) {
85  ATH_MSG_DEBUG("Discarding snap because periodsSinceLastSnap=" << periodsSinceLastSnap << " is not positive");
86  return;
87  }
88 
89  // Monitor total state counts across all slots
90  std::vector<int> stateTotalCounts(static_cast<size_t>(AlgState::MAXVALUE), 0);
91  for (size_t slot=0; slot < snap.states.size(); ++slot) {
92  for (size_t state=0; state < snap.states[slot].size(); ++state) {
93  stateTotalCounts[state] += snap.states[slot][state];
94  }
95  }
96  auto mon_stateNumber = Monitored::Collection("AlgStates", s_algStateNumbers);
97  auto mon_stateTotalCounts = Monitored::Collection("StateTotalCounts", stateTotalCounts);
98 
99  // Monitor alg state counts absolute numbers and ratios to N threads and N active states
100  std::vector<Monitored::Scalar<int>> mon_stateCounts;
101  std::vector<Monitored::Scalar<double>> mon_stateCountsOverThreads;
102  std::vector<Monitored::Scalar<double>> mon_stateCountsOverSlots;
103  std::vector<Monitored::Scalar<double>> mon_stateCountsOverActive;
104  mon_stateCounts.reserve(static_cast<size_t>(AlgState::MAXVALUE));
105  mon_stateCountsOverThreads.reserve(static_cast<size_t>(AlgState::MAXVALUE));
106  mon_stateCountsOverSlots.reserve(static_cast<size_t>(AlgState::MAXVALUE));
107  mon_stateCountsOverActive.reserve(static_cast<size_t>(AlgState::MAXVALUE));
108  int activeCount = 0;
109  for (size_t i : s_activeAlgStateNumbers) {
110  activeCount += stateTotalCounts[i];
111  }
112  for (size_t i : s_algStateNumbers) {
113  mon_stateCounts.emplace_back(s_algStateNames[i].data(), stateTotalCounts[i]);
114  mon_stateCountsOverThreads.emplace_back(s_algStateNames[i].data()+"_Over_Threads"s, divAsDouble(stateTotalCounts[i], numThreads));
115  mon_stateCountsOverSlots.emplace_back(s_algStateNames[i].data()+"_Over_Slots"s, divAsDouble(stateTotalCounts[i], numSlots));
116  double toActive = (activeCount > 0) ? divAsDouble(stateTotalCounts[i], activeCount) : 0;
117  mon_stateCountsOverActive.emplace_back(s_algStateNames[i].data()+"_Over_Active"s, toActive);
118  }
119 
120  // Monitor number of free slots
121  auto mon_freeSlots = Monitored::Scalar("FreeSlots", m_scheduler->freeSlots());
122  auto mon_freeSlotsFrac = Monitored::Scalar("FreeSlotsFraction", divAsDouble(m_scheduler->freeSlots(), numSlots));
123 
124  // Reserve vector of references with size equal to the number of variables added into the vector in the loop below
125  std::vector<std::reference_wrapper<Monitored::IMonitoredVariable>> allMonVars;
126  allMonVars.reserve(6 +
127  mon_stateCounts.size() +
128  mon_stateCountsOverThreads.size() +
129  mon_stateCountsOverSlots.size() +
130  mon_stateCountsOverActive.size());
131  // Fill monitoring histograms once for each sampling period passed since the last fill
132  // If multiple sampling periods passed, it means the scheduler state didn't change during that time
133  for (size_t snapNumber=lastSnapCounter+1; snapNumber<=thisSnapCounter; ++snapNumber) {
134  auto mon_snapNumber = Monitored::Scalar("SnapNumber", snapNumber);
135  auto mon_wallTimeSec = Monitored::Scalar("WallTimeSeconds", snapNumber*m_monIntervalMillisec.value()*1e-3);
136  allMonVars.clear();
137  allMonVars.insert(allMonVars.end(), mon_stateCounts.begin(), mon_stateCounts.end());
138  allMonVars.insert(allMonVars.end(), mon_stateCountsOverThreads.begin(), mon_stateCountsOverThreads.end());
139  allMonVars.insert(allMonVars.end(), mon_stateCountsOverSlots.begin(), mon_stateCountsOverSlots.end());
140  allMonVars.insert(allMonVars.end(), mon_stateCountsOverActive.begin(), mon_stateCountsOverActive.end());
141  allMonVars.insert(allMonVars.end(), {mon_stateNumber, mon_stateTotalCounts, mon_freeSlots, mon_freeSlotsFrac,
142  mon_snapNumber, mon_wallTimeSec});
143  Monitored::Group(m_monTool, allMonVars);
144  }
145  monTime.stop();
146  Monitored::Group(m_monTool, monTime);
147  };
148 
149  // Start monitoring
150  m_startTime = ClockType::now();
151  m_scheduler->recordOccupancy(m_monIntervalMillisec.value(), std::move(monCallback));
152 
153  ATH_MSG_INFO("Scheduler monitoring started");
154 
155  return StatusCode::SUCCESS;
156 }

◆ stopMonitoring()

StatusCode SchedulerMonSvc::stopMonitoring ( )
override virtual

Stop querying and monitoring Scheduler status.

Definition at line 159 of file SchedulerMonSvc.cxx.

159  {
160  if (bool expected = true; not m_running.compare_exchange_strong(expected, false)) {
161  ATH_MSG_WARNING("stopMonitoring called but it was not running");
162  return StatusCode::SUCCESS;
163  }
164  m_scheduler->recordOccupancy(-1, {});
165  ATH_MSG_INFO("Scheduler monitoring stopped");
166  return StatusCode::SUCCESS;
167 }

Member Data Documentation

◆ m_lastSnapCounter

std::atomic_size_t SchedulerMonSvc::m_lastSnapCounter {0}
private

Definition at line 57 of file SchedulerMonSvc.h.

◆ m_monIntervalMillisec

Gaudi::Property<unsigned int> SchedulerMonSvc::m_monIntervalMillisec
private
Initial value:
{
this, "MonIntervalMillisec", 100, "Monitoring snapshot interval in milliseconds"}

Definition at line 48 of file SchedulerMonSvc.h.

◆ m_monTool

ToolHandle<GenericMonitoringTool> SchedulerMonSvc::m_monTool
private
Initial value:
{
this, "MonTool", "", "Monitoring tool"}

Definition at line 50 of file SchedulerMonSvc.h.

◆ m_running

std::atomic_bool SchedulerMonSvc::m_running {false}
private

Definition at line 55 of file SchedulerMonSvc.h.

◆ m_scheduler

SmartIF<IScheduler> SchedulerMonSvc::m_scheduler {nullptr}
private

Definition at line 54 of file SchedulerMonSvc.h.

◆ m_schedulerName

Gaudi::Property<std::string> SchedulerMonSvc::m_schedulerName
private
Initial value:
{
this, "SchedulerName", "AvalancheSchedulerSvc", "Name of the scheduler"}

Definition at line 46 of file SchedulerMonSvc.h.

◆ m_startTime

ClockType::time_point SchedulerMonSvc::m_startTime {}
private

Definition at line 56 of file SchedulerMonSvc.h.


The documentation for this class was generated from the following files:
SchedulerMonSvc.h
SchedulerMonSvc.cxx
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
SchedulerMonSvc::m_scheduler
SmartIF< IScheduler > m_scheduler
Definition: SchedulerMonSvc.h:54
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
TrigDefs::Group
Group
Properties of a chain group.
Definition: GroupProperties.h:13
python.SystemOfUnits.s
int s
Definition: SystemOfUnits.py:131
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
Monitored::Collection
ValuesCollection< T > Collection(std::string name, const T &collection)
Declare a monitored (double-convertible) collection.
Definition: MonitoredCollection.h:38
python.handimod.now
now
Definition: handimod.py:675
Amg::toString
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
Definition: GeoPrimitivesToStringConverter.h:40
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
lumiFormat.i
int i
Definition: lumiFormat.py:85
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
SchedulerMonSvc::m_schedulerName
Gaudi::Property< std::string > m_schedulerName
Definition: SchedulerMonSvc.h:46
TestMuonSF::getProperty
T getProperty(const asg::IAsgTool *interface_tool, const std::string &prop_name)
Definition: MuonSFTestHelper.cxx:17
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
SchedulerMonSvc::m_monTool
ToolHandle< GenericMonitoringTool > m_monTool
Definition: SchedulerMonSvc.h:50
SchedulerMonSvc::m_lastSnapCounter
std::atomic_size_t m_lastSnapCounter
Definition: SchedulerMonSvc.h:57
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
SchedulerMonSvc::m_startTime
ClockType::time_point m_startTime
Definition: SchedulerMonSvc.h:56
python.BackTrackingConfig.numThreads
int numThreads
Definition: BackTrackingConfig.py:61
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
SchedulerMonSvc::m_running
std::atomic_bool m_running
Definition: SchedulerMonSvc.h:55
Monitored::Scalar
Declare a monitored scalar variable.
Definition: MonitoredScalar.h:34
SchedulerMonSvc::m_monIntervalMillisec
Gaudi::Property< unsigned int > m_monIntervalMillisec
Definition: SchedulerMonSvc.h:48
Monitored::Timer
A monitored timer.
Definition: MonitoredTimer.h:32