ATLAS Offline Software
Loading...
Searching...
No Matches
WebdaqHistSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "WebdaqHistSvc.h"
6
7#include "GaudiKernel/ISvcLocator.h"
8#include "GaudiKernel/IIncidentSvc.h"
10
13
14#include "hltinterface/IInfoRegister.h"
15#include "webdaq/webdaq-root.hpp"
16#include "webdaq/webdaq.hpp"
17
18#include "TError.h"
19#include "TGraph.h"
20#include "TH1.h"
21#include "TH2.h"
22#include "TH3.h"
23#include "TObject.h"
24#include "TROOT.h"
25#include "TTree.h"
26#include <TBufferJSON.h>
27
28#include <boost/date_time/posix_time/posix_time_types.hpp>
29#include <boost/date_time/posix_time/posix_time.hpp>
30#include <boost/date_time/gregorian/gregorian_types.hpp>
31
32#include <cstdlib>
33
34WebdaqHistSvc::WebdaqHistSvc(const std::string& name, ISvcLocator* svc) : base_class(name, svc)
35{}
36
37/**************************************************************************************/
38
39StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
40{
41 // Protect against multiple instances of TROOT
42 if (0 == gROOT) {
43 static TROOT root("root", "ROOT I/O");
44 }
45 else {
46 ATH_MSG_VERBOSE("ROOT already initialized, debug = " << gDebug);
47 }
48
49 gErrorIgnoreLevel = kBreak; // ignore warnings see TError.h in ROOT base src
50
51 // compile regexes
52 m_excludeTypeRegex = boost::regex(m_excludeType.value());
53 m_includeTypeRegex = boost::regex(m_includeType.value());
54 m_excludeNameRegex = boost::regex(m_excludeName.value());
55 m_includeNameRegex = boost::regex(m_includeName.value());
56 m_PublicationIncludeNameRegex = boost::regex(m_PublicationIncludeName.value());
57 m_fastPublicationIncludeNameRegex = boost::regex(m_fastPublicationIncludeName.value());
58
59 // Retrieve and set OH mutex
60 ATH_MSG_INFO("Enabling use of OH histogram mutex");
61 static std::mutex mutex;
63 ATH_CHECK( m_jobOptionsSvc.retrieve() );
64
65 //Retireve enviroment variables
66 const char* tdaq_partition_cstr = std::getenv("TDAQ_PARTITION");
67 if (tdaq_partition_cstr != nullptr) {
68 m_partition = std::string(tdaq_partition_cstr);
69 ATH_MSG_INFO("Partition: " << m_partition);
70 } else {
71 ATH_MSG_ERROR("TDAQ_PARTITION environment variable not set");
72 return StatusCode::FAILURE;
73 }
74 const char* tdaqWebdaqBase_cstr = std::getenv("TDAQ_WEBDAQ_BASE");
75 if (tdaqWebdaqBase_cstr != nullptr) {
76 m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77 ATH_MSG_INFO("TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
78 } else {
79 ATH_MSG_ERROR("TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80 return StatusCode::FAILURE;
81 }
82 const char* tdaq_oh_server = std::getenv("TDAQ_OH_SERVER");
83 if (tdaq_oh_server != nullptr) {
84 m_tdaqOHServerName = std::string(tdaq_oh_server);
85 } else {
86 m_tdaqOHServerName = m_OHServerName.value();
87 }
88 ATH_MSG_INFO("TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
89 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
90 ATH_CHECK( incSvc.retrieve() );
91 incSvc->addListener(this, AthenaInterprocess::UpdateAfterFork::type());
92 return StatusCode::SUCCESS;
93}
94
95/**************************************************************************************/
96
97void WebdaqHistSvc::handle(const Incident& incident)
98{
99 if (incident.type() == AthenaInterprocess::UpdateAfterFork::type()) {
100 ATH_MSG_INFO("Going to initialize the monitoring Thread");
103 }
104}
105
106/**************************************************************************************/
107
109{
111 ATH_MSG_DEBUG("Stopping monitoring task");
112 m_stopFlag = true;
113 // Wait for the task to finish
114 if (m_thread.joinable()) {
115 ATH_MSG_DEBUG("Going to join the monitoring thread");
116 try {
117 m_thread.join();
118 }
119 catch (const std::exception& e) {
120 ATH_MSG_ERROR("Failed to join the monitoring thread: " << e.what());
121 return StatusCode::FAILURE;
122 }
123 }
124 if (m_threadFast.joinable()) {
125 ATH_MSG_DEBUG("Going to join the fast monitoring thread");
126 try {
127 m_threadFast.join();
128 }
129 catch (const std::exception& e) {
130 ATH_MSG_ERROR("Failed to join the fast monitoring thread: " << e.what());
131 return StatusCode::FAILURE;
132 }
133 }
134 ATH_MSG_DEBUG("Clearing list of histograms");
135 m_hists.clear();
136 m_histoMapUpdated = true;
138 return StatusCode::SUCCESS;
139}
140
141/**************************************************************************************/
142
144{
145 ATH_MSG_INFO("finalize");
146 // Reset OH mutex
148
149 return StatusCode::SUCCESS;
150}
151
152/**************************************************************************************/
153
154template <typename T>
155StatusCode WebdaqHistSvc::regHist_i(std::unique_ptr<T> hist_unique, const std::string& id,
156 bool shared, THistID*& phid)
157{
158 ATH_MSG_DEBUG("Registering histogram " << id);
159
160 phid = nullptr;
161 if (not isObjectAllowed(id, hist_unique.get())) {
162 return StatusCode::FAILURE;
163 }
164
165 if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
166
167 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
168 if (m_hists.find(accessor, id)) {
169 ATH_MSG_ERROR("Histogram with name " << id << " already registered");
170 return StatusCode::FAILURE;
171 }
172 // Element not found, attempt to insert
173 if (!m_hists.insert(accessor, id)) {
174 ATH_MSG_ERROR("Failed to insert histogram with name " << id);
175 return StatusCode::FAILURE;
176 }
177 T* hist = hist_unique.release();
178 m_histoMapUpdated = true;
180 accessor->second = THistID(id, hist);
181 //finished
182 if (shared) accessor->second.mutex = std::make_unique<std::mutex>();
183 phid = &accessor->second;
184 ATH_MSG_DEBUG((shared ? "Shared histogram " : "Histogram ")
185 << hist->GetName() << " registered under " << id << " " << name());
186 } else {
187 ATH_MSG_ERROR("Cannot register " << hist_unique->ClassName()
188 << " because it does not inherit from TH1");
189 return StatusCode::FAILURE;
190 }
191 return StatusCode::SUCCESS;
192}
193
194/**************************************************************************************/
195
196template <typename T>
197LockedHandle<T> WebdaqHistSvc::regShared_i(const std::string& id, std::unique_ptr<T> hist)
198{
199 LockedHandle<T> lh(nullptr, nullptr);
200
201 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
202 // Check if the histogram is already registered
203 if (!m_hists.find(accessor, id)) {
204 // No histogram under that id yet
205 T* phist = hist.get();
206 THistID* phid = nullptr;
207 if (regHist_i(std::move(hist), id, true, phid).isSuccess()) {
208 if (phid) lh.set(phist, phid->mutex.get());
209 }
210 }
211 else
212 {
213 // Histogram already registered under that id
214 if (accessor->second.mutex == nullptr) {
215 ATH_MSG_ERROR("regShared: previously registered histogram \"" << id
216 << "\" was not marked shared");
217 }
218 T* phist = dynamic_cast<T*>(accessor->second.obj);
219 if (phist == nullptr) {
220 ATH_MSG_ERROR("regShared: unable to dcast retrieved shared hist \""
221 << id << "\" of type " << accessor->second.obj->IsA()->GetName()
222 << " to requested type " << System::typeinfoName(typeid(T)));
223 }
224 else {
225 lh.set(phist, accessor->second.mutex.get());
226 //hist is automatically deleted at end of method
227 }
228 }
229 return lh;
230}
231
232
233/**************************************************************************************/
234
235template <typename T>
236T* WebdaqHistSvc::getHist_i(const std::string& id, const size_t& /*ind*/, bool quiet) const
237{
238 ATH_MSG_DEBUG("Getting histogram " << id);
239
240 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
241 if (!m_hists.find(accessor, id) or accessor.empty()) {
242 if (!quiet) ATH_MSG_ERROR("could not locate Hist with id \"" << id << "\"");
243 return nullptr;
244 }
245
246 T* phist = dynamic_cast<T*>(accessor->second.obj);
247 if (phist == nullptr) {
248 ATH_MSG_ERROR("getHist: unable to dcast retrieved shared hist \""
249 << id << "\" of type " << accessor->second.obj->IsA()->GetName() << " to requested type "
250 << System::typeinfoName(typeid(T)));
251 return nullptr;
252 }
253 return phist;
254}
255
256/**************************************************************************************/
257
258StatusCode WebdaqHistSvc::getTHists_i(const std::string& dir, TList& tl) const
259{
260 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
261 const std::string& id = it->first;
262 const THistID& h = it->second;
263 if (id.starts_with(dir)) { // histogram booking path starts from the dir
264 tl.Add(h.obj);
265 }
266 }
267 return StatusCode::SUCCESS;
268}
269
270/**************************************************************************************/
271
272template <typename T>
273LockedHandle<T> WebdaqHistSvc::getShared_i(const std::string& id) const
274{
275 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
276 if (m_hists.find(accessor, id)) {
277 //coverity[FORWARD_NULL]
278 auto * obj = accessor->second.obj;
279 //accessor is implicitly valid
280 //coverity[FORWARD_NULL]
281 if (accessor->second.mutex == nullptr) {
282 ATH_MSG_ERROR("getShared: found Hist with id \"" << id
283 << "\", but it's not marked as shared");
284 return {};
285 }
286 T* phist = dynamic_cast<T*>(obj);
287 if (phist == nullptr) {
288 const char* gotType = obj ? obj->IsA()->GetName() : "<null>";
289 ATH_MSG_ERROR("getShared: unable to dcast retrieved shared hist \""
290 << id << "\" of type " << gotType
291 << " to requested type " << System::typeinfoName(typeid(T)));
292 return {};
293 }
294 return LockedHandle<T>(phist, accessor->second.mutex.get());
295 }
296 ATH_MSG_ERROR("getShared: cannot find histogram with id \"" << id << "\"");
297 return {};
298}
299
300/**************************************************************************************/
301
302StatusCode WebdaqHistSvc::deReg(TObject* optr)
303{
304 // Find the relevant histogram and deregister it
305 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
306 if (it->second.obj == optr) {
307 ATH_MSG_DEBUG("Found histogram " << optr << " booked under " << it->first
308 << " and will deregister it");
309 return deReg(it->first);
310 }
311 }
312 ATH_MSG_ERROR("Histogram with pointer " << optr << " not found in the histogram map");
313 return StatusCode::FAILURE;
314}
315
316/**************************************************************************************/
317
318StatusCode WebdaqHistSvc::deReg(const std::string& id)
319{
320 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
321 if (m_hists.find(accessor, id)) {
322 //Delete the histogram
323 accessor->second.obj->Delete();
324 m_hists.erase(accessor);
325 m_histoMapUpdated = true;
327 ATH_MSG_DEBUG("Deregistration of " << id << " done");
328 return StatusCode::SUCCESS;
329 }
330 ATH_MSG_ERROR("Deregistration failed: histogram with id \"" << id << "\" not found");
331 return StatusCode::FAILURE;
332}
333
334/**************************************************************************************/
335
336std::vector<std::string> WebdaqHistSvc::getHists() const
337{
338 std::vector<std::string> l;
339 l.reserve(m_hists.size());
340 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
341 l.push_back(it->first);
342 }
343 return l;
344}
345
346/**************************************************************************************/
347
348std::set<std::string> WebdaqHistSvc::getSet(boost::regex nameSelect) const
349{
350 std::vector<std::string> l;
351 l.reserve(m_hists.size());
352 for (auto it = m_hists.begin(); it != m_hists.end(); ++it) {
353 if (boost::regex_match(it->first, nameSelect)) {
354 l.push_back(it->first);
355 }
356 }
357 ATH_MSG_DEBUG("Number of histograms matched: " << l.size());
358 std::set<std::string> HistoSet(l.begin(), l.end());
359 return HistoSet;
360}
361
362/**************************************************************************************/
363
364bool WebdaqHistSvc::isObjectAllowed(const std::string& path, const TObject* o) const
365{
366 boost::cmatch what;
367
368 if (not boost::regex_match(o->ClassName(), what, m_includeTypeRegex)) {
369 ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName()
370 << " does NOT match IncludeType \"" << m_includeType << "\"");
371 return false;
372 }
373
374 if (boost::regex_match(o->ClassName(), what, m_excludeTypeRegex)) {
375 ATH_MSG_WARNING("Object " << path << " of type " << o->ClassName() << " matches ExcludeType \""
376 << m_excludeType << "\"");
377 return false;
378 }
379
380 if (not boost::regex_match(path.c_str(), what, m_includeNameRegex)) {
381 ATH_MSG_WARNING("Object " << path << " does NOT match IncludeName \"" << m_includeName << "\"");
382 return false;
383 }
384
385 if (boost::regex_match(path.c_str(), what, m_excludeNameRegex)) {
386 ATH_MSG_WARNING("Object " << path << " matches ExcludeName \"" << m_excludeName << "\"");
387 return false;
388 }
389
390 return true;
391}
392
393bool WebdaqHistSvc::existsHist(const std::string& name) const
394{
395 return (getHist_i<TH1>(name, 0, true) != nullptr);
396}
397
398/**************************************************************************************
399 * Typed interface methods
400 * All these are just forwarding to the templated xyz_i methods
401 **************************************************************************************/
402StatusCode WebdaqHistSvc::regHist(const std::string& id)
403{
404 std::unique_ptr<TH1> hist = nullptr;
405 THistID* hid = nullptr;
406 return regHist_i(std::move(hist), id, false, hid);
407}
408
409StatusCode WebdaqHistSvc::regHist(const std::string& id, std::unique_ptr<TH1> hist)
410{
411 THistID* hid = nullptr;
412 return regHist_i(std::move(hist), id, false, hid);
413}
414
415StatusCode WebdaqHistSvc::regHist(const std::string& id, TH1* hist_ptr)
416{
417 THistID* hid = nullptr;
418 std::unique_ptr<TH1> hist(hist_ptr);
419 return regHist_i(std::move(hist), id, false, hid);
420}
421
422/**************************************************************************************/
423
424StatusCode WebdaqHistSvc::getHist(const std::string& id, TH1*& hist, size_t ind) const
425{
426 hist = getHist_i<TH1>(id, ind);
427 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
428}
429
430StatusCode WebdaqHistSvc::getHist(const std::string& id, TH2*& hist, size_t ind) const
431{
432 hist = getHist_i<TH2>(id, ind);
433 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
434}
435
436StatusCode WebdaqHistSvc::getHist(const std::string& id, TH3*& hist, size_t ind) const
437{
438 hist = getHist_i<TH3>(id, ind);
439 return (hist != nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
440}
441
442/**************************************************************************************/
443
444StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse) const
445{
446 if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
447 return getTHists_i(std::string(td->GetPath()), tl);
448}
449
450StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse) const
451{
452 if (recurse) ATH_MSG_DEBUG("Recursive flag is not supported in this implementation");
453 return getTHists_i(dir, tl);
454}
455
456StatusCode WebdaqHistSvc::getTHists(TDirectory* td, TList& tl, bool recurse, bool reg)
457{
458 if (recurse || reg)
459 ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
460 "supported in this implementation");
461 return getTHists_i(std::string(td->GetPath()), tl);
462}
463
464StatusCode WebdaqHistSvc::getTHists(const std::string& dir, TList& tl, bool recurse, bool reg)
465{
466 if (recurse || reg)
467 ATH_MSG_DEBUG("Recursive flag and automatic registration flag is not "
468 "supported in this implementation");
469 return getTHists_i(dir, tl);
470}
471
472/**************************************************************************************/
473
474StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH1> hist,
475 LockedHandle<TH1>& lh)
476{
477 lh = regShared_i<TH1>(id, std::move(hist));
478 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
479}
480
481StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH2> hist,
482 LockedHandle<TH2>& lh)
483{
484 lh = regShared_i<TH2>(id, std::move(hist));
485 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
486}
487
488StatusCode WebdaqHistSvc::regShared(const std::string& id, std::unique_ptr<TH3> hist,
489 LockedHandle<TH3>& lh)
490{
491 lh = regShared_i<TH3>(id, std::move(hist));
492 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
493}
494
495/**************************************************************************************/
496
497StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH1>& lh) const
498{
499 lh = getShared_i<TH1>(id);
500 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
501}
502
503StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH2>& lh) const
504{
505 lh = getShared_i<TH2>(id);
506 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
507}
508
509StatusCode WebdaqHistSvc::getShared(const std::string& id, LockedHandle<TH3>& lh) const
510{
511 lh = getShared_i<TH3>(id);
512 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
513}
514
515/**************************************************************************************/
516
527void WebdaqHistSvc::monitoringTask(int numSlots, int intervalSeconds, std::atomic<bool>& histoMapUpdated, boost::regex nameSelect)
528{
529 ATH_MSG_INFO("Started monitoring task for partition: " << m_partition << "and regex: " << nameSelect.str());
530 std::string appName = m_jobOptionsSvc->get("DataFlowConfig.DF_ApplicationName");
531 // OH doesn't allow multiple providers for a given server
532 // Need to modify the path for one of the threads to avoid the issue
533 if (nameSelect != boost::regex(".*"))
534 appName = appName + "_fast";
535
536 // Set the publication period
537 boost::posix_time::time_duration interval{boost::posix_time::seconds(intervalSeconds)};
538 ATH_MSG_DEBUG("Interval set to " << interval.total_seconds() << " seconds");
539 int interval_ms = interval.total_milliseconds();
540 if(numSlots == 0) numSlots = 1;
541 boost::posix_time::ptime epoch(boost::gregorian::date(2024,1,1));
542 // Sleep duration between slots (plus an extra slot for allowing a last sleep cycle)
543 boost::posix_time::time_duration slotSleepDuration = interval / (numSlots + 1);
544
545 // Create the Set of the histograms keys to order the histograms publication and reset the histoMapUpdated flag
546 std::set<std::string> HistoSet = getSet(nameSelect);
547 histoMapUpdated = false;
548
549 // Sync the publication to the period
550 syncPublish(interval_ms, epoch);
551 ATH_MSG_DEBUG("Monitoring task synched");
552
553 // Publication loop
554 while (!m_stopFlag)
555 {
556 // Check if the histograms map has been updated, and if so update the Set
557 if (histoMapUpdated)
558 {
559 ATH_MSG_DEBUG("Histo map updated, updating the Set");
560 HistoSet = getSet(nameSelect);
561 histoMapUpdated = false;
562 }
563 size_t totalHists = HistoSet.size();
564 ATH_MSG_DEBUG("Going to publish " << totalHists << " histograms");
565
566 // Divide the histograms in batches
567 size_t batchSize = (totalHists + numSlots - 1) / numSlots; // Ceiling division
568 ATH_MSG_DEBUG("Num of slots:" << numSlots << ", Interval_ms " << interval_ms << " milliseconds, Batch size: " << batchSize);
569
570 boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
571 int counter = 0;
572 int BatchCounter = 0;
573 auto it = HistoSet.begin();
574 while(it != HistoSet.end())
575 {
576 boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
577 //Batch publication
578 ATH_MSG_DEBUG("Batch publication number " << BatchCounter << " starting.");
579 for(size_t j = 0; j < batchSize; ++j)
580 {
581 if(it == HistoSet.end())
582 {
583 break;
584 }
585 const std::string& id = *it;
586 std::string path = appName + '.' + id;
587 ATH_MSG_DEBUG("Publishing to " << m_partition << " Histogram " << path << " to the OH server " << m_tdaqOHServerName);
588 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
589 if (!m_hists.find(accessor, id)) {
590 ATH_MSG_WARNING("Histogram with name " << id << " not found in histogram map (probably deregistered).");
591 it++;
592 continue;
593 }
594 else
595 {
596 ATH_MSG_DEBUG("Histogram found in map, going to lock mutex and then publish it");
597 //Here we clone the histogram to avoid locking the OH mutex during the whole publication
598 TObject* obj = nullptr;
599 {
600 //Locking the OH mutex before touching the Histogram
602 //accessor is implicitly valid
603 //coverity[FORWARD_NULL]
604 obj = accessor->second.obj->Clone();
605 }
606 if (obj == nullptr) {
607 ATH_MSG_ERROR("Failed to clone histogram " << id);
608 it++;
609 continue;
610 }
611 if (!webdaq::oh::put(m_partition, m_tdaqOHServerName, path, obj)) {
612 ATH_MSG_ERROR("Histogram publishing failed !");
613 }
614 //Delete the cloned histogram. This was creating a memory leak
615 ATH_MSG_DEBUG("Deleting cloned histogram");
616 delete obj;
617 }
618 it++;
619 counter++;
620 }
621 ATH_MSG_DEBUG("Batch publication completed, " << counter << " histograms published");
622 // Sleep for slotSleepDuration - slot publication time, unless it's the last slot
623 if (it != HistoSet.end()) {
624 boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
625 int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
626 if (slot_sleep_time > 0) {
627 ATH_MSG_DEBUG("Sleeping for " << slot_sleep_time << " seconds before publishing the next batch");
628 conditionedSleep(std::chrono::milliseconds(slot_sleep_time), m_stopFlag);
629 }
630 }
631 BatchCounter++;
632 }
633
634 //check if we exceeded the publication period
635 boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
636 if (boost::posix_time::time_duration(end_time-start_time) > interval) {
637 ATH_MSG_WARNING("Publication deadline missed, cycle exceeded the interval.. Total publication time "
638 << boost::posix_time::to_simple_string(end_time-start_time));
639 }
640 ATH_MSG_DEBUG("Completed the publication of " << counter << " histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
641
642 //sleep till the next cycle
643 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
644 int nowMs = (now-epoch).total_milliseconds();
645 boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
646 ATH_MSG_DEBUG("epoch " << epoch);
647 ATH_MSG_DEBUG("interval_ms" << interval_ms);
648 ATH_MSG_DEBUG("now_ms " << nowMs);
649 ATH_MSG_DEBUG("Sleeping for " << next_cycle.total_milliseconds() << " milliseconds till the next cycle");
650 conditionedSleep(std::chrono::milliseconds(next_cycle.total_milliseconds()), m_stopFlag);
651 }
652 ATH_MSG_INFO("Monitoring task stopped");
653}
654
655/**************************************************************************************/
656
657void WebdaqHistSvc::syncPublish(long int interval_ms, boost::posix_time::ptime epoch)
658{
659 //Sync the publication to a multple of the interval
660 //Code taken from TDAQ monsvc https://gitlab.cern.ch/atlas-tdaq-software/monsvc/-/blob/master/src/PeriodicScheduler.cxx?ref_type=heads#L163
661 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
662 int now_ms = (now-epoch).total_milliseconds();
663 //If now_ms % interval_ms == 0 we skip a cycle. Too bad.
664 boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
665 //Do not sync if we are below 50 ms
666 if (sync.total_milliseconds() > 50){
667 std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
668 }
669}
670
671void WebdaqHistSvc::conditionedSleep(std::chrono::milliseconds duration, const std::atomic<bool>& stopFlag) {
672 auto start = std::chrono::steady_clock::now();
673 while (true) {
674 if (stopFlag.load()) {
675 return;
676 }
677 auto elapsed = std::chrono::steady_clock::now() - start;
678 if (elapsed >= duration) {
679 break;
680 }
681 auto remaining = duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
682 std::this_thread::sleep_for(std::min(std::chrono::milliseconds(500), remaining));
683 }
684}
685
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
OH histogram lock header file.
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Define macros for attributes used to control the static checker.
Header file for AthHistogramAlgorithm.
static const std::string & type()
Incident type.
Definition Incidents.h:49
Gaudi::Property< std::string > m_includeType
boost::regex m_fastPublicationIncludeNameRegex
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
virtual StatusCode stop() override
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
Gaudi::Property< std::string > m_excludeType
Gaudi::Property< std::string > m_excludeName
std::thread m_threadFast
boost::regex m_PublicationIncludeNameRegex
std::set< std::string > getSet(boost::regex) const
std::thread m_thread
Publication thread.
boost::regex m_includeTypeRegex
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
virtual std::vector< std::string > getHists() const override
virtual void handle(const Incident &incident) override
boost::regex m_excludeNameRegex
LockedHandle< T > getShared_i(const std::string &id) const
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
virtual bool existsHist(const std::string &name) const override
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication.
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
Gaudi::Property< int > m_numSlots
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise).
virtual StatusCode finalize() override
boost::regex m_includeNameRegex
boost::regex m_excludeTypeRegex
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
virtual StatusCode regHist(const std::string &name) override
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
std::string m_partition
The partition to publish to.
Gaudi::Property< int > m_intervalSecondsFast
Gaudi::Property< int > m_intervalSeconds
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules ?
virtual StatusCode deReg(TObject *obj) override
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
Gaudi::Property< int > m_numSlotsFast
Gaudi::Property< std::string > m_includeName
STL class.
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Scoped lock to be used for threaded histogram operations.
Helper struct that bundles the histogram, name and mutex.
std::unique_ptr< std::mutex > mutex