7#include "GaudiKernel/ISvcLocator.h"
8#include "GaudiKernel/IIncidentSvc.h"
14#include "hltinterface/IInfoRegister.h"
15#include "webdaq/webdaq-root.hpp"
16#include "webdaq/webdaq.hpp"
26#include <TBufferJSON.h>
28#include <boost/date_time/posix_time/posix_time_types.hpp>
29#include <boost/date_time/posix_time/posix_time.hpp>
30#include <boost/date_time/gregorian/gregorian_types.hpp>
43 static TROOT root(
"root",
"ROOT I/O");
49 gErrorIgnoreLevel = kBreak;
52 m_excludeTypeRegex = boost::regex(m_excludeType.value());
53 m_includeTypeRegex = boost::regex(m_includeType.value());
54 m_excludeNameRegex = boost::regex(m_excludeName.value());
55 m_includeNameRegex = boost::regex(m_includeName.value());
56 m_PublicationIncludeNameRegex = boost::regex(m_PublicationIncludeName.value());
57 m_fastPublicationIncludeNameRegex = boost::regex(m_fastPublicationIncludeName.value());
61 static std::mutex
mutex;
66 const char* tdaq_partition_cstr = std::getenv(
"TDAQ_PARTITION");
67 if (tdaq_partition_cstr !=
nullptr) {
68 m_partition = std::string(tdaq_partition_cstr);
72 return StatusCode::FAILURE;
74 const char* tdaqWebdaqBase_cstr = std::getenv(
"TDAQ_WEBDAQ_BASE");
75 if (tdaqWebdaqBase_cstr !=
nullptr) {
76 m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77 ATH_MSG_INFO(
"TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
79 ATH_MSG_ERROR(
"TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80 return StatusCode::FAILURE;
82 const char* tdaq_oh_server = std::getenv(
"TDAQ_OH_SERVER");
83 if (tdaq_oh_server !=
nullptr) {
84 m_tdaqOHServerName = std::string(tdaq_oh_server);
86 m_tdaqOHServerName = m_OHServerName.value();
88 ATH_MSG_INFO(
"TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
92 return StatusCode::SUCCESS;
100 ATH_MSG_INFO(
"Going to initialize the monitoring Thread");
119 catch (
const std::exception& e) {
120 ATH_MSG_ERROR(
"Failed to join the monitoring thread: " << e.what());
121 return StatusCode::FAILURE;
129 catch (
const std::exception& e) {
130 ATH_MSG_ERROR(
"Failed to join the fast monitoring thread: " << e.what());
131 return StatusCode::FAILURE;
138 return StatusCode::SUCCESS;
149 return StatusCode::SUCCESS;
162 return StatusCode::FAILURE;
165 if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
167 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
168 if (
m_hists.find(accessor,
id)) {
169 ATH_MSG_ERROR(
"Histogram with name " <<
id <<
" already registered");
170 return StatusCode::FAILURE;
173 if (!
m_hists.insert(accessor,
id)) {
174 ATH_MSG_ERROR(
"Failed to insert histogram with name " <<
id);
175 return StatusCode::FAILURE;
177 T* hist = hist_unique.release();
180 accessor->second =
THistID(
id, hist);
182 if (shared) accessor->second.mutex = std::make_unique<std::mutex>();
183 phid = &accessor->second;
185 << hist->GetName() <<
" registered under " <<
id <<
" " << name());
188 <<
" because it does not inherit from TH1");
189 return StatusCode::FAILURE;
191 return StatusCode::SUCCESS;
199 LockedHandle<T> lh(
nullptr,
nullptr);
201 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
203 if (!
m_hists.find(accessor,
id)) {
205 T* phist = hist.get();
207 if (
regHist_i(std::move(hist),
id,
true, phid).isSuccess()) {
208 if (phid) lh.set(phist, phid->
mutex.get());
214 if (accessor->second.mutex ==
nullptr) {
215 ATH_MSG_ERROR(
"regShared: previously registered histogram \"" <<
id
216 <<
"\" was not marked shared");
218 T* phist =
dynamic_cast<T*
>(accessor->second.obj);
219 if (phist ==
nullptr) {
220 ATH_MSG_ERROR(
"regShared: unable to dcast retrieved shared hist \""
221 <<
id <<
"\" of type " << accessor->second.obj->IsA()->GetName()
222 <<
" to requested type " << System::typeinfoName(
typeid(T)));
225 lh.set(phist, accessor->second.mutex.get());
240 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
241 if (!
m_hists.find(accessor,
id) or accessor.empty()) {
246 T* phist =
dynamic_cast<T*
>(accessor->second.obj);
247 if (phist ==
nullptr) {
248 ATH_MSG_ERROR(
"getHist: unable to dcast retrieved shared hist \""
249 <<
id <<
"\" of type " << accessor->second.obj->IsA()->GetName() <<
" to requested type "
250 << System::typeinfoName(
typeid(T)));
261 const std::string&
id = it->first;
263 if (
id.starts_with(dir)) {
267 return StatusCode::SUCCESS;
275 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
276 if (
m_hists.find(accessor,
id)) {
277 auto * obj = accessor->second.obj;
280 if (accessor->second.mutex ==
nullptr) {
282 <<
"\", but it's not marked as shared");
285 T* phist =
dynamic_cast<T*
>(obj);
286 if (phist ==
nullptr) {
287 const char* gotType = obj ? obj->IsA()->GetName() :
"<null>";
288 ATH_MSG_ERROR(
"getShared: unable to dcast retrieved shared hist \""
289 <<
id <<
"\" of type " << gotType
290 <<
" to requested type " << System::typeinfoName(
typeid(T)));
293 return LockedHandle<T>(phist, accessor->second.mutex.get());
295 ATH_MSG_ERROR(
"getShared: cannot find histogram with id \"" <<
id <<
"\"");
305 if (it->second.obj == optr) {
306 ATH_MSG_DEBUG(
"Found histogram " << optr <<
" booked under " << it->first
307 <<
" and will deregister it");
308 return deReg(it->first);
311 ATH_MSG_ERROR(
"Histogram with pointer " << optr <<
" not found in the histogram map");
312 return StatusCode::FAILURE;
319 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
320 if (
m_hists.find(accessor,
id)) {
322 accessor->second.obj->Delete();
327 return StatusCode::SUCCESS;
329 ATH_MSG_ERROR(
"Deregistration failed: histogram with id \"" <<
id <<
"\" not found");
330 return StatusCode::FAILURE;
337 std::vector<std::string> l;
340 l.push_back(it->first);
349 std::vector<std::string> l;
352 if (boost::regex_match(it->first, nameSelect)) {
353 l.push_back(it->first);
357 std::set<std::string> HistoSet(l.begin(), l.end());
369 <<
" does NOT match IncludeType \"" <<
m_includeType <<
"\"");
374 ATH_MSG_WARNING(
"Object " << path <<
" of type " << o->ClassName() <<
" matches ExcludeType \""
403 std::unique_ptr<TH1> hist =
nullptr;
405 return regHist_i(std::move(hist),
id,
false, hid);
411 return regHist_i(std::move(hist),
id,
false, hid);
417 std::unique_ptr<TH1> hist(hist_ptr);
418 return regHist_i(std::move(hist),
id,
false, hid);
426 return (hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
432 return (hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
438 return (hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
445 if (recurse)
ATH_MSG_DEBUG(
"Recursive flag is not supported in this implementation");
446 return getTHists_i(std::string(td->GetPath()), tl);
451 if (recurse)
ATH_MSG_DEBUG(
"Recursive flag is not supported in this implementation");
458 ATH_MSG_DEBUG(
"Recursive flag and automatic registration flag is not "
459 "supported in this implementation");
460 return getTHists_i(std::string(td->GetPath()), tl);
466 ATH_MSG_DEBUG(
"Recursive flag and automatic registration flag is not "
467 "supported in this implementation");
474 LockedHandle<TH1>& lh)
477 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
481 LockedHandle<TH2>& lh)
484 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
488 LockedHandle<TH3>& lh)
491 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
499 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
505 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
511 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
529 std::string appName =
m_jobOptionsSvc->get(
"DataFlowConfig.DF_ApplicationName");
532 if (nameSelect != boost::regex(
".*"))
533 appName = appName +
"_fast";
536 boost::posix_time::time_duration interval{boost::posix_time::seconds(intervalSeconds)};
537 ATH_MSG_DEBUG(
"Interval set to " << interval.total_seconds() <<
" seconds");
538 int interval_ms = interval.total_milliseconds();
539 if(numSlots == 0) numSlots = 1;
540 boost::posix_time::ptime epoch(boost::gregorian::date(2024,1,1));
542 boost::posix_time::time_duration slotSleepDuration = interval / (numSlots + 1);
545 std::set<std::string> HistoSet =
getSet(nameSelect);
546 histoMapUpdated =
false;
559 HistoSet =
getSet(nameSelect);
560 histoMapUpdated =
false;
562 size_t totalHists = HistoSet.size();
563 ATH_MSG_DEBUG(
"Going to publish " << totalHists <<
" histograms");
566 size_t batchSize = (totalHists + numSlots - 1) / numSlots;
567 ATH_MSG_DEBUG(
"Num of slots:" << numSlots <<
", Interval_ms " << interval_ms <<
" milliseconds, Batch size: " << batchSize);
569 boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
571 int BatchCounter = 0;
572 auto it = HistoSet.begin();
573 while(it != HistoSet.end())
575 boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
577 ATH_MSG_DEBUG(
"Batch publication number " << BatchCounter <<
" starting.");
578 for(
size_t j = 0; j < batchSize; ++j)
580 if(it == HistoSet.end())
584 const std::string&
id = *it;
585 std::string path = appName +
'.' + id;
587 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
588 if (!
m_hists.find(accessor,
id)) {
589 ATH_MSG_WARNING(
"Histogram with name " <<
id <<
" not found in histogram map (probably deregistered).");
595 ATH_MSG_DEBUG(
"Histogram found in map, going to lock mutex and then publish it");
597 TObject* obj =
nullptr;
603 obj = accessor->second.obj->Clone();
605 if (obj ==
nullptr) {
620 ATH_MSG_DEBUG(
"Batch publication completed, " << counter <<
" histograms published");
622 if (it != HistoSet.end()) {
623 boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
624 int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
625 if (slot_sleep_time > 0) {
// slot_sleep_time is the difference of two total_milliseconds() values, so it is reported in milliseconds.
626 ATH_MSG_DEBUG(
"Sleeping for " << slot_sleep_time <<
" milliseconds before publishing the next batch");
634 boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
635 if (boost::posix_time::time_duration(end_time-start_time) > interval) {
636 ATH_MSG_WARNING(
"Publication deadline missed, cycle exceeded the interval.. Total publication time "
637 << boost::posix_time::to_simple_string(end_time-start_time));
639 ATH_MSG_DEBUG(
"Completed the publication of " << counter <<
" histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
642 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
// Keep the 64-bit count returned by total_milliseconds(): a 32-bit int only holds about
// 24.8 days of milliseconds, far less than the time elapsed since the 2024-01-01 epoch.
// A truncated (possibly negative) value would make the remainder below negative and the
// computed wait longer than one interval.
643 const auto nowMs = (now-epoch).total_milliseconds();
644 boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
648 ATH_MSG_DEBUG(
"Sleeping for " << next_cycle.total_milliseconds() <<
" milliseconds till the next cycle");
660 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
// Keep the 64-bit count returned by total_milliseconds(): narrowing it to int truncates
// once more than about 24.8 days separate `now` from `epoch`, which can turn the remainder
// below negative and push the computed wait past one interval.
661 const auto now_ms = (now-epoch).total_milliseconds();
663 boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
665 if (sync.total_milliseconds() > 50){
666 std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
671 auto start = std::chrono::steady_clock::now();
673 if (stopFlag.load()) {
676 auto elapsed = std::chrono::steady_clock::now() - start;
677 if (elapsed >= duration) {
680 auto remaining = duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
681 std::this_thread::sleep_for(std::min(std::chrono::milliseconds(500), remaining));
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
OH histogram lock header file.
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Define macros for attributes used to control the static checker.
Header file for AthHistogramAlgorithm.
static const std::string & type()
Incident type.
Gaudi::Property< std::string > m_includeType
boost::regex m_fastPublicationIncludeNameRegex
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
virtual StatusCode stop() override
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
Gaudi::Property< std::string > m_excludeType
Gaudi::Property< std::string > m_excludeName
boost::regex m_PublicationIncludeNameRegex
std::set< std::string > getSet(boost::regex) const
std::thread m_thread
Publication thread.
boost::regex m_includeTypeRegex
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
virtual std::vector< std::string > getHists() const override
virtual void handle(const Incident &incident) override
boost::regex m_excludeNameRegex
LockedHandle< T > getShared_i(const std::string &id) const
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
virtual bool existsHist(const std::string &name) const override
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication.
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
Gaudi::Property< int > m_numSlots
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise)
virtual StatusCode finalize() override
boost::regex m_includeNameRegex
boost::regex m_excludeTypeRegex
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
virtual StatusCode regHist(const std::string &name) override
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
std::string m_partition
The partition to publish to.
Gaudi::Property< int > m_intervalSecondsFast
Gaudi::Property< int > m_intervalSeconds
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules?
virtual StatusCode deReg(TObject *obj) override
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
Gaudi::Property< int > m_numSlotsFast
Gaudi::Property< std::string > m_includeName
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Scoped lock to be used for threaded histogram operations.
Helper struct that bundles the histogram, name and mutex.
std::unique_ptr< std::mutex > mutex