7#include "GaudiKernel/ISvcLocator.h"
8#include "GaudiKernel/IIncidentSvc.h"
14#include "hltinterface/IInfoRegister.h"
15#include "webdaq/webdaq-root.hpp"
16#include "webdaq/webdaq.hpp"
26#include <TBufferJSON.h>
28#include <boost/date_time/posix_time/posix_time_types.hpp>
29#include <boost/date_time/posix_time/posix_time.hpp>
30#include <boost/date_time/gregorian/gregorian_types.hpp>
43 static TROOT root(
"root",
"ROOT I/O");
49 gErrorIgnoreLevel = kBreak;
52 m_excludeTypeRegex = boost::regex(m_excludeType.value());
53 m_includeTypeRegex = boost::regex(m_includeType.value());
54 m_excludeNameRegex = boost::regex(m_excludeName.value());
55 m_includeNameRegex = boost::regex(m_includeName.value());
56 m_PublicationIncludeNameRegex = boost::regex(m_PublicationIncludeName.value());
57 m_fastPublicationIncludeNameRegex = boost::regex(m_fastPublicationIncludeName.value());
61 static std::mutex
mutex;
66 const char* tdaq_partition_cstr = std::getenv(
"TDAQ_PARTITION");
67 if (tdaq_partition_cstr !=
nullptr) {
68 m_partition = std::string(tdaq_partition_cstr);
72 return StatusCode::FAILURE;
74 const char* tdaqWebdaqBase_cstr = std::getenv(
"TDAQ_WEBDAQ_BASE");
75 if (tdaqWebdaqBase_cstr !=
nullptr) {
76 m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77 ATH_MSG_INFO(
"TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
79 ATH_MSG_ERROR(
"TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80 return StatusCode::FAILURE;
82 const char* tdaq_oh_server = std::getenv(
"TDAQ_OH_SERVER");
83 if (tdaq_oh_server !=
nullptr) {
84 m_tdaqOHServerName = std::string(tdaq_oh_server);
86 m_tdaqOHServerName = m_OHServerName.value();
88 ATH_MSG_INFO(
"TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
92 return StatusCode::SUCCESS;
100 ATH_MSG_INFO(
"Going to initialize the monitoring Thread");
119 catch (
const std::exception& e) {
120 ATH_MSG_ERROR(
"Failed to join the monitoring thread: " << e.what());
121 return StatusCode::FAILURE;
129 catch (
const std::exception& e) {
130 ATH_MSG_ERROR(
"Failed to join the fast monitoring thread: " << e.what());
131 return StatusCode::FAILURE;
138 return StatusCode::SUCCESS;
149 return StatusCode::SUCCESS;
162 return StatusCode::FAILURE;
165 if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
167 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
168 if (
m_hists.find(accessor,
id)) {
169 ATH_MSG_ERROR(
"Histogram with name " <<
id <<
" already registered");
170 return StatusCode::FAILURE;
173 if (!
m_hists.insert(accessor,
id)) {
174 ATH_MSG_ERROR(
"Failed to insert histogram with name " <<
id);
175 return StatusCode::FAILURE;
177 T* hist = hist_unique.release();
180 accessor->second =
THistID(
id, hist);
182 if (shared) accessor->second.mutex =
new std::mutex;
183 phid = &accessor->second;
185 << hist->GetName() <<
" registered under " <<
id <<
" " << name());
188 <<
" because it does not inherit from TH1");
189 return StatusCode::FAILURE;
191 return StatusCode::SUCCESS;
199 LockedHandle<T> lh(
nullptr,
nullptr);
201 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
203 if (!
m_hists.find(accessor,
id)) {
205 T* phist = hist.get();
207 if (
regHist_i(std::move(hist),
id,
true, phid).isSuccess()) {
208 if (phid) lh.set(phist, phid->
mutex);
214 if (accessor->second.mutex ==
nullptr) {
215 ATH_MSG_ERROR(
"regShared: previously registered histogram \"" <<
id
216 <<
"\" was not marked shared");
218 T* phist =
dynamic_cast<T*
>(accessor->second.obj);
219 if (phist ==
nullptr) {
220 ATH_MSG_ERROR(
"regShared: unable to dcast retrieved shared hist \""
221 <<
id <<
"\" of type " << accessor->second.obj->IsA()->GetName()
222 <<
" to requested type " << System::typeinfoName(
typeid(T)));
225 lh.set(phist, accessor->second.mutex);
240 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
241 if (!
m_hists.find(accessor,
id) or accessor.empty()) {
246 T* phist =
dynamic_cast<T*
>(accessor->second.obj);
247 if (phist ==
nullptr) {
248 ATH_MSG_ERROR(
"getHist: unable to dcast retrieved shared hist \""
249 <<
id <<
"\" of type " << accessor->second.obj->IsA()->GetName() <<
" to requested type "
250 << System::typeinfoName(
typeid(T)));
261 const std::string&
id = it->first;
263 if (
id.starts_with(dir)) {
267 return StatusCode::SUCCESS;
275 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
276 if (
m_hists.find(accessor,
id)) {
277 if (accessor->second.mutex ==
nullptr) {
279 <<
"\", but it's not marked as shared");
282 T* phist =
dynamic_cast<T*
>(accessor->second.obj);
283 if (phist ==
nullptr) {
284 ATH_MSG_ERROR(
"getShared: unable to dcast retrieved shared hist \""
285 <<
id <<
"\" of type " << accessor->second.obj->IsA()->GetName()
286 <<
" to requested type " << System::typeinfoName(
typeid(T)));
289 return LockedHandle<T>(phist, accessor->second.mutex);
291 ATH_MSG_ERROR(
"getShared: cannot find histogram with id \"" <<
id <<
"\"");
301 if (it->second.obj == optr) {
302 ATH_MSG_DEBUG(
"Found histogram " << optr <<
" booked under " << it->first
303 <<
" and will deregister it");
304 return deReg(it->first);
307 ATH_MSG_ERROR(
"Histogram with pointer " << optr <<
" not found in the histogram map");
308 return StatusCode::FAILURE;
315 tbb::concurrent_hash_map<std::string, THistID>::accessor accessor;
316 if (
m_hists.find(accessor,
id)) {
318 accessor->second.obj->Delete();
323 return StatusCode::SUCCESS;
325 ATH_MSG_ERROR(
"Deregistration failed: histogram with id \"" <<
id <<
"\" not found");
326 return StatusCode::FAILURE;
333 std::vector<std::string> l;
336 l.push_back(it->first);
345 std::vector<std::string> l;
348 if (boost::regex_match(it->first, nameSelect)) {
349 l.push_back(it->first);
353 std::set<std::string> HistoSet(l.begin(), l.end());
365 <<
" does NOT match IncludeType \"" <<
m_includeType <<
"\"");
370 ATH_MSG_WARNING(
"Object " << path <<
" of type " << o->ClassName() <<
" matches ExcludeType \""
399 std::unique_ptr<TH1> hist =
nullptr;
401 return regHist_i(std::move(hist),
id,
false, hid);
407 return regHist_i(std::move(hist),
id,
false, hid);
413 std::unique_ptr<TH1> hist(hist_ptr);
414 return regHist_i(std::move(hist),
id,
false, hid);
422 return (hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
428 return (hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
434 return (hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
441 if (recurse)
ATH_MSG_DEBUG(
"Recursive flag is not supported in this implementation");
442 return getTHists_i(std::string(td->GetPath()), tl);
447 if (recurse)
ATH_MSG_DEBUG(
"Recursive flag is not supported in this implementation");
454 ATH_MSG_DEBUG(
"Recursive flag and automatic registration flag is not "
455 "supported in this implementation");
456 return getTHists_i(std::string(td->GetPath()), tl);
462 ATH_MSG_DEBUG(
"Recursive flag and automatic registration flag is not "
463 "supported in this implementation");
470 LockedHandle<TH1>& lh)
473 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
477 LockedHandle<TH2>& lh)
480 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
484 LockedHandle<TH3>& lh)
487 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
495 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
501 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
507 return (lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
525 std::string appName =
m_jobOptionsSvc->get(
"DataFlowConfig.DF_ApplicationName");
528 if (nameSelect != boost::regex(
".*"))
529 appName = appName +
"_fast";
532 boost::posix_time::time_duration interval{boost::posix_time::seconds(intervalSeconds)};
533 ATH_MSG_DEBUG(
"Interval set to " << interval.total_seconds() <<
" seconds");
534 int interval_ms = interval.total_milliseconds();
535 if(numSlots == 0) numSlots = 1;
536 boost::posix_time::ptime epoch(boost::gregorian::date(2024,1,1));
538 boost::posix_time::time_duration slotSleepDuration = interval / (numSlots + 1);
541 std::set<std::string> HistoSet =
getSet(nameSelect);
542 histoMapUpdated =
false;
555 HistoSet =
getSet(nameSelect);
556 histoMapUpdated =
false;
558 size_t totalHists = HistoSet.size();
559 ATH_MSG_DEBUG(
"Going to publish " << totalHists <<
" histograms");
562 size_t batchSize = (totalHists + numSlots - 1) / numSlots;
563 ATH_MSG_DEBUG(
"Num of slots:" << numSlots <<
", Interval_ms " << interval_ms <<
" milliseconds, Batch size: " << batchSize);
565 boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
567 int BatchCounter = 0;
568 auto it = HistoSet.begin();
569 while(it != HistoSet.end())
571 boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
573 ATH_MSG_DEBUG(
"Batch publication number " << BatchCounter <<
" starting.");
574 for(
size_t j = 0; j < batchSize; ++j)
576 if(it == HistoSet.end())
580 const std::string&
id = *it;
581 std::string path = appName +
'.' + id;
583 tbb::concurrent_hash_map<std::string, THistID>::const_accessor accessor;
584 if (!
m_hists.find(accessor,
id)) {
585 ATH_MSG_WARNING(
"Histogram with name " <<
id <<
" not found in histogram map (probably deregistered).");
591 ATH_MSG_DEBUG(
"Histogram found in map, going to lock mutex and then publish it");
593 TObject* obj =
nullptr;
597 obj = accessor->second.obj->Clone();
599 if (obj ==
nullptr) {
614 ATH_MSG_DEBUG(
"Batch publication completed, " << counter <<
" histograms published");
616 if (it != HistoSet.end()) {
617 boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
618 int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
619 if (slot_sleep_time > 0) {
620 ATH_MSG_DEBUG(
// slot_sleep_time is a difference of total_milliseconds() values, so it is reported in milliseconds.
"Sleeping for " << slot_sleep_time <<
" milliseconds before publishing the next batch");
628 boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
629 if (boost::posix_time::time_duration(end_time-start_time) > interval) {
630 ATH_MSG_WARNING(
"Publication deadline missed, cycle exceeded the interval.. Total publication time "
631 << boost::posix_time::to_simple_string(end_time-start_time));
633 ATH_MSG_DEBUG(
"Completed the publication of " << counter <<
" histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
636 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
// Keep the millisecond count at the width returned by total_milliseconds():
// a count of milliseconds since `epoch` exceeds INT_MAX after roughly 24.8 days,
// and a wrapped value would break the modulo used to align to the interval.
637 const auto nowMs = (now-epoch).total_milliseconds();
// Time left until the next multiple of interval_ms, measured from `epoch`.
638 boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
642 ATH_MSG_DEBUG(
"Sleeping for " << next_cycle.total_milliseconds() <<
" milliseconds till the next cycle");
654 boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
// Keep the millisecond count at the width returned by total_milliseconds():
// narrowing it to int wraps once more than about 24.8 days separate `now` from `epoch`,
// which would corrupt the remainder used to compute the sync delay.
655 const auto now_ms = (now-epoch).total_milliseconds();
// Delay until the next multiple of interval_ms, measured from `epoch`.
657 boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
659 if (sync.total_milliseconds() > 50){
660 std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
665 auto start = std::chrono::steady_clock::now();
667 if (stopFlag.load()) {
670 auto elapsed = std::chrono::steady_clock::now() - start;
671 if (elapsed >= duration) {
674 auto remaining = duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
675 std::this_thread::sleep_for(std::min(std::chrono::milliseconds(500), remaining));
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
OH histogram lock header file.
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Define macros for attributes used to control the static checker.
Header file for AthHistogramAlgorithm.
static const std::string & type()
Incident type.
Gaudi::Property< std::string > m_includeType
boost::regex m_fastPublicationIncludeNameRegex
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
virtual StatusCode stop() override
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
Gaudi::Property< std::string > m_excludeType
Gaudi::Property< std::string > m_excludeName
boost::regex m_PublicationIncludeNameRegex
std::set< std::string > getSet(boost::regex) const
std::thread m_thread
Publication thread.
boost::regex m_includeTypeRegex
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
virtual std::vector< std::string > getHists() const override
virtual void handle(const Incident &incident) override
boost::regex m_excludeNameRegex
LockedHandle< T > getShared_i(const std::string &id) const
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
virtual bool existsHist(const std::string &name) const override
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication.
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
Gaudi::Property< int > m_numSlots
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise)
virtual StatusCode finalize() override
boost::regex m_includeNameRegex
boost::regex m_excludeTypeRegex
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
virtual StatusCode regHist(const std::string &name) override
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
std::string m_partition
The partition to publish to.
Gaudi::Property< int > m_intervalSecondsFast
Gaudi::Property< int > m_intervalSeconds
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules?
virtual StatusCode deReg(TObject *obj) override
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
Gaudi::Property< int > m_numSlotsFast
Gaudi::Property< std::string > m_includeName
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Scoped lock to be used for threaded histogram operations.
Helper struct that bundles the histogram, name and mutex.