Loading [MathJax]/jax/output/SVG/config.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
WebdaqHistSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "WebdaqHistSvc.h"
6 
7 #include "GaudiKernel/ISvcLocator.h"
8 #include "GaudiKernel/IIncidentSvc.h"
10 
13 
14 #include "hltinterface/IInfoRegister.h"
15 #include "webdaq/webdaq-root.hpp"
16 #include "webdaq/webdaq.hpp"
17 
18 #include "TError.h"
19 #include "TGraph.h"
20 #include "TH1.h"
21 #include "TH2.h"
22 #include "TH3.h"
23 #include "TObject.h"
24 #include "TROOT.h"
25 #include "TTree.h"
26 #include <TBufferJSON.h>
27 
28 #include <boost/date_time/posix_time/posix_time_types.hpp>
29 #include <boost/date_time/posix_time/posix_time.hpp>
30 #include <boost/date_time/gregorian/gregorian_types.hpp>
31 
32 #include <cstdlib>
33 
34 WebdaqHistSvc::WebdaqHistSvc(const std::string& name, ISvcLocator* svc) : base_class(name, svc)
35 {}
36 
37 /**************************************************************************************/
38 
40 {
41  // Protect against multiple instances of TROOT
42  if (0 == gROOT) {
43  static TROOT root("root", "ROOT I/O");
44  }
45  else {
46  ATH_MSG_VERBOSE("ROOT already initialized, debug = " << gDebug);
47  }
48 
49  gErrorIgnoreLevel = kBreak; // ignore warnings see TError.h in ROOT base src
50 
51  // compile regexes
52  m_excludeTypeRegex = boost::regex(m_excludeType.value());
53  m_includeTypeRegex = boost::regex(m_includeType.value());
54  m_excludeNameRegex = boost::regex(m_excludeName.value());
55  m_includeNameRegex = boost::regex(m_includeName.value());
56  m_PublicationIncludeNameRegex = boost::regex(m_PublicationIncludeName.value());
57  m_fastPublicationIncludeNameRegex = boost::regex(m_fastPublicationIncludeName.value());
58 
59  // Retrieve and set OH mutex
60  ATH_MSG_INFO("Enabling use of OH histogram mutex");
61  static std::mutex mutex;
63  ATH_CHECK( m_jobOptionsSvc.retrieve() );
64 
65  //Retireve enviroment variables
66  const char* tdaq_partition_cstr = std::getenv("TDAQ_PARTITION");
67  if (tdaq_partition_cstr != nullptr) {
68  m_partition = std::string(tdaq_partition_cstr);
69  ATH_MSG_INFO("Partition: " << m_partition);
70  } else {
71  ATH_MSG_ERROR("TDAQ_PARTITION environment variable not set");
72  return StatusCode::FAILURE;
73  }
74  const char* tdaqWebdaqBase_cstr = std::getenv("TDAQ_WEBDAQ_BASE");
75  if (tdaqWebdaqBase_cstr != nullptr) {
76  m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77  ATH_MSG_INFO("TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
78  } else {
79  ATH_MSG_ERROR("TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80  return StatusCode::FAILURE;
81  }
82  const char* tdaq_oh_server = std::getenv("TDAQ_OH_SERVER");
83  if (tdaq_oh_server != nullptr) {
84  m_tdaqOHServerName = std::string(tdaq_oh_server);
85  } else {
86  m_tdaqOHServerName = m_OHServerName.value();
87  }
88  ATH_MSG_INFO("TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
89  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
90  ATH_CHECK( incSvc.retrieve() );
91  incSvc->addListener(this, AthenaInterprocess::UpdateAfterFork::type());
92  return StatusCode::SUCCESS;
93 }
94 
95 /**************************************************************************************/
96 
97 void WebdaqHistSvc::handle(const Incident& incident)
98 {
99  if (incident.type() == AthenaInterprocess::UpdateAfterFork::type()) {
100  ATH_MSG_INFO("Going to initialize the monitoring Thread");
103  }
104 }
105 
106 /**************************************************************************************/
107 
109 {
111  ATH_MSG_DEBUG("Stopping monitoring task");
112  m_stopFlag = true;
113  // Wait for the task to finish
114  if (m_thread.joinable()) {
115  ATH_MSG_DEBUG("Going to join the monitoring thread");
116  try {
117  m_thread.join();
118  }
119  catch (const std::exception& e) {
120  ATH_MSG_ERROR("Failed to join the monitoring thread: " << e.what());
121  return StatusCode::FAILURE;
122  }
123  }
124  if (m_threadFast.joinable()) {
125  ATH_MSG_DEBUG("Going to join the fast monitoring thread");
126  try {
127  m_threadFast.join();
128  }
129  catch (const std::exception& e) {
130  ATH_MSG_ERROR("Failed to join the fast monitoring thread: " << e.what());
131  return StatusCode::FAILURE;
132  }
133  }
134  ATH_MSG_DEBUG("Clearing list of histograms");
135  m_hists.clear();
136  m_histoMapUpdated = true;
137  m_histoMapUpdatedFast = true;
138  return StatusCode::SUCCESS;
139 }
140 
141 /**************************************************************************************/
142 
144 {
145  ATH_MSG_INFO("finalize");
146  // Reset OH mutex
148 
149  return StatusCode::SUCCESS;
150 }
151 
152 /**************************************************************************************/
153 
154 template <typename T>
155 StatusCode WebdaqHistSvc::regHist_i(std::unique_ptr<T> hist_unique, const std::string& id,
156  bool shared, THistID*& phid)
157 {
158  ATH_MSG_DEBUG("Registering histogram " << id);
159 
160  phid = nullptr;
161  if (not isObjectAllowed(id, hist_unique.get())) {
162  return StatusCode::FAILURE;
163  }
164 
165  if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
166 
168  if (m_hists.find(accessor, id)) {
169  ATH_MSG_ERROR("Histogram with name " << id << " already registered");
170  return StatusCode::FAILURE;
171  }
172  // Element not found, attempt to insert
173  if (!m_hists.insert(accessor, id)) {
174  ATH_MSG_ERROR("Failed to insert histogram with name " << id);
175  return StatusCode::FAILURE;
176  }
177  T* hist = hist_unique.release();
178  m_histoMapUpdated = true;
179  m_histoMapUpdatedFast = true;
180  accessor->second = THistID(id, hist);
181  //finished
182  if (shared) accessor->second.mutex = new std::mutex;
183  phid = &accessor->second;
184  ATH_MSG_DEBUG((shared ? "Shared histogram " : "Histogram ")
185  << hist->GetName() << " registered under " << id << " " << name());
186  } else {
187  ATH_MSG_ERROR("Cannot register " << hist_unique->ClassName()
188  << " because it does not inherit from TH1");
189  return StatusCode::FAILURE;
190  }
191  return StatusCode::SUCCESS;
192 }
193 
194 /**************************************************************************************/
195 
196 template <typename T>
197 LockedHandle<T> WebdaqHistSvc::regShared_i(const std::string& id, std::unique_ptr<T> hist)
198 {
199  LockedHandle<T> lh(nullptr, nullptr);
200 
202  // Check if the histogram is already registered
203  if (!m_hists.find(accessor, id)) {
204  // No histogram under that id yet
205  T* phist = hist.get();
206  THistID* phid = nullptr;
207  if (regHist_i(std::move(hist), id, true, phid).isSuccess()) {
208  if (phid) lh.set(phist, phid->mutex);
209  }
210  }
211  else
212  {
213  // Histogram already registered under that id
214  if (accessor->second.mutex == nullptr) {
215  ATH_MSG_ERROR("regShared: previously registered histogram \"" << id
216  << "\" was not marked shared");
217  }
218  T* phist = dynamic_cast<T*>(accessor->second.obj);
219  if (phist == nullptr) {
220  ATH_MSG_ERROR("regShared: unable to dcast retrieved shared hist \""
221  << id << "\" of type " << accessor->second.obj->IsA()->GetName()
222  << " to requested type " << System::typeinfoName(typeid(T)));
223  }
224  else {
225  lh.set(phist, accessor->second.mutex);
226  //hist is automatically deleted at end of method
227  }
228  }
229  return lh;
230 }
231 
232 
233 /**************************************************************************************/
234 
235 template <typename T>
236 T* WebdaqHistSvc::getHist_i(const std::string& id, const size_t& /*ind*/, bool quiet) const
237 {
238  ATH_MSG_DEBUG("Getting histogram " << id);
239 
240  tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
241  if (!m_hists.find(accessor, id) or accessor.empty()) {
242  if (!quiet) ATH_MSG_ERROR("could not locate Hist with id \"" << id << "\"");
243  return nullptr;
244  }
245 
246  T* phist = dynamic_cast<T*>(accessor->second.obj);
247  if (phist == nullptr) {
248  ATH_MSG_ERROR("getHist: unable to dcast retrieved shared hist \""
249  << id << "\" of type " << accessor->second.obj->IsA()->GetName() << " to requested type "
250  << System::typeinfoName(typeid(T)));
251  return nullptr;
252  }
253  return phist;
254 }
255 
256 /**************************************************************************************/
257 
258 StatusCode WebdaqHistSvc::getTHists_i(const std::string& dir, TList& tl) const
259 {
260  for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
261  const std::string& id = it->first;
262  const THistID& h = it->second;
263  if (id.starts_with(dir)) { // histogram booking path starts from the dir
264  tl.Add(h.obj);
265  }
266  }
267  return StatusCode::SUCCESS;
268 }
269 
270 /**************************************************************************************/
271 
272 template <typename T>
273 LockedHandle<T> WebdaqHistSvc::getShared_i(const std::string& id) const
274 {
275  tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
276  if (m_hists.find(accessor, id)) {
277  if (accessor->second.mutex == nullptr) {
278  ATH_MSG_ERROR("getShared: found Hist with id \"" << id
279  << "\", but it's not marked as shared");
280  return {};
281  }
282  T* phist = dynamic_cast<T*>(accessor->second.obj);
283  if (phist == nullptr) {
284  ATH_MSG_ERROR("getShared: unable to dcast retrieved shared hist \""
285  << id << "\" of type " << accessor->second.obj->IsA()->GetName()
286  << " to requested type " << System::typeinfoName(typeid(T)));
287  return {};
288  }
289  return LockedHandle<T>(phist, accessor->second.mutex);
290  }
291  ATH_MSG_ERROR("getShared: cannot find histogram with id \"" << id << "\"");
292  return {};
293 }
294 
295 /**************************************************************************************/
296 
298 {
299  // Find the relevant histogram and deregister it
300  for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
301  if (it->second.obj == optr) {
302  ATH_MSG_DEBUG("Found histogram " << optr << " booked under " << it->first
303  << " and will deregister it");
304  return deReg(it->first);
305  }
306  }
307  ATH_MSG_ERROR("Histogram with pointer " << optr << " not found in the histogram map");
308  return StatusCode::FAILURE;
309 }
310 
311 /**************************************************************************************/
312 
313 StatusCode WebdaqHistSvc::deReg(const std::string& id)
314 {
316  if (m_hists.find(accessor, id)) {
317  //Delete the histogram
318  accessor->second.obj->Delete();
319  m_hists.erase(accessor);
320  m_histoMapUpdated = true;
321  m_histoMapUpdatedFast = true;
322  ATH_MSG_DEBUG("Deregistration of " << id << " done");
323  return StatusCode::SUCCESS;
324  }
325  ATH_MSG_ERROR("Deregistration failed: histogram with id \"" << id << "\" not found");
326  return StatusCode::FAILURE;
327 }
328 
329 /**************************************************************************************/
330 
331 std::vector<std::string> WebdaqHistSvc::getHists() const
332 {
333  std::vector<std::string> l;
334  l.reserve(m_hists.size());
335  for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
336  l.push_back(it->first);
337  }
338  return l;
339 }
340 
341 /**************************************************************************************/
342 
343 std::set<std::string> WebdaqHistSvc::getSet(boost::regex nameSelect) const
344 {
345  std::vector<std::string> l;
346  l.reserve(m_hists.size());
347  for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
348  if (boost::regex_match(it->first, nameSelect)) {
349  l.push_back(it->first);
350  }
351  }
352  ATH_MSG_DEBUG("Number of histograms matched: " << l.size());
353  std::set<std::string> HistoSet(l.begin(), l.end());
354  return HistoSet;
355 }
356 
357 /**************************************************************************************/
358 
359 bool WebdaqHistSvc::isObjectAllowed(const std::string& path, const TObject* o) const
360 {
361  boost::cmatch what;
362 
363  if (not boost::regex_match(o->ClassName(), what, m_includeTypeRegex)) {
364  ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName()
365  << " does NOT match IncludeType \"" << m_includeType << "\"");
366  return false;
367  }
368 
369  if (boost::regex_match(o->ClassName(), what, m_excludeTypeRegex)) {
370  ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName() << " matches ExcludeType \""
371  << m_excludeType << "\"");
372  return false;
373  }
374 
375  if (not boost::regex_match(path.c_str(), what, m_includeNameRegex)) {
376  ATH_MSG_WARNING("Object " << path << " does NOT match IncludeName \"" << m_includeName << "\"");
377  return false;
378  }
379 
380  if (boost::regex_match(path.c_str(), what, m_excludeNameRegex)) {
381  ATH_MSG_WARNING("Object " << path << " matches ExcludeName \"" << m_excludeName << "\"");
382  return false;
383  }
384 
385  return true;
386 }
387 
388 bool WebdaqHistSvc::existsHist(const std::string& name) const
389 {
390  return (getHist_i<TH1>(name, 0, true) != nullptr);
391 }
392 
393 /**************************************************************************************
394  * Typed interface methods
395  * All these are just forwarding to the templated xyz_i methods
396  **************************************************************************************/
397 StatusCode WebdaqHistSvc::regHist(const std::string& id)
398 {
399  std::unique_ptr<TH1> hist = nullptr;
400  THistID* hid = nullptr;
401  return regHist_i(std::move(hist), id, false, hid);
402 }
403 
404 StatusCode WebdaqHistSvc::regHist(const std::string& id, std::unique_ptr<TH1> hist)
405 {
406  THistID* hid = nullptr;
407  return regHist_i(std::move(hist), id, false, hid);
408 }
409 
410 StatusCode WebdaqHistSvc::regHist(const std::string& id, TH1* hist_ptr)
411 {
412  THistID* hid = nullptr;
413  std::unique_ptr<TH1> hist(hist_ptr);
414  return regHist_i(std::move(hist), id, false, hid);
415 }
416 
417 /**************************************************************************************/
418 
419 StatusCode WebdaqHistSvc::getHist(const std::string& id, TH1*& hist, size_t ind) const
420 {
421  hist = getHist_i<TH1>(id, ind);
422  return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
423 }
424 
425 StatusCode WebdaqHistSvc::getHist(const std::string& id, TH2*& hist, size_t ind) const
426 {
427  hist = getHist_i<TH2>(id, ind);
428  return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
429 }
430 
431 StatusCode WebdaqHistSvc::getHist(const std::string& id, TH3*& hist, size_t ind) const
432 {
433  hist = getHist_i<TH3>(id, ind);
434  return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
435 }
436 
437 /**************************************************************************************/
438 
439 StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse) const
440 {
441  if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
442  return getTHists_i(std::string(td->GetPath()), tl);
443 }
444 
445 StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse) const
446 {
447  if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
448  return getTHists_i(dir, tl);
449 }
450 
451 StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse, bool reg)
452 {
453  if (recurse || reg)
454  ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
455  "supported in this implementation");
456  return getTHists_i(std::string(td->GetPath()), tl);
457 }
458 
459 StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse, bool reg)
460 {
461  if (recurse || reg)
462  ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
463  "supported in this implementation");
464  return getTHists_i(dir, tl);
465 }
466 
467 /**************************************************************************************/
468 
469 StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH1> hist,
470  LockedHandle<TH1>& lh)
471 {
472  lh = regShared_i<TH1>(id, std::move(hist));
473  return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
474 }
475 
476 StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH2> hist,
477  LockedHandle<TH2>& lh)
478 {
479  lh = regShared_i<TH2>(id, std::move(hist));
480  return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
481 }
482 
483 StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH3> hist,
484  LockedHandle<TH3>& lh)
485 {
486  lh = regShared_i<TH3>(id, std::move(hist));
487  return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
488 }
489 
490 /**************************************************************************************/
491 
492 StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH1>& lh) const
493 {
494  lh = getShared_i<TH1>(id);
495  return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
496 }
497 
498 StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH2>& lh) const
499 {
500  lh = getShared_i<TH2>(id);
501  return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
502 }
503 
504 StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH3>& lh) const
505 {
506  lh = getShared_i<TH3>(id);
507  return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
508 }
509 
510 /**************************************************************************************/
511 
522 void WebdaqHistSvc::monitoringTask(int numSlots, int intervalSeconds, std::atomic<bool>& histoMapUpdated, boost::regex nameSelect)
523 {
524  ATH_MSG_INFO("Started monitoring task for partition: " << m_partition << "and regex: " << nameSelect.str());
525  std::string appName = m_jobOptionsSvc->get("DataFlowConfig.DF_ApplicationName");
526  // OH doesn't allow multiple providers for a given server
527  // Need to modify the path for one of the threads to avoid the issue
528  if (nameSelect != boost::regex(".*"))
529  appName = appName + "_fast";
530 
531  // Set the publication period
532  boost::posix_time::time_duration interval{boost::posix_time::seconds(intervalSeconds)};
533  ATH_MSG_DEBUG("Interval set to " << interval.total_seconds() << " seconds");
534  int interval_ms = interval.total_milliseconds();
535  if(numSlots == 0) numSlots = 1;
536  boost::posix_time::ptime epoch(boost::gregorian::date(2024,1,1));
537  // Sleep duration between slots (plus an extra slot for allowing a last sleep cycle)
538  boost::posix_time::time_duration slotSleepDuration = interval / (numSlots + 1);
539 
540  // Create the Set of the histograms keys to order the histograms publication and reset the histoMapUpdated flag
541  std::set<std::string> HistoSet = getSet(nameSelect);
542  histoMapUpdated = false;
543 
544  // Sync the publication to the period
545  syncPublish(interval_ms, epoch);
546  ATH_MSG_DEBUG("Monitoring task synched");
547 
548  // Publication loop
549  while (!m_stopFlag)
550  {
551  // Check if the histograms map has been updated, and if so update the Set
552  if (histoMapUpdated)
553  {
554  ATH_MSG_DEBUG("Histo map updated, updating the Set");
555  HistoSet = getSet(nameSelect);
556  histoMapUpdated = false;
557  }
558  size_t totalHists = HistoSet.size();
559  ATH_MSG_DEBUG("Going to publish " << totalHists << " histograms");
560 
561  // Divide the histograms in batches
562  size_t batchSize = (totalHists + numSlots - 1) / numSlots; // Ceiling division
563  ATH_MSG_DEBUG("Num of slots:" << numSlots << ", Interval_ms " << interval_ms << " milliseconds, Batch size: " << batchSize);
564 
565  boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
566  int counter = 0;
567  int BatchCounter = 0;
568  auto it = HistoSet.begin();
569  while(it != HistoSet.end())
570  {
571  boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
572  //Batch publication
573  ATH_MSG_DEBUG("Batch publication number " << BatchCounter << " starting.");
574  for(size_t j = 0; j < batchSize; ++j)
575  {
576  if(it == HistoSet.end())
577  {
578  break;
579  }
580  const std::string& id = *it;
581  std::string path = appName + '.' + id;
582  ATH_MSG_DEBUG("Publishing to " << m_partition << " Histogram " << path << " to the OH server " << m_tdaqOHServerName);
583  tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
584  if (!m_hists.find(accessor, id)) {
585  ATH_MSG_WARNING("Histogram with name " << id << " not found in histogram map (probably deregistered).");
586  it++;
587  continue;
588  }
589  else
590  {
591  ATH_MSG_DEBUG("Histogram found in map, going to lock mutex and then publish it");
592  //Here we clone the histogram to avoid locking the OH mutex during the whole publication
593  TObject* obj = nullptr;
594  {
595  //Locking the OH mutex before touching the Histogram
597  obj = accessor->second.obj->Clone();
598  }
599  if (obj == nullptr) {
600  ATH_MSG_ERROR("Failed to clone histogram " << id);
601  it++;
602  continue;
603  }
604  if (!webdaq::oh::put(m_partition, m_tdaqOHServerName, path, obj)) {
605  ATH_MSG_ERROR("Histogram publishing failed !");
606  }
607  //Delete the cloned histogram. This was creating a memory leak
608  ATH_MSG_DEBUG("Deleting cloned histogram");
609  delete obj;
610  }
611  it++;
612  counter++;
613  }
614  ATH_MSG_DEBUG("Batch publication completed, " << counter << " histograms published");
615  // Sleep for slotSleepDuration - slot publication time, unless it's the last slot
616  if (it != HistoSet.end()) {
617  boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
618  int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
619  if (slot_sleep_time > 0) {
620  ATH_MSG_DEBUG("Sleeping for " << slot_sleep_time << " seconds before publishing the next batch");
621  conditionedSleep(std::chrono::milliseconds(slot_sleep_time), m_stopFlag);
622  }
623  }
624  BatchCounter++;
625  }
626 
627  //check if we exceeded the publication period
628  boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
629  if (boost::posix_time::time_duration(end_time-start_time) > interval) {
630  ATH_MSG_WARNING("Publication deadline missed, cycle exceeded the interval.. Total publication time "
631  << boost::posix_time::to_simple_string(end_time-start_time));
632  }
633  ATH_MSG_DEBUG("Completed the publication of " << counter << " histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
634 
635  //sleep till the next cycle
636  boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
637  int nowMs = (now-epoch).total_milliseconds();
638  boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
639  ATH_MSG_DEBUG("epoch " << epoch);
640  ATH_MSG_DEBUG("interval_ms" << interval_ms);
641  ATH_MSG_DEBUG("now_ms " << nowMs);
642  ATH_MSG_DEBUG("Sleeping for " << next_cycle.total_milliseconds() << " milliseconds till the next cycle");
643  conditionedSleep(std::chrono::milliseconds(next_cycle.total_milliseconds()), m_stopFlag);
644  }
645  ATH_MSG_INFO("Monitoring task stopped");
646 }
647 
648 /**************************************************************************************/
649 
650 void WebdaqHistSvc::syncPublish(long int interval_ms, boost::posix_time::ptime epoch)
651 {
652  //Sync the publication to a multple of the interval
653  //Code taken from TDAQ monsvc https://gitlab.cern.ch/atlas-tdaq-software/monsvc/-/blob/master/src/PeriodicScheduler.cxx?ref_type=heads#L163
654  boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
655  int now_ms = (now-epoch).total_milliseconds();
656  //If now_ms % interval_ms == 0 we skip a cycle. Too bad.
657  boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
658  //Do not sync if we are below 50 ms
659  if (sync.total_milliseconds() > 50){
660  std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
661  }
662 }
663 
664 void WebdaqHistSvc::conditionedSleep(std::chrono::milliseconds duration, const std::atomic<bool>& stopFlag) {
666  while (true) {
667  if (stopFlag.load()) {
668  return;
669  }
670  auto elapsed = std::chrono::steady_clock::now() - start;
671  if (elapsed >= duration) {
672  break;
673  }
674  auto remaining = duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
675  std::this_thread::sleep_for(std::min(std::chrono::milliseconds(500), remaining));
676  }
677 }
678 
WebdaqHistSvc::getHist_i
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
Definition: WebdaqHistSvc.cxx:236
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
OHLockedHist.h
OH histogram lock header file.
DiTauMassTools::TauTypes::lh
@ lh
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:53
WebdaqHistSvc::m_includeType
Gaudi::Property< std::string > m_includeType
Definition: WebdaqHistSvc.h:185
athena.path
path
python interpreter configuration --------------------------------------—
Definition: athena.py:128
WebdaqHistSvc::handle
virtual void handle(const Incident &incident) override
Definition: WebdaqHistSvc.cxx:97
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
WebdaqHistSvc.h
quiet
bool quiet
Definition: TrigGlobEffCorrValidation.cxx:190
BeamSpot::mutex
std::mutex mutex
Definition: InDetBeamSpotVertex.cxx:18
covarianceToolsLibrary.gErrorIgnoreLevel
gErrorIgnoreLevel
Definition: covarianceToolsLibrary.py:21
WebdaqHistSvc::getSet
std::set< std::string > getSet(boost::regex) const
Definition: WebdaqHistSvc.cxx:343
WebdaqHistSvc::regHist
virtual StatusCode regHist(const std::string &name) override
Definition: WebdaqHistSvc.cxx:397
WebdaqHistSvc::getShared
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
Definition: WebdaqHistSvc.cxx:492
plotmaker.hist
hist
Definition: plotmaker.py:148
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
initialize
void initialize()
Definition: run_EoverP.cxx:894
WebdaqHistSvc::finalize
virtual StatusCode finalize() override
Definition: WebdaqHistSvc.cxx:143
WebdaqHistSvc::existsHist
virtual bool existsHist(const std::string &name) const override
Definition: WebdaqHistSvc.cxx:388
WebdaqHistSvc::stop
virtual StatusCode stop() override
Definition: WebdaqHistSvc.cxx:108
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:14
WebdaqHistSvc::m_intervalSeconds
Gaudi::Property< int > m_intervalSeconds
Definition: WebdaqHistSvc.h:193
skel.it
it
Definition: skel.GENtoEVGEN.py:407
WebdaqHistSvc::m_jobOptionsSvc
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
Definition: WebdaqHistSvc.h:166
WebdaqHistSvc::m_includeName
Gaudi::Property< std::string > m_includeName
Definition: WebdaqHistSvc.h:187
UploadAMITag.l
list l
Definition: UploadAMITag.larcaf.py:158
sendEI_SPB.root
root
Definition: sendEI_SPB.py:34
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
WebdaqHistSvc::m_stopFlag
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
Definition: WebdaqHistSvc.h:144
dq_make_web_display.recurse
def recurse(rdir, dqregion, ignorepath, reffile=None)
Definition: dq_make_web_display.py:23
WebdaqHistSvc::THistID
Helper struct that bundles the histogram, name and mutex.
Definition: WebdaqHistSvc.h:134
WebdaqHistSvc::m_thread
std::thread m_thread
Publication thread.
Definition: WebdaqHistSvc.h:152
oh_scoped_lock_histogram
Scoped lock to be used for threaded histogram operations.
Definition: OHLockedHist.h:108
PrepareReferenceFile.regex
regex
Definition: PrepareReferenceFile.py:43
WebdaqHistSvc::m_tdaqOHServerName
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise)
Definition: WebdaqHistSvc.h:159
WebdaqHistSvc::THistID::mutex
std::mutex * mutex
Definition: WebdaqHistSvc.h:140
WebdaqHistSvc::getShared_i
LockedHandle< T > getShared_i(const std::string &id) const
Definition: WebdaqHistSvc.cxx:273
WebdaqHistSvc::m_includeTypeRegex
boost::regex m_includeTypeRegex
Definition: WebdaqHistSvc.h:209
python.handimod.now
now
Definition: handimod.py:675
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
WebdaqHistSvc::getTHists_i
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
Definition: WebdaqHistSvc.cxx:258
Athena::typeinfoName
std::string typeinfoName(const std::type_info &ti)
Convert a type_info to a demangled string.
Definition: AthenaKernel/src/ClassName.cxx:23
WebdaqHistSvc::m_fastPublicationIncludeNameRegex
boost::regex m_fastPublicationIncludeNameRegex
Definition: WebdaqHistSvc.h:213
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
Incidents.h
getLatestRuns.interval
interval
Definition: getLatestRuns.py:24
calibdata.exception
exception
Definition: calibdata.py:496
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
oh_lock_histogram_mutex::reset_histogram_mutex
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
Definition: OHLockedHist.h:39
WebdaqHistSvc::getHist
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
Definition: WebdaqHistSvc.cxx:419
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
WebdaqHistSvc::m_threadFast
std::thread m_threadFast
Definition: WebdaqHistSvc.h:153
python.LArCalib_HVCorrConfig.seconds
seconds
Definition: LArCalib_HVCorrConfig.py:99
WebdaqHistSvc::regShared_i
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
Definition: WebdaqHistSvc.cxx:197
PixelAthHitMonAlgCfg.duration
duration
Definition: PixelAthHitMonAlgCfg.py:152
WebdaqHistSvc::regHist_i
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
Definition: WebdaqHistSvc.cxx:155
beamspotman.dir
string dir
Definition: beamspotman.py:623
WebdaqHistSvc::monitoringTask
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
Definition: WebdaqHistSvc.cxx:522
WebdaqHistSvc::m_PublicationIncludeNameRegex
boost::regex m_PublicationIncludeNameRegex
Definition: WebdaqHistSvc.h:212
WebdaqHistSvc::m_excludeName
Gaudi::Property< std::string > m_excludeName
Definition: WebdaqHistSvc.h:186
python.ExitCodes.what
def what(code)
Definition: ExitCodes.py:73
WebdaqHistSvc::m_excludeNameRegex
boost::regex m_excludeNameRegex
Definition: WebdaqHistSvc.h:210
id
SG::auxid_t id
Definition: Control/AthContainers/Root/debug.cxx:239
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
fillHVMap_fromASCII.date
string date
Definition: fillHVMap_fromASCII.py:8
WebdaqHistSvc::m_excludeTypeRegex
boost::regex m_excludeTypeRegex
Definition: WebdaqHistSvc.h:208
WebdaqHistSvc::getHists
virtual std::vector< std::string > getHists() const override
Definition: WebdaqHistSvc.cxx:331
AthenaInterprocess::UpdateAfterFork::type
static const std::string & type()
Incident type.
Definition: Incidents.h:49
WebdaqHistSvc::deReg
virtual StatusCode deReg(TObject *obj) override
Definition: WebdaqHistSvc.cxx:297
xAOD::JetAttributeAccessor::accessor
const AccessorWrapper< T > * accessor(xAOD::JetAttribute::AttributeID id)
Returns an attribute accessor corresponding to an AttributeID.
Definition: JetAccessorMap.h:26
WebdaqHistSvc::m_histoMapUpdated
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
Definition: WebdaqHistSvc.h:161
WebdaqHistSvc::regShared
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
Definition: WebdaqHistSvc.cxx:469
WebdaqHistSvc::m_hists
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
Definition: WebdaqHistSvc.h:169
WebdaqHistSvc::getTHists
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
Definition: WebdaqHistSvc.cxx:439
WebdaqHistSvc::m_excludeType
Gaudi::Property< std::string > m_excludeType
Definition: WebdaqHistSvc.h:184
ATLAS_NOT_THREAD_SAFE
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Definition: WebdaqHistSvc.cxx:39
SCT_ConditionsAlgorithms::CoveritySafe::getenv
std::string getenv(const std::string &variableName)
get an environment variable
Definition: SCT_ConditionsUtilities.cxx:17
h
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
ref
const boost::regex ref(r_ef)
WebdaqHistSvc::m_histoMapUpdatedFast
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication
Definition: WebdaqHistSvc.h:163
WebdaqHistSvc::m_intervalSecondsFast
Gaudi::Property< int > m_intervalSecondsFast
Definition: WebdaqHistSvc.h:194
WebdaqHistSvc::syncPublish
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
Definition: WebdaqHistSvc.cxx:650
WebdaqHistSvc::m_includeNameRegex
boost::regex m_includeNameRegex
Definition: WebdaqHistSvc.h:211
dqt_zlumi_alleff_HIST.tl
tl
Definition: dqt_zlumi_alleff_HIST.py:73
WebdaqHistSvc::WebdaqHistSvc
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
Definition: WebdaqHistSvc.cxx:34
oh_lock_histogram_mutex::set_histogram_mutex
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Definition: OHLockedHist.h:36
test_pyathena.counter
counter
Definition: test_pyathena.py:15
WebdaqHistSvc::m_numSlots
Gaudi::Property< int > m_numSlots
Definition: WebdaqHistSvc.h:191
checker_macros.h
Define macros for attributes used to control the static checker.
python.PyAthena.obj
obj
Definition: PyAthena.py:132
WebdaqHistSvc::conditionedSleep
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
Definition: WebdaqHistSvc.cxx:664
WebdaqHistSvc::m_numSlotsFast
Gaudi::Property< int > m_numSlotsFast
Definition: WebdaqHistSvc.h:192
WebdaqHistSvc::m_partition
std::string m_partition
The partition to publish to.
Definition: WebdaqHistSvc.h:155
TSU::T
unsigned long long T
Definition: L1TopoDataTypes.h:35
WebdaqHistSvc::isObjectAllowed
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules ?
Definition: WebdaqHistSvc.cxx:359
ServiceHandle< IIncidentSvc >