ATLAS Offline Software
Loading...
Searching...
No Matches
WebdaqHistSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "WebdaqHistSvc.h"
6
7#include "GaudiKernel/ISvcLocator.h"
8#include "GaudiKernel/IIncidentSvc.h"
10
13
14#include "hltinterface/IInfoRegister.h"
15#include "webdaq/webdaq-root.hpp"
16#include "webdaq/webdaq.hpp"
17
18#include "TError.h"
19#include "TGraph.h"
20#include "TH1.h"
21#include "TH2.h"
22#include "TH3.h"
23#include "TObject.h"
24#include "TROOT.h"
25#include "TTree.h"
26#include <TBufferJSON.h>
27
28#include <boost/date_time/posix_time/posix_time_types.hpp>
29#include <boost/date_time/posix_time/posix_time.hpp>
30#include <boost/date_time/gregorian/gregorian_types.hpp>
31
32#include <cstdlib>
33
34WebdaqHistSvc::WebdaqHistSvc(const std::string& name, ISvcLocator* svc) : base_class(name, svc)
35{}
36
37/**************************************************************************************/
38
39StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
40{
41 // Protect against multiple instances of TROOT
42 if (0 == gROOT) {
43 static TROOT root("root", "ROOT I/O");
44 }
45 else {
46 ATH_MSG_VERBOSE("ROOT already initialized, debug = " << gDebug);
47 }
48
49 gErrorIgnoreLevel = kBreak; // ignore warnings see TError.h in ROOT base src
50
51 // compile regexes
52 m_excludeTypeRegex = boost::regex(m_excludeType.value());
53 m_includeTypeRegex = boost::regex(m_includeType.value());
54 m_excludeNameRegex = boost::regex(m_excludeName.value());
55 m_includeNameRegex = boost::regex(m_includeName.value());
56 m_PublicationIncludeNameRegex = boost::regex(m_PublicationIncludeName.value());
57 m_fastPublicationIncludeNameRegex = boost::regex(m_fastPublicationIncludeName.value());
58
59 // Retrieve and set OH mutex
60 ATH_MSG_INFO("Enabling use of OH histogram mutex");
61 static std::mutex mutex;
63 ATH_CHECK( m_jobOptionsSvc.retrieve() );
64
65 //Retireve enviroment variables
66 const char* tdaq_partition_cstr = std::getenv("TDAQ_PARTITION");
67 if (tdaq_partition_cstr != nullptr) {
68 m_partition = std::string(tdaq_partition_cstr);
69 ATH_MSG_INFO("Partition: " << m_partition);
70 } else {
71 ATH_MSG_ERROR("TDAQ_PARTITION environment variable not set");
72 return StatusCode::FAILURE;
73 }
74 const char* tdaqWebdaqBase_cstr = std::getenv("TDAQ_WEBDAQ_BASE");
75 if (tdaqWebdaqBase_cstr != nullptr) {
76 m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77 ATH_MSG_INFO("TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
78 } else {
79 ATH_MSG_ERROR("TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80 return StatusCode::FAILURE;
81 }
82 const char* tdaq_oh_server = std::getenv("TDAQ_OH_SERVER");
83 if (tdaq_oh_server != nullptr) {
84 m_tdaqOHServerName = std::string(tdaq_oh_server);
85 } else {
86 m_tdaqOHServerName = m_OHServerName.value();
87 }
88 ATH_MSG_INFO("TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
89 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
90 ATH_CHECK( incSvc.retrieve() );
91 incSvc->addListener(this, AthenaInterprocess::UpdateAfterFork::type());
92 return StatusCode::SUCCESS;
93}
94
95/**************************************************************************************/
96
97void WebdaqHistSvc::handle(const Incident& incident)
98{
99 if (incident.type() == AthenaInterprocess::UpdateAfterFork::type()) {
100 ATH_MSG_INFO("Going to initialize the monitoring Thread");
103 }
104}
105
106/**************************************************************************************/
107
109{
111 ATH_MSG_DEBUG("Stopping monitoring task");
112 m_stopFlag = true;
113 // Wait for the task to finish
114 if (m_thread.joinable()) {
115 ATH_MSG_DEBUG("Going to join the monitoring thread");
116 try {
117 m_thread.join();
118 }
119 catch (const std::exception& e) {
120 ATH_MSG_ERROR("Failed to join the monitoring thread: " << e.what());
121 return StatusCode::FAILURE;
122 }
123 }
124 if (m_threadFast.joinable()) {
125 ATH_MSG_DEBUG("Going to join the fast monitoring thread");
126 try {
127 m_threadFast.join();
128 }
129 catch (const std::exception& e) {
130 ATH_MSG_ERROR("Failed to join the fast monitoring thread: " << e.what());
131 return StatusCode::FAILURE;
132 }
133 }
134 ATH_MSG_DEBUG("Clearing list of histograms");
135 m_hists.clear();
136 m_histoMapUpdated = true;
138 return StatusCode::SUCCESS;
139}
140
141/**************************************************************************************/
142
144{
145 ATH_MSG_INFO("finalize");
146 // Reset OH mutex
148
149 return StatusCode::SUCCESS;
150}
151
152/**************************************************************************************/
153
154template <typename T>
155StatusCode WebdaqHistSvc::regHist_i(std::unique_ptr<T> hist_unique, const std::string& id,
156 bool shared, THistID*& phid)
157{
158 ATH_MSG_DEBUG("Registering histogram " << id);
159
160 phid = nullptr;
161 if (not isObjectAllowed(id, hist_unique.get())) {
162 return StatusCode::FAILURE;
163 }
164
165 if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
166
167 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
168 if (m_hists.find(accessor, id)) {
169 ATH_MSG_ERROR("Histogram with name " << id << " already registered");
170 return StatusCode::FAILURE;
171 }
172 // Element not found, attempt to insert
173 if (!m_hists.insert(accessor, id)) {
174 ATH_MSG_ERROR("Failed to insert histogram with name " << id);
175 return StatusCode::FAILURE;
176 }
177 T* hist = hist_unique.release();
178 m_histoMapUpdated = true;
180 accessor->second = THistID(id, hist);
181 //finished
182 if (shared) accessor->second.mutex = std::make_unique<std::mutex>();
183 phid = &accessor->second;
184 ATH_MSG_DEBUG((shared ? "Shared histogram " : "Histogram ")
185 << hist->GetName() << " registered under " << id << " " << name());
186 } else {
187 ATH_MSG_ERROR("Cannot register " << hist_unique->ClassName()
188 << " because it does not inherit from TH1");
189 return StatusCode::FAILURE;
190 }
191 return StatusCode::SUCCESS;
192}
193
194/**************************************************************************************/
195
196template <typename T>
197LockedHandle<T> WebdaqHistSvc::regShared_i(const std::string& id, std::unique_ptr<T> hist)
198{
199 LockedHandle<T> lh(nullptr, nullptr);
200
201 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
202 // Check if the histogram is already registered
203 if (!m_hists.find(accessor, id)) {
204 // No histogram under that id yet
205 T* phist = hist.get();
206 THistID* phid = nullptr;
207 if (regHist_i(std::move(hist), id, true, phid).isSuccess()) {
208 if (phid) lh.set(phist, phid->mutex.get());
209 }
210 }
211 else
212 {
213 // Histogram already registered under that id
214 if (accessor->second.mutex == nullptr) {
215 ATH_MSG_ERROR("regShared: previously registered histogram \"" << id
216 << "\" was not marked shared");
217 }
218 T* phist = dynamic_cast<T*>(accessor->second.obj);
219 if (phist == nullptr) {
220 ATH_MSG_ERROR("regShared: unable to dcast retrieved shared hist \""
221 << id << "\" of type " << accessor->second.obj->IsA()->GetName()
222 << " to requested type " << System::typeinfoName(typeid(T)));
223 }
224 else {
225 lh.set(phist, accessor->second.mutex.get());
226 //hist is automatically deleted at end of method
227 }
228 }
229 return lh;
230}
231
232
233/**************************************************************************************/
234
235template <typename T>
236T* WebdaqHistSvc::getHist_i(const std::string& id, const size_t& /*ind*/, bool quiet) const
237{
238 ATH_MSG_DEBUG("Getting histogram " << id);
239
240 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
241 if (!m_hists.find(accessor, id) or accessor.empty()) {
242 if (!quiet) ATH_MSG_ERROR("could not locate Hist with id \"" << id << "\"");
243 return nullptr;
244 }
245
246 T* phist = dynamic_cast<T*>(accessor->second.obj);
247 if (phist == nullptr) {
248 ATH_MSG_ERROR("getHist: unable to dcast retrieved shared hist \""
249 << id << "\" of type " << accessor->second.obj->IsA()->GetName() << " to requested type "
250 << System::typeinfoName(typeid(T)));
251 return nullptr;
252 }
253 return phist;
254}
255
256/**************************************************************************************/
257
258StatusCode WebdaqHistSvc::getTHists_i(const std::string& dir, TList& tl) const
259{
260 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
261 const std::string& id = it->first;
262 const THistID& h = it->second;
263 if (id.starts_with(dir)) { // histogram booking path starts from the dir
264 tl.Add(h.obj);
265 }
266 }
267 return StatusCode::SUCCESS;
268}
269
270/**************************************************************************************/
271
272template <typename T>
273LockedHandle<T> WebdaqHistSvc::getShared_i(const std::string& id) const
274{
275 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
276 if (m_hists.find(accessor, id)) {
277 auto * obj = accessor->second.obj;
278 if (accessor->second.mutex == nullptr) {
279 ATH_MSG_ERROR("getShared: found Hist with id \"" << id
280 << "\", but it's not marked as shared");
281 return {};
282 }
283 T* phist = dynamic_cast<T*>(obj);
284 if (phist == nullptr) {
285 const char* gotType = obj ? obj->IsA()->GetName() : "<null>";
286 ATH_MSG_ERROR("getShared: unable to dcast retrieved shared hist \""
287 << id << "\" of type " << gotType
288 << " to requested type " << System::typeinfoName(typeid(T)));
289 return {};
290 }
291 return LockedHandle<T>(phist, accessor->second.mutex.get());
292 }
293 ATH_MSG_ERROR("getShared: cannot find histogram with id \"" << id << "\"");
294 return {};
295}
296
297/**************************************************************************************/
298
299StatusCode WebdaqHistSvc::deReg(TObject* optr)
300{
301 // Find the relevant histogram and deregister it
302 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
303 if (it->second.obj == optr) {
304 ATH_MSG_DEBUG("Found histogram " << optr << " booked under " << it->first
305 << " and will deregister it");
306 return deReg(it->first);
307 }
308 }
309 ATH_MSG_ERROR("Histogram with pointer " << optr << " not found in the histogram map");
310 return StatusCode::FAILURE;
311}
312
313/**************************************************************************************/
314
315StatusCode WebdaqHistSvc::deReg(const std::string& id)
316{
317 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
318 if (m_hists.find(accessor, id)) {
319 //Delete the histogram
320 accessor->second.obj->Delete();
321 m_hists.erase(accessor);
322 m_histoMapUpdated = true;
324 ATH_MSG_DEBUG("Deregistration of " << id << " done");
325 return StatusCode::SUCCESS;
326 }
327 ATH_MSG_ERROR("Deregistration failed: histogram with id \"" << id << "\" not found");
328 return StatusCode::FAILURE;
329}
330
331/**************************************************************************************/
332
333std::vector<std::string> WebdaqHistSvc::getHists() const
334{
335 std::vector<std::string> l;
336 l.reserve(m_hists.size());
337 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
338 l.push_back(it->first);
339 }
340 return l;
341}
342
343/**************************************************************************************/
344
345std::set<std::string> WebdaqHistSvc::getSet(boost::regex nameSelect) const
346{
347 std::vector<std::string> l;
348 l.reserve(m_hists.size());
349 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
350 if (boost::regex_match(it->first, nameSelect)) {
351 l.push_back(it->first);
352 }
353 }
354 ATH_MSG_DEBUG("Number of histograms matched: " << l.size());
355 std::set<std::string> HistoSet(l.begin(), l.end());
356 return HistoSet;
357}
358
359/**************************************************************************************/
360
361bool WebdaqHistSvc::isObjectAllowed(const std::string& path, const TObject* o) const
362{
363 boost::cmatch what;
364
365 if (not boost::regex_match(o->ClassName(), what, m_includeTypeRegex)) {
366 ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName()
367 << " does NOT match IncludeType \"" << m_includeType << "\"");
368 return false;
369 }
370
371 if (boost::regex_match(o->ClassName(), what, m_excludeTypeRegex)) {
372 ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName() << " matches ExcludeType \""
373 << m_excludeType << "\"");
374 return false;
375 }
376
377 if (not boost::regex_match(path.c_str(), what, m_includeNameRegex)) {
378 ATH_MSG_WARNING("Object " << path << " does NOT match IncludeName \"" << m_includeName << "\"");
379 return false;
380 }
381
382 if (boost::regex_match(path.c_str(), what, m_excludeNameRegex)) {
383 ATH_MSG_WARNING("Object " << path << " matches ExcludeName \"" << m_excludeName << "\"");
384 return false;
385 }
386
387 return true;
388}
389
390bool WebdaqHistSvc::existsHist(const std::string& name) const
391{
392 return (getHist_i<TH1>(name, 0, true) != nullptr);
393}
394
395/**************************************************************************************
396 * Typed interface methods
397 * All these are just forwarding to the templated xyz_i methods
398 **************************************************************************************/
399StatusCode WebdaqHistSvc::regHist(const std::string& id)
400{
401 std::unique_ptr<TH1> hist = nullptr;
402 THistID* hid = nullptr;
403 return regHist_i(std::move(hist), id, false, hid);
404}
405
406StatusCode WebdaqHistSvc::regHist(const std::string& id, std::unique_ptr<TH1> hist)
407{
408 THistID* hid = nullptr;
409 return regHist_i(std::move(hist), id, false, hid);
410}
411
412StatusCode WebdaqHistSvc::regHist(const std::string& id, TH1* hist_ptr)
413{
414 THistID* hid = nullptr;
415 std::unique_ptr<TH1> hist(hist_ptr);
416 return regHist_i(std::move(hist), id, false, hid);
417}
418
419/**************************************************************************************/
420
421StatusCode WebdaqHistSvc::getHist(const std::string& id, TH1*& hist, size_t ind) const
422{
423 hist = getHist_i<TH1>(id, ind);
424 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
425}
426
427StatusCode WebdaqHistSvc::getHist(const std::string& id, TH2*& hist, size_t ind) const
428{
429 hist = getHist_i<TH2>(id, ind);
430 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
431}
432
433StatusCode WebdaqHistSvc::getHist(const std::string& id, TH3*& hist, size_t ind) const
434{
435 hist = getHist_i<TH3>(id, ind);
436 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
437}
438
439/**************************************************************************************/
440
441StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse) const
442{
443 if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
444 return getTHists_i(std::string(td->GetPath()), tl);
445}
446
447StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse) const
448{
449 if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
450 return getTHists_i(dir, tl);
451}
452
453StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse, bool reg)
454{
455 if (recurse || reg)
456 ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
457 "supported in this implementation");
458 return getTHists_i(std::string(td->GetPath()), tl);
459}
460
461StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse, bool reg)
462{
463 if (recurse || reg)
464 ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
465 "supported in this implementation");
466 return getTHists_i(dir, tl);
467}
468
469/**************************************************************************************/
470
471StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH1> hist,
472 LockedHandle<TH1>& lh)
473{
474 lh = regShared_i<TH1>(id, std::move(hist));
475 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
476}
477
478StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH2> hist,
479 LockedHandle<TH2>& lh)
480{
481 lh = regShared_i<TH2>(id, std::move(hist));
482 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
483}
484
485StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH3> hist,
486 LockedHandle<TH3>& lh)
487{
488 lh = regShared_i<TH3>(id, std::move(hist));
489 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
490}
491
492/**************************************************************************************/
493
494StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH1>& lh) const
495{
496 lh = getShared_i<TH1>(id);
497 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
498}
499
500StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH2>& lh) const
501{
502 lh = getShared_i<TH2>(id);
503 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
504}
505
506StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH3>& lh) const
507{
508 lh = getShared_i<TH3>(id);
509 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
510}
511
512/**************************************************************************************/
513
524void WebdaqHistSvc::monitoringTask(int numSlots, int intervalSeconds, std::atomic<bool>& histoMapUpdated, boost::regex nameSelect)
525{
526 ATH_MSG_INFO("Started monitoring task for partition: " << m_partition << "and regex: " << nameSelect.str());
527 std::string appName = m_jobOptionsSvc->get("DataFlowConfig.DF_ApplicationName");
528 // OH doesn't allow multiple providers for a given server
529 // Need to modify the path for one of the threads to avoid the issue
530 if (nameSelect != boost::regex(".*"))
531 appName = appName + "_fast";
532
533 // Set the publication period
534 boost::posix_time::time_duration interval{boost::posix_time::seconds(intervalSeconds)};
535 ATH_MSG_DEBUG("Interval set to " << interval.total_seconds() << " seconds");
536 int interval_ms = interval.total_milliseconds();
537 if(numSlots == 0) numSlots = 1;
538 boost::posix_time::ptime epoch(boost::gregorian::date(2024,1,1));
539 // Sleep duration between slots (plus an extra slot for allowing a last sleep cycle)
540 boost::posix_time::time_duration slotSleepDuration = interval / (numSlots + 1);
541
542 // Create the Set of the histograms keys to order the histograms publication and reset the histoMapUpdated flag
543 std::set<std::string> HistoSet = getSet(nameSelect);
544 histoMapUpdated = false;
545
546 // Sync the publication to the period
547 syncPublish(interval_ms, epoch);
548 ATH_MSG_DEBUG("Monitoring task synched");
549
550 // Publication loop
551 while (!m_stopFlag)
552 {
553 // Check if the histograms map has been updated, and if so update the Set
554 if (histoMapUpdated)
555 {
556 ATH_MSG_DEBUG("Histo map updated, updating the Set");
557 HistoSet = getSet(nameSelect);
558 histoMapUpdated = false;
559 }
560 size_t totalHists = HistoSet.size();
561 ATH_MSG_DEBUG("Going to publish " << totalHists << " histograms");
562
563 // Divide the histograms in batches
564 size_t batchSize = (totalHists + numSlots - 1) / numSlots; // Ceiling division
565 ATH_MSG_DEBUG("Num of slots:" << numSlots << ", Interval_ms " << interval_ms << " milliseconds, Batch size: " << batchSize);
566
567 boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
568 int counter = 0;
569 int BatchCounter = 0;
570 auto it = HistoSet.begin();
571 while(it != HistoSet.end())
572 {
573 boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
574 //Batch publication
575 ATH_MSG_DEBUG("Batch publication number " << BatchCounter << " starting.");
576 for(size_t j = 0; j < batchSize; ++j)
577 {
578 if(it == HistoSet.end())
579 {
580 break;
581 }
582 const std::string& id = *it;
583 std::string path = appName + '.' + id;
584 ATH_MSG_DEBUG("Publishing to " << m_partition << " Histogram " << path << " to the OH server " << m_tdaqOHServerName);
585 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
586 if (!m_hists.find(accessor, id)) {
587 ATH_MSG_WARNING("Histogram with name " << id << " not found in histogram map (probably deregistered).");
588 it++;
589 continue;
590 }
591 else
592 {
593 ATH_MSG_DEBUG("Histogram found in map, going to lock mutex and then publish it");
594 //Here we clone the histogram to avoid locking the OH mutex during the whole publication
595 TObject* obj = nullptr;
596 {
597 //Locking the OH mutex before touching the Histogram
599 obj = accessor->second.obj->Clone();
600 }
601 if (obj == nullptr) {
602 ATH_MSG_ERROR("Failed to clone histogram " << id);
603 it++;
604 continue;
605 }
606 if (!webdaq::oh::put(m_partition, m_tdaqOHServerName, path, obj)) {
607 ATH_MSG_ERROR("Histogram publishing failed !");
608 }
609 //Delete the cloned histogram. This was creating a memory leak
610 ATH_MSG_DEBUG("Deleting cloned histogram");
611 delete obj;
612 }
613 it++;
614 counter++;
615 }
616 ATH_MSG_DEBUG("Batch publication completed, " << counter << " histograms published");
617 // Sleep for slotSleepDuration - slot publication time, unless it's the last slot
618 if (it != HistoSet.end()) {
619 boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
620 int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
621 if (slot_sleep_time > 0) {
622 ATH_MSG_DEBUG("Sleeping for " << slot_sleep_time << " seconds before publishing the next batch");
623 conditionedSleep(std::chrono::milliseconds(slot_sleep_time), m_stopFlag);
624 }
625 }
626 BatchCounter++;
627 }
628
629 //check if we exceeded the publication period
630 boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
631 if (boost::posix_time::time_duration(end_time-start_time) > interval) {
632 ATH_MSG_WARNING("Publication deadline missed, cycle exceeded the interval.. Total publication time "
633 << boost::posix_time::to_simple_string(end_time-start_time));
634 }
635 ATH_MSG_DEBUG("Completed the publication of " << counter << " histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
636
637 //sleep till the next cycle
638 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
639 int nowMs = (now-epoch).total_milliseconds();
640 boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
641 ATH_MSG_DEBUG("epoch " << epoch);
642 ATH_MSG_DEBUG("interval_ms" << interval_ms);
643 ATH_MSG_DEBUG("now_ms " << nowMs);
644 ATH_MSG_DEBUG("Sleeping for " << next_cycle.total_milliseconds() << " milliseconds till the next cycle");
645 conditionedSleep(std::chrono::milliseconds(next_cycle.total_milliseconds()), m_stopFlag);
646 }
647 ATH_MSG_INFO("Monitoring task stopped");
648}
649
650/**************************************************************************************/
651
652void WebdaqHistSvc::syncPublish(long int interval_ms, boost::posix_time::ptime epoch)
653{
654 //Sync the publication to a multple of the interval
655 //Code taken from TDAQ monsvc https://gitlab.cern.ch/atlas-tdaq-software/monsvc/-/blob/master/src/PeriodicScheduler.cxx?ref_type=heads#L163
656 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
657 int now_ms = (now-epoch).total_milliseconds();
658 //If now_ms % interval_ms == 0 we skip a cycle. Too bad.
659 boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
660 //Do not sync if we are below 50 ms
661 if (sync.total_milliseconds() > 50){
662 std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
663 }
664}
665
666void WebdaqHistSvc::conditionedSleep(std::chrono::milliseconds duration, const std::atomic<bool>& stopFlag) {
667 auto start = std::chrono::steady_clock::now();
668 while (true) {
669 if (stopFlag.load()) {
670 return;
671 }
672 auto elapsed = std::chrono::steady_clock::now() - start;
673 if (elapsed >= duration) {
674 break;
675 }
676 auto remaining = duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
677 std::this_thread::sleep_for(std::min(std::chrono::milliseconds(500), remaining));
678 }
679}
680
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
OH histogram lock header file.
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Define macros for attributes used to control the static checker.
Header file for AthHistogramAlgorithm.
static const std::string & type()
Incident type.
Definition Incidents.h:49
Gaudi::Property< std::string > m_includeType
boost::regex m_fastPublicationIncludeNameRegex
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
virtual StatusCode stop() override
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
Gaudi::Property< std::string > m_excludeType
Gaudi::Property< std::string > m_excludeName
std::thread m_threadFast
boost::regex m_PublicationIncludeNameRegex
std::set< std::string > getSet(boost::regex) const
std::thread m_thread
Publication thread.
boost::regex m_includeTypeRegex
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
virtual std::vector< std::string > getHists() const override
virtual void handle(const Incident &incident) override
boost::regex m_excludeNameRegex
LockedHandle< T > getShared_i(const std::string &id) const
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
virtual bool existsHist(const std::string &name) const override
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication.
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
Gaudi::Property< int > m_numSlots
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise)
virtual StatusCode finalize() override
boost::regex m_includeNameRegex
boost::regex m_excludeTypeRegex
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
virtual StatusCode regHist(const std::string &name) override
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
std::string m_partition
The partition to publish to.
Gaudi::Property< int > m_intervalSecondsFast
Gaudi::Property< int > m_intervalSeconds
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules ?
virtual StatusCode deReg(TObject *obj) override
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
Gaudi::Property< int > m_numSlotsFast
Gaudi::Property< std::string > m_includeName
STL class.
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Scoped lock to be used for threaded histogram operations.
Helper struct that bundles the histogram, name and mutex.
std::unique_ptr< std::mutex > mutex