ATLAS Offline Software
Loading...
Searching...
No Matches
WebdaqHistSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "WebdaqHistSvc.h"
6
7#include "GaudiKernel/ISvcLocator.h"
8#include "GaudiKernel/IIncidentSvc.h"
10
13
14#include "hltinterface/IInfoRegister.h"
15#include "webdaq/webdaq-root.hpp"
16#include "webdaq/webdaq.hpp"
17
18#include "TError.h"
19#include "TGraph.h"
20#include "TH1.h"
21#include "TH2.h"
22#include "TH3.h"
23#include "TObject.h"
24#include "TROOT.h"
25#include "TTree.h"
26#include <TBufferJSON.h>
27
28#include <boost/date_time/posix_time/posix_time_types.hpp>
29#include <boost/date_time/posix_time/posix_time.hpp>
30#include <boost/date_time/gregorian/gregorian_types.hpp>
31
32#include <cstdlib>
33
34WebdaqHistSvc::WebdaqHistSvc(const std::string& name, ISvcLocator* svc) : base_class(name, svc)
35{}
36
37/**************************************************************************************/
38
39StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
40{
41 // Protect against multiple instances of TROOT
42 if (0 == gROOT) {
43 static TROOT root("root", "ROOT I/O");
44 }
45 else {
46 ATH_MSG_VERBOSE("ROOT already initialized, debug = " << gDebug);
47 }
48
49 gErrorIgnoreLevel = kBreak; // ignore warnings see TError.h in ROOT base src
50
51 // compile regexes
52 m_excludeTypeRegex = boost::regex(m_excludeType.value());
53 m_includeTypeRegex = boost::regex(m_includeType.value());
54 m_excludeNameRegex = boost::regex(m_excludeName.value());
55 m_includeNameRegex = boost::regex(m_includeName.value());
56 m_PublicationIncludeNameRegex = boost::regex(m_PublicationIncludeName.value());
57 m_fastPublicationIncludeNameRegex = boost::regex(m_fastPublicationIncludeName.value());
58
59 // Retrieve and set OH mutex
60 ATH_MSG_INFO("Enabling use of OH histogram mutex");
61 static std::mutex mutex;
63 ATH_CHECK( m_jobOptionsSvc.retrieve() );
64
65 //Retireve enviroment variables
66 const char* tdaq_partition_cstr = std::getenv("TDAQ_PARTITION");
67 if (tdaq_partition_cstr != nullptr) {
68 m_partition = std::string(tdaq_partition_cstr);
69 ATH_MSG_INFO("Partition: " << m_partition);
70 } else {
71 ATH_MSG_ERROR("TDAQ_PARTITION environment variable not set");
72 return StatusCode::FAILURE;
73 }
74 const char* tdaqWebdaqBase_cstr = std::getenv("TDAQ_WEBDAQ_BASE");
75 if (tdaqWebdaqBase_cstr != nullptr) {
76 m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77 ATH_MSG_INFO("TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
78 } else {
79 ATH_MSG_ERROR("TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80 return StatusCode::FAILURE;
81 }
82 const char* tdaq_oh_server = std::getenv("TDAQ_OH_SERVER");
83 if (tdaq_oh_server != nullptr) {
84 m_tdaqOHServerName = std::string(tdaq_oh_server);
85 } else {
86 m_tdaqOHServerName = m_OHServerName.value();
87 }
88 ATH_MSG_INFO("TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
89 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
90 ATH_CHECK( incSvc.retrieve() );
91 incSvc->addListener(this, AthenaInterprocess::UpdateAfterFork::type());
92 return StatusCode::SUCCESS;
93}
94
95/**************************************************************************************/
96
97void WebdaqHistSvc::handle(const Incident& incident)
98{
99 if (incident.type() == AthenaInterprocess::UpdateAfterFork::type()) {
100 ATH_MSG_INFO("Going to initialize the monitoring Thread");
103 }
104}
105
106/**************************************************************************************/
107
109{
111 ATH_MSG_DEBUG("Stopping monitoring task");
112 m_stopFlag = true;
113 // Wait for the task to finish
114 if (m_thread.joinable()) {
115 ATH_MSG_DEBUG("Going to join the monitoring thread");
116 try {
117 m_thread.join();
118 }
119 catch (const std::exception& e) {
120 ATH_MSG_ERROR("Failed to join the monitoring thread: " << e.what());
121 return StatusCode::FAILURE;
122 }
123 }
124 if (m_threadFast.joinable()) {
125 ATH_MSG_DEBUG("Going to join the fast monitoring thread");
126 try {
127 m_threadFast.join();
128 }
129 catch (const std::exception& e) {
130 ATH_MSG_ERROR("Failed to join the fast monitoring thread: " << e.what());
131 return StatusCode::FAILURE;
132 }
133 }
134 ATH_MSG_DEBUG("Clearing list of histograms");
135 m_hists.clear();
136 m_histoMapUpdated = true;
138 return StatusCode::SUCCESS;
139}
140
141/**************************************************************************************/
142
144{
145 ATH_MSG_INFO("finalize");
146 // Reset OH mutex
148
149 return StatusCode::SUCCESS;
150}
151
152/**************************************************************************************/
153
154template <typename T>
155StatusCode WebdaqHistSvc::regHist_i(std::unique_ptr<T> hist_unique, const std::string& id,
156 bool shared, THistID*& phid)
157{
158 ATH_MSG_DEBUG("Registering histogram " << id);
159
160 phid = nullptr;
161 if (not isObjectAllowed(id, hist_unique.get())) {
162 return StatusCode::FAILURE;
163 }
164
165 if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
166
167 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
168 if (m_hists.find(accessor, id)) {
169 ATH_MSG_ERROR("Histogram with name " << id << " already registered");
170 return StatusCode::FAILURE;
171 }
172 // Element not found, attempt to insert
173 if (!m_hists.insert(accessor, id)) {
174 ATH_MSG_ERROR("Failed to insert histogram with name " << id);
175 return StatusCode::FAILURE;
176 }
177 T* hist = hist_unique.release();
178 m_histoMapUpdated = true;
180 accessor->second = THistID(id, hist);
181 //finished
182 if (shared) accessor->second.mutex = new std::mutex;
183 phid = &accessor->second;
184 ATH_MSG_DEBUG((shared ? "Shared histogram " : "Histogram ")
185 << hist->GetName() << " registered under " << id << " " << name());
186 } else {
187 ATH_MSG_ERROR("Cannot register " << hist_unique->ClassName()
188 << " because it does not inherit from TH1");
189 return StatusCode::FAILURE;
190 }
191 return StatusCode::SUCCESS;
192}
193
194/**************************************************************************************/
195
196template <typename T>
197LockedHandle<T> WebdaqHistSvc::regShared_i(const std::string& id, std::unique_ptr<T> hist)
198{
199 LockedHandle<T> lh(nullptr, nullptr);
200
201 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
202 // Check if the histogram is already registered
203 if (!m_hists.find(accessor, id)) {
204 // No histogram under that id yet
205 T* phist = hist.get();
206 THistID* phid = nullptr;
207 if (regHist_i(std::move(hist), id, true, phid).isSuccess()) {
208 if (phid) lh.set(phist, phid->mutex);
209 }
210 }
211 else
212 {
213 // Histogram already registered under that id
214 if (accessor->second.mutex == nullptr) {
215 ATH_MSG_ERROR("regShared: previously registered histogram \"" << id
216 << "\" was not marked shared");
217 }
218 T* phist = dynamic_cast<T*>(accessor->second.obj);
219 if (phist == nullptr) {
220 ATH_MSG_ERROR("regShared: unable to dcast retrieved shared hist \""
221 << id << "\" of type " << accessor->second.obj->IsA()->GetName()
222 << " to requested type " << System::typeinfoName(typeid(T)));
223 }
224 else {
225 lh.set(phist, accessor->second.mutex);
226 //hist is automatically deleted at end of method
227 }
228 }
229 return lh;
230}
231
232
233/**************************************************************************************/
234
235template <typename T>
236T* WebdaqHistSvc::getHist_i(const std::string& id, const size_t& /*ind*/, bool quiet) const
237{
238 ATH_MSG_DEBUG("Getting histogram " << id);
239
240 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
241 if (!m_hists.find(accessor, id) or accessor.empty()) {
242 if (!quiet) ATH_MSG_ERROR("could not locate Hist with id \"" << id << "\"");
243 return nullptr;
244 }
245
246 T* phist = dynamic_cast<T*>(accessor->second.obj);
247 if (phist == nullptr) {
248 ATH_MSG_ERROR("getHist: unable to dcast retrieved shared hist \""
249 << id << "\" of type " << accessor->second.obj->IsA()->GetName() << " to requested type "
250 << System::typeinfoName(typeid(T)));
251 return nullptr;
252 }
253 return phist;
254}
255
256/**************************************************************************************/
257
258StatusCode WebdaqHistSvc::getTHists_i(const std::string& dir, TList& tl) const
259{
260 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
261 const std::string& id = it->first;
262 const THistID& h = it->second;
263 if (id.starts_with(dir)) { // histogram booking path starts from the dir
264 tl.Add(h.obj);
265 }
266 }
267 return StatusCode::SUCCESS;
268}
269
270/**************************************************************************************/
271
272template <typename T>
273LockedHandle<T> WebdaqHistSvc::getShared_i(const std::string& id) const
274{
275 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
276 if (m_hists.find(accessor, id)) {
277 if (accessor->second.mutex == nullptr) {
278 ATH_MSG_ERROR("getShared: found Hist with id \"" << id
279 << "\", but it's not marked as shared");
280 return {};
281 }
282 T* phist = dynamic_cast<T*>(accessor->second.obj);
283 if (phist == nullptr) {
284 ATH_MSG_ERROR("getShared: unable to dcast retrieved shared hist \""
285 << id << "\" of type " << accessor->second.obj->IsA()->GetName()
286 << " to requested type " << System::typeinfoName(typeid(T)));
287 return {};
288 }
289 return LockedHandle<T>(phist, accessor->second.mutex);
290 }
291 ATH_MSG_ERROR("getShared: cannot find histogram with id \"" << id << "\"");
292 return {};
293}
294
295/**************************************************************************************/
296
297StatusCode WebdaqHistSvc::deReg(TObject* optr)
298{
299 // Find the relevant histogram and deregister it
300 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
301 if (it->second.obj == optr) {
302 ATH_MSG_DEBUG("Found histogram " << optr << " booked under " << it->first
303 << " and will deregister it");
304 return deReg(it->first);
305 }
306 }
307 ATH_MSG_ERROR("Histogram with pointer " << optr << " not found in the histogram map");
308 return StatusCode::FAILURE;
309}
310
311/**************************************************************************************/
312
313StatusCode WebdaqHistSvc::deReg(const std::string& id)
314{
315 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
316 if (m_hists.find(accessor, id)) {
317 //Delete the histogram
318 accessor->second.obj->Delete();
319 m_hists.erase(accessor);
320 m_histoMapUpdated = true;
322 ATH_MSG_DEBUG("Deregistration of " << id << " done");
323 return StatusCode::SUCCESS;
324 }
325 ATH_MSG_ERROR("Deregistration failed: histogram with id \"" << id << "\" not found");
326 return StatusCode::FAILURE;
327}
328
329/**************************************************************************************/
330
331std::vector<std::string> WebdaqHistSvc::getHists() const
332{
333 std::vector<std::string> l;
334 l.reserve(m_hists.size());
335 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
336 l.push_back(it->first);
337 }
338 return l;
339}
340
341/**************************************************************************************/
342
343std::set<std::string> WebdaqHistSvc::getSet(boost::regex nameSelect) const
344{
345 std::vector<std::string> l;
346 l.reserve(m_hists.size());
347 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
348 if (boost::regex_match(it->first, nameSelect)) {
349 l.push_back(it->first);
350 }
351 }
352 ATH_MSG_DEBUG("Number of histograms matched: " << l.size());
353 std::set<std::string> HistoSet(l.begin(), l.end());
354 return HistoSet;
355}
356
357/**************************************************************************************/
358
359bool WebdaqHistSvc::isObjectAllowed(const std::string& path, const TObject* o) const
360{
361 boost::cmatch what;
362
363 if (not boost::regex_match(o->ClassName(), what, m_includeTypeRegex)) {
364 ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName()
365 << " does NOT match IncludeType \"" << m_includeType << "\"");
366 return false;
367 }
368
369 if (boost::regex_match(o->ClassName(), what, m_excludeTypeRegex)) {
370 ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName() << " matches ExcludeType \""
371 << m_excludeType << "\"");
372 return false;
373 }
374
375 if (not boost::regex_match(path.c_str(), what, m_includeNameRegex)) {
376 ATH_MSG_WARNING("Object " << path << " does NOT match IncludeName \"" << m_includeName << "\"");
377 return false;
378 }
379
380 if (boost::regex_match(path.c_str(), what, m_excludeNameRegex)) {
381 ATH_MSG_WARNING("Object " << path << " matches ExcludeName \"" << m_excludeName << "\"");
382 return false;
383 }
384
385 return true;
386}
387
388bool WebdaqHistSvc::existsHist(const std::string& name) const
389{
390 return (getHist_i<TH1>(name, 0, true) != nullptr);
391}
392
393/**************************************************************************************
394 * Typed interface methods
395 * All these are just forwarding to the templated xyz_i methods
396 **************************************************************************************/
397StatusCode WebdaqHistSvc::regHist(const std::string& id)
398{
399 std::unique_ptr<TH1> hist = nullptr;
400 THistID* hid = nullptr;
401 return regHist_i(std::move(hist), id, false, hid);
402}
403
404StatusCode WebdaqHistSvc::regHist(const std::string& id, std::unique_ptr<TH1> hist)
405{
406 THistID* hid = nullptr;
407 return regHist_i(std::move(hist), id, false, hid);
408}
409
410StatusCode WebdaqHistSvc::regHist(const std::string& id, TH1* hist_ptr)
411{
412 THistID* hid = nullptr;
413 std::unique_ptr<TH1> hist(hist_ptr);
414 return regHist_i(std::move(hist), id, false, hid);
415}
416
417/**************************************************************************************/
418
419StatusCode WebdaqHistSvc::getHist(const std::string& id, TH1*& hist, size_t ind) const
420{
421 hist = getHist_i<TH1>(id, ind);
422 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
423}
424
425StatusCode WebdaqHistSvc::getHist(const std::string& id, TH2*& hist, size_t ind) const
426{
427 hist = getHist_i<TH2>(id, ind);
428 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
429}
430
431StatusCode WebdaqHistSvc::getHist(const std::string& id, TH3*& hist, size_t ind) const
432{
433 hist = getHist_i<TH3>(id, ind);
434 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
435}
436
437/**************************************************************************************/
438
439StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse) const
440{
441 if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
442 return getTHists_i(std::string(td->GetPath()), tl);
443}
444
445StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse) const
446{
447 if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
448 return getTHists_i(dir, tl);
449}
450
451StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse, bool reg)
452{
453 if (recurse || reg)
454 ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
455 "supported in this implementation");
456 return getTHists_i(std::string(td->GetPath()), tl);
457}
458
459StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse, bool reg)
460{
461 if (recurse || reg)
462 ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
463 "supported in this implementation");
464 return getTHists_i(dir, tl);
465}
466
467/**************************************************************************************/
468
469StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH1> hist,
470 LockedHandle<TH1>& lh)
471{
472 lh = regShared_i<TH1>(id, std::move(hist));
473 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
474}
475
476StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH2> hist,
477 LockedHandle<TH2>& lh)
478{
479 lh = regShared_i<TH2>(id, std::move(hist));
480 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
481}
482
483StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH3> hist,
484 LockedHandle<TH3>& lh)
485{
486 lh = regShared_i<TH3>(id, std::move(hist));
487 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
488}
489
490/**************************************************************************************/
491
492StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH1>& lh) const
493{
494 lh = getShared_i<TH1>(id);
495 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
496}
497
498StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH2>& lh) const
499{
500 lh = getShared_i<TH2>(id);
501 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
502}
503
504StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH3>& lh) const
505{
506 lh = getShared_i<TH3>(id);
507 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
508}
509
510/**************************************************************************************/
511
522void WebdaqHistSvc::monitoringTask(int numSlots, int intervalSeconds, std::atomic<bool>& histoMapUpdated, boost::regex nameSelect)
523{
524 ATH_MSG_INFO("Started monitoring task for partition: " << m_partition << "and regex: " << nameSelect.str());
525 std::string appName = m_jobOptionsSvc->get("DataFlowConfig.DF_ApplicationName");
526 // OH doesn't allow multiple providers for a given server
527 // Need to modify the path for one of the threads to avoid the issue
528 if (nameSelect != boost::regex(".*"))
529 appName = appName + "_fast";
530
531 // Set the publication period
532 boost::posix_time::time_duration interval{boost::posix_time::seconds(intervalSeconds)};
533 ATH_MSG_DEBUG("Interval set to " << interval.total_seconds() << " seconds");
534 int interval_ms = interval.total_milliseconds();
535 if(numSlots == 0) numSlots = 1;
536 boost::posix_time::ptime epoch(boost::gregorian::date(2024,1,1));
537 // Sleep duration between slots (plus an extra slot for allowing a last sleep cycle)
538 boost::posix_time::time_duration slotSleepDuration = interval / (numSlots + 1);
539
540 // Create the Set of the histograms keys to order the histograms publication and reset the histoMapUpdated flag
541 std::set<std::string> HistoSet = getSet(nameSelect);
542 histoMapUpdated = false;
543
544 // Sync the publication to the period
545 syncPublish(interval_ms, epoch);
546 ATH_MSG_DEBUG("Monitoring task synched");
547
548 // Publication loop
549 while (!m_stopFlag)
550 {
551 // Check if the histograms map has been updated, and if so update the Set
552 if (histoMapUpdated)
553 {
554 ATH_MSG_DEBUG("Histo map updated, updating the Set");
555 HistoSet = getSet(nameSelect);
556 histoMapUpdated = false;
557 }
558 size_t totalHists = HistoSet.size();
559 ATH_MSG_DEBUG("Going to publish " << totalHists << " histograms");
560
561 // Divide the histograms in batches
562 size_t batchSize = (totalHists + numSlots - 1) / numSlots; // Ceiling division
563 ATH_MSG_DEBUG("Num of slots:" << numSlots << ", Interval_ms " << interval_ms << " milliseconds, Batch size: " << batchSize);
564
565 boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
566 int counter = 0;
567 int BatchCounter = 0;
568 auto it = HistoSet.begin();
569 while(it != HistoSet.end())
570 {
571 boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
572 //Batch publication
573 ATH_MSG_DEBUG("Batch publication number " << BatchCounter << " starting.");
574 for(size_t j = 0; j < batchSize; ++j)
575 {
576 if(it == HistoSet.end())
577 {
578 break;
579 }
580 const std::string& id = *it;
581 std::string path = appName + '.' + id;
582 ATH_MSG_DEBUG("Publishing to " << m_partition << " Histogram " << path << " to the OH server " << m_tdaqOHServerName);
583 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
584 if (!m_hists.find(accessor, id)) {
585 ATH_MSG_WARNING("Histogram with name " << id << " not found in histogram map (probably deregistered).");
586 it++;
587 continue;
588 }
589 else
590 {
591 ATH_MSG_DEBUG("Histogram found in map, going to lock mutex and then publish it");
592 //Here we clone the histogram to avoid locking the OH mutex during the whole publication
593 TObject* obj = nullptr;
594 {
595 //Locking the OH mutex before touching the Histogram
597 obj = accessor->second.obj->Clone();
598 }
599 if (obj == nullptr) {
600 ATH_MSG_ERROR("Failed to clone histogram " << id);
601 it++;
602 continue;
603 }
604 if (!webdaq::oh::put(m_partition, m_tdaqOHServerName, path, obj)) {
605 ATH_MSG_ERROR("Histogram publishing failed !");
606 }
607 //Delete the cloned histogram. This was creating a memory leak
608 ATH_MSG_DEBUG("Deleting cloned histogram");
609 delete obj;
610 }
611 it++;
612 counter++;
613 }
614 ATH_MSG_DEBUG("Batch publication completed, " << counter << " histograms published");
615 // Sleep for slotSleepDuration - slot publication time, unless it's the last slot
616 if (it != HistoSet.end()) {
617 boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
618 int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
619 if (slot_sleep_time > 0) {
620 ATH_MSG_DEBUG("Sleeping for " << slot_sleep_time << " seconds before publishing the next batch");
621 conditionedSleep(std::chrono::milliseconds(slot_sleep_time), m_stopFlag);
622 }
623 }
624 BatchCounter++;
625 }
626
627 //check if we exceeded the publication period
628 boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
629 if (boost::posix_time::time_duration(end_time-start_time) > interval) {
630 ATH_MSG_WARNING("Publication deadline missed, cycle exceeded the interval.. Total publication time "
631 << boost::posix_time::to_simple_string(end_time-start_time));
632 }
633 ATH_MSG_DEBUG("Completed the publication of " << counter << " histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
634
635 //sleep till the next cycle
636 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
637 int nowMs = (now-epoch).total_milliseconds();
638 boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
639 ATH_MSG_DEBUG("epoch " << epoch);
640 ATH_MSG_DEBUG("interval_ms" << interval_ms);
641 ATH_MSG_DEBUG("now_ms " << nowMs);
642 ATH_MSG_DEBUG("Sleeping for " << next_cycle.total_milliseconds() << " milliseconds till the next cycle");
643 conditionedSleep(std::chrono::milliseconds(next_cycle.total_milliseconds()), m_stopFlag);
644 }
645 ATH_MSG_INFO("Monitoring task stopped");
646}
647
648/**************************************************************************************/
649
650void WebdaqHistSvc::syncPublish(long int interval_ms, boost::posix_time::ptime epoch)
651{
652 //Sync the publication to a multple of the interval
653 //Code taken from TDAQ monsvc https://gitlab.cern.ch/atlas-tdaq-software/monsvc/-/blob/master/src/PeriodicScheduler.cxx?ref_type=heads#L163
654 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
655 int now_ms = (now-epoch).total_milliseconds();
656 //If now_ms % interval_ms == 0 we skip a cycle. Too bad.
657 boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
658 //Do not sync if we are below 50 ms
659 if (sync.total_milliseconds() > 50){
660 std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
661 }
662}
663
664void WebdaqHistSvc::conditionedSleep(std::chrono::milliseconds duration, const std::atomic<bool>& stopFlag) {
665 auto start = std::chrono::steady_clock::now();
666 while (true) {
667 if (stopFlag.load()) {
668 return;
669 }
670 auto elapsed = std::chrono::steady_clock::now() - start;
671 if (elapsed >= duration) {
672 break;
673 }
674 auto remaining = duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
675 std::this_thread::sleep_for(std::min(std::chrono::milliseconds(500), remaining));
676 }
677}
678
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
OH histogram lock header file.
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Define macros for attributes used to control the static checker.
Header file for AthHistogramAlgorithm.
static const std::string & type()
Incident type.
Definition Incidents.h:49
Gaudi::Property< std::string > m_includeType
boost::regex m_fastPublicationIncludeNameRegex
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
virtual StatusCode stop() override
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
Gaudi::Property< std::string > m_excludeType
Gaudi::Property< std::string > m_excludeName
std::thread m_threadFast
boost::regex m_PublicationIncludeNameRegex
std::set< std::string > getSet(boost::regex) const
std::thread m_thread
Publication thread.
boost::regex m_includeTypeRegex
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
virtual std::vector< std::string > getHists() const override
virtual void handle(const Incident &incident) override
boost::regex m_excludeNameRegex
LockedHandle< T > getShared_i(const std::string &id) const
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
virtual bool existsHist(const std::string &name) const override
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication.
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
Gaudi::Property< int > m_numSlots
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise)
virtual StatusCode finalize() override
boost::regex m_includeNameRegex
boost::regex m_excludeTypeRegex
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
virtual StatusCode regHist(const std::string &name) override
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
std::string m_partition
The partition to publish to.
Gaudi::Property< int > m_intervalSecondsFast
Gaudi::Property< int > m_intervalSeconds
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules ?
virtual StatusCode deReg(TObject *obj) override
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
Gaudi::Property< int > m_numSlotsFast
Gaudi::Property< std::string > m_includeName
STL class.
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Scoped lock to be used for threaded histogram operations.
Helper struct that bundles the histogram, name and mutex.