Loading [MathJax]/jax/output/SVG/config.js
 |
ATLAS Offline Software
|
Go to the documentation of this file.
7 #include "GaudiKernel/ISvcLocator.h"
8 #include "GaudiKernel/IIncidentSvc.h"
14 #include "hltinterface/IInfoRegister.h"
15 #include "webdaq/webdaq-root.hpp"
16 #include "webdaq/webdaq.hpp"
26 #include <TBufferJSON.h>
28 #include <boost/date_time/posix_time/posix_time_types.hpp>
29 #include <boost/date_time/posix_time/posix_time.hpp>
30 #include <boost/date_time/gregorian/gregorian_types.hpp>
43 static TROOT
root(
"root",
"ROOT I/O");
52 m_excludeTypeRegex =
boost::regex(m_excludeType.value());
53 m_includeTypeRegex =
boost::regex(m_includeType.value());
54 m_excludeNameRegex =
boost::regex(m_excludeName.value());
55 m_includeNameRegex =
boost::regex(m_includeName.value());
56 m_PublicationIncludeNameRegex =
boost::regex(m_PublicationIncludeName.value());
57 m_fastPublicationIncludeNameRegex =
boost::regex(m_fastPublicationIncludeName.value());
66 const char* tdaq_partition_cstr =
std::getenv(
"TDAQ_PARTITION");
67 if (tdaq_partition_cstr !=
nullptr) {
68 m_partition = std::string(tdaq_partition_cstr);
72 return StatusCode::FAILURE;
74 const char* tdaqWebdaqBase_cstr =
std::getenv(
"TDAQ_WEBDAQ_BASE");
75 if (tdaqWebdaqBase_cstr !=
nullptr) {
76 m_tdaqWebdaqBase = std::string(tdaqWebdaqBase_cstr);
77 ATH_MSG_INFO(
"TDAQ_WEBDAQ_BASE value: " << m_tdaqWebdaqBase);
79 ATH_MSG_ERROR(
"TDAQ_WEBDAQ_BASE environment variable not set! Is needed for the OH publication through webdaq");
80 return StatusCode::FAILURE;
82 const char* tdaq_oh_server =
std::getenv(
"TDAQ_OH_SERVER");
83 if (tdaq_oh_server !=
nullptr) {
84 m_tdaqOHServerName = std::string(tdaq_oh_server);
86 m_tdaqOHServerName = m_OHServerName.value();
88 ATH_MSG_INFO(
"TDAQ_OH_SERVER value: " << m_tdaqOHServerName);
92 return StatusCode::SUCCESS;
100 ATH_MSG_INFO(
"Going to initialize the monitoring Thread");
120 ATH_MSG_ERROR(
"Failed to join the monitoring thread: " <<
e.what());
121 return StatusCode::FAILURE;
130 ATH_MSG_ERROR(
"Failed to join the fast monitoring thread: " <<
e.what());
131 return StatusCode::FAILURE;
138 return StatusCode::SUCCESS;
149 return StatusCode::SUCCESS;
154 template <
typename T>
162 return StatusCode::FAILURE;
165 if (hist_unique->Class()->InheritsFrom(TH1::Class())) {
169 ATH_MSG_ERROR(
"Histogram with name " <<
id <<
" already registered");
170 return StatusCode::FAILURE;
174 ATH_MSG_ERROR(
"Failed to insert histogram with name " <<
id);
175 return StatusCode::FAILURE;
177 T*
hist = hist_unique.release();
185 <<
hist->GetName() <<
" registered under " <<
id <<
" " <<
name());
188 <<
" because it does not inherit from TH1");
189 return StatusCode::FAILURE;
191 return StatusCode::SUCCESS;
196 template <
typename T>
199 LockedHandle<T>
lh(
nullptr,
nullptr);
205 T* phist =
hist.get();
207 if (
regHist_i(std::move(
hist),
id,
true, phid).isSuccess()) {
208 if (phid)
lh.set(phist, phid->
mutex);
214 if (
accessor->second.mutex ==
nullptr) {
215 ATH_MSG_ERROR(
"regShared: previously registered histogram \"" <<
id
216 <<
"\" was not marked shared");
218 T* phist =
dynamic_cast<T*
>(
accessor->second.obj);
219 if (phist ==
nullptr) {
220 ATH_MSG_ERROR(
"regShared: unable to dcast retrieved shared hist \""
221 <<
id <<
"\" of type " <<
accessor->second.obj->IsA()->GetName()
235 template <
typename T>
240 tbb::concurrent_hash_map<std::string, THistID>::const_accessor
accessor;
246 T* phist =
dynamic_cast<T*
>(
accessor->second.obj);
247 if (phist ==
nullptr) {
248 ATH_MSG_ERROR(
"getHist: unable to dcast retrieved shared hist \""
249 <<
id <<
"\" of type " <<
accessor->second.obj->IsA()->GetName() <<
" to requested type "
261 const std::string&
id =
it->first;
263 if (
id.starts_with(
dir)) {
267 return StatusCode::SUCCESS;
272 template <
typename T>
275 tbb::concurrent_hash_map<std::string, THistID>::const_accessor
accessor;
277 if (
accessor->second.mutex ==
nullptr) {
279 <<
"\", but it's not marked as shared");
282 T* phist =
dynamic_cast<T*
>(
accessor->second.obj);
283 if (phist ==
nullptr) {
284 ATH_MSG_ERROR(
"getShared: unable to dcast retrieved shared hist \""
285 <<
id <<
"\" of type " <<
accessor->second.obj->IsA()->GetName()
289 return LockedHandle<T>(phist,
accessor->second.mutex);
291 ATH_MSG_ERROR(
"getShared: cannot find histogram with id \"" <<
id <<
"\"");
301 if (
it->second.obj == optr) {
302 ATH_MSG_DEBUG(
"Found histogram " << optr <<
" booked under " <<
it->first
303 <<
" and will deregister it");
307 ATH_MSG_ERROR(
"Histogram with pointer " << optr <<
" not found in the histogram map");
308 return StatusCode::FAILURE;
323 return StatusCode::SUCCESS;
325 ATH_MSG_ERROR(
"Deregistration failed: histogram with id \"" <<
id <<
"\" not found");
326 return StatusCode::FAILURE;
333 std::vector<std::string>
l;
336 l.push_back(
it->first);
345 std::vector<std::string>
l;
348 if (boost::regex_match(
it->first, nameSelect)) {
349 l.push_back(
it->first);
353 std::set<std::string> HistoSet(
l.begin(),
l.end());
365 <<
" does NOT match IncludeType \"" <<
m_includeType <<
"\"");
370 ATH_MSG_WARNING(
"Object " <<
path <<
" of type " << o->ClassName() <<
" matches ExcludeType \""
390 return (getHist_i<TH1>(
name, 0,
true) !=
nullptr);
399 std::unique_ptr<TH1>
hist =
nullptr;
413 std::unique_ptr<TH1>
hist(hist_ptr);
421 hist = getHist_i<TH1>(
id, ind);
422 return (
hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
427 hist = getHist_i<TH2>(
id, ind);
428 return (
hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
433 hist = getHist_i<TH3>(
id, ind);
434 return (
hist !=
nullptr ? StatusCode::SUCCESS : StatusCode::FAILURE);
454 ATH_MSG_DEBUG(
"Recursive flag and automatic registration flag is not "
455 "supported in this implementation");
462 ATH_MSG_DEBUG(
"Recursive flag and automatic registration flag is not "
463 "supported in this implementation");
470 LockedHandle<TH1>&
lh)
472 lh = regShared_i<TH1>(
id, std::move(
hist));
473 return (
lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
477 LockedHandle<TH2>&
lh)
479 lh = regShared_i<TH2>(
id, std::move(
hist));
480 return (
lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
484 LockedHandle<TH3>&
lh)
486 lh = regShared_i<TH3>(
id, std::move(
hist));
487 return (
lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
494 lh = getShared_i<TH1>(
id);
495 return (
lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
500 lh = getShared_i<TH2>(
id);
501 return (
lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
506 lh = getShared_i<TH3>(
id);
507 return (
lh ? StatusCode::SUCCESS : StatusCode::FAILURE);
525 std::string appName =
m_jobOptionsSvc->get(
"DataFlowConfig.DF_ApplicationName");
529 appName = appName +
"_fast";
534 int interval_ms =
interval.total_milliseconds();
535 if(numSlots == 0) numSlots = 1;
538 boost::posix_time::time_duration slotSleepDuration =
interval / (numSlots + 1);
541 std::set<std::string> HistoSet =
getSet(nameSelect);
542 histoMapUpdated =
false;
555 HistoSet =
getSet(nameSelect);
556 histoMapUpdated =
false;
558 size_t totalHists = HistoSet.size();
559 ATH_MSG_DEBUG(
"Going to publish " << totalHists <<
" histograms");
562 size_t batchSize = (totalHists + numSlots - 1) / numSlots;
563 ATH_MSG_DEBUG(
"Num of slots:" << numSlots <<
", Interval_ms " << interval_ms <<
" milliseconds, Batch size: " << batchSize);
565 boost::posix_time::ptime start_time = boost::posix_time::microsec_clock::universal_time();
567 int BatchCounter = 0;
568 auto it = HistoSet.begin();
569 while(
it != HistoSet.end())
571 boost::posix_time::ptime slot_start_time = boost::posix_time::microsec_clock::universal_time();
573 ATH_MSG_DEBUG(
"Batch publication number " << BatchCounter <<
" starting.");
574 for(
size_t j = 0; j < batchSize; ++j)
576 if(
it == HistoSet.end())
580 const std::string&
id = *
it;
581 std::string
path = appName +
'.' +
id;
583 tbb::concurrent_hash_map<std::string, THistID>::const_accessor
accessor;
585 ATH_MSG_WARNING(
"Histogram with name " <<
id <<
" not found in histogram map (probably deregistered).");
591 ATH_MSG_DEBUG(
"Histogram found in map, going to lock mutex and then publish it");
593 TObject*
obj =
nullptr;
599 if (
obj ==
nullptr) {
616 if (
it != HistoSet.end()) {
617 boost::posix_time::ptime slot_end_time = boost::posix_time::microsec_clock::universal_time();
618 int slot_sleep_time = slotSleepDuration.total_milliseconds() - (slot_end_time-slot_start_time).total_milliseconds();
619 if (slot_sleep_time > 0) {
// slot_sleep_time is derived from total_milliseconds() differences, so the
// value reported here is in milliseconds, not seconds.
620 ATH_MSG_DEBUG(
"Sleeping for " << slot_sleep_time <<
" milliseconds before publishing the next batch");
628 boost::posix_time::ptime end_time = boost::posix_time::microsec_clock::universal_time();
629 if (boost::posix_time::time_duration(end_time-start_time) >
interval) {
630 ATH_MSG_WARNING(
"Publication deadline missed, cycle exceeded the interval.. Total publication time "
631 << boost::posix_time::to_simple_string(end_time-start_time));
633 ATH_MSG_DEBUG(
"Completed the publication of " <<
counter <<
" histograms. Publication time: " << boost::posix_time::to_simple_string(end_time-start_time));
// Compute how long to wait until the next multiple of interval_ms since epoch.
636 boost::posix_time::ptime
now = boost::posix_time::microsec_clock::universal_time();
// total_milliseconds() yields a 64-bit tick count. Keep it 64-bit: storing it
// in an int truncates once more than INT_MAX ms (~24.8 days) have elapsed
// since epoch, which would corrupt the modulo below.
637 long long nowMs = (
now-epoch).total_milliseconds();
638 boost::posix_time::time_duration next_cycle(boost::posix_time::milliseconds(interval_ms - (nowMs % interval_ms)));
642 ATH_MSG_DEBUG(
"Sleeping for " << next_cycle.total_milliseconds() <<
" milliseconds till the next cycle");
// Compute the time remaining until the next multiple of interval_ms since epoch.
654 boost::posix_time::ptime
now = boost::posix_time::microsec_clock::universal_time();
// total_milliseconds() yields a 64-bit tick count. Keep it 64-bit: storing it
// in an int truncates once more than INT_MAX ms (~24.8 days) have elapsed
// since epoch, which would corrupt the modulo below.
655 long long now_ms = (
now-epoch).total_milliseconds();
657 boost::posix_time::time_duration sync(boost::posix_time::milliseconds(interval_ms - (now_ms % interval_ms)));
659 if (sync.total_milliseconds() > 50){
660 std::this_thread::sleep_for(std::chrono::milliseconds(sync.total_milliseconds()));
667 if (stopFlag.load()) {
674 auto remaining =
duration - std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
675 std::this_thread::sleep_for(
std::min(std::chrono::milliseconds(500), remaining));
T * getHist_i(const std::string &id, const size_t &ind, bool quiet=false) const
OH histogram lock header file.
Gaudi::Property< std::string > m_includeType
path
Python interpreter configuration.
virtual void handle(const Incident &incident) override
std::set< std::string > getSet(boost::regex) const
virtual StatusCode regHist(const std::string &name) override
virtual StatusCode getShared(const std::string &, LockedHandle< TH1 > &) const override
virtual StatusCode finalize() override
virtual bool existsHist(const std::string &name) const override
virtual StatusCode stop() override
Gaudi::Property< int > m_intervalSeconds
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
joboptions service
Gaudi::Property< std::string > m_includeName
#define ATH_MSG_VERBOSE(x)
std::atomic< bool > m_stopFlag
Flag to stop the monitoring task.
def recurse(rdir, dqregion, ignorepath, reffile=None)
Helper struct that bundles the histogram, name and mutex.
std::thread m_thread
Publication thread.
Scoped lock to be used for threaded histogram operations.
std::string m_tdaqOHServerName
The OH server name (TDAQ_OH_SERVER if defined, m_OHServerName otherwise)
LockedHandle< T > getShared_i(const std::string &id) const
boost::regex m_includeTypeRegex
StatusCode getTHists_i(const std::string &name, TList &) const
Get TList of registered histograms.
std::string typeinfoName(const std::type_info &ti)
Convert a type_info to a demangled string.
boost::regex m_fastPublicationIncludeNameRegex
::StatusCode StatusCode
StatusCode definition for legacy code.
static void reset_histogram_mutex()
Reset (disable) histogram mutex.
virtual StatusCode getHist(const std::string &id, TH1 *&hist, size_t ind) const override
AthROOTErrorHandlerSvc * svc
LockedHandle< T > regShared_i(const std::string &id, std::unique_ptr< T > hist)
StatusCode regHist_i(std::unique_ptr< T > hist, const std::string &name, bool shared, THistID *&phid)
void monitoringTask(int, int, std::atomic< bool > &, boost::regex)
The actual publication Task.
boost::regex m_PublicationIncludeNameRegex
Gaudi::Property< std::string > m_excludeName
boost::regex m_excludeNameRegex
boost::regex m_excludeTypeRegex
virtual std::vector< std::string > getHists() const override
static const std::string & type()
Incident type.
virtual StatusCode deReg(TObject *obj) override
const AccessorWrapper< T > * accessor(xAOD::JetAttribute::AttributeID id)
Returns an attribute accessor corresponding to an AttributeID.
std::atomic< bool > m_histoMapUpdated
Flag to indicate when the histogram map is updated.
virtual StatusCode regShared(const std::string &, std::unique_ptr< TH1 >, LockedHandle< TH1 > &) override
tbb::concurrent_hash_map< std::string, THistID > m_hists
Map of the registered histograms.
virtual StatusCode getTHists(TDirectory *td, TList &, bool recurse=false) const override
Gaudi::Property< std::string > m_excludeType
StatusCode WebdaqHistSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
std::string getenv(const std::string &variableName)
get an environment variable
#define ATH_MSG_WARNING(x)
const boost::regex ref(r_ef)
std::atomic< bool > m_histoMapUpdatedFast
Flag to indicate when the histogram map is updated for the fast publication
Gaudi::Property< int > m_intervalSecondsFast
void syncPublish(long int, boost::posix_time::ptime)
Sync the publication to a multiple of the interval.
boost::regex m_includeNameRegex
WebdaqHistSvc(const std::string &name, ISvcLocator *svc)
static void set_histogram_mutex(std::mutex &mutex)
Set mutex to be used in oh_lock_histogram.
Gaudi::Property< int > m_numSlots
Define macros for attributes used to control the static checker.
void conditionedSleep(std::chrono::milliseconds, const std::atomic< bool > &)
Sleep for a duration or until the stop flag is set.
Gaudi::Property< int > m_numSlotsFast
std::string m_partition
The partition to publish to.
bool isObjectAllowed(const std::string &path, const TObject *o) const
Does the histogram follow the naming rules?