ATLAS Offline Software
Loading...
Searching...
No Matches
TrigMessageSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4#include "TrigMessageSvc.h"
5#include "GaudiKernel/IAppMgrUI.h"
6#include "GaudiKernel/IIncidentSvc.h"
7#include "GaudiKernel/ITHistSvc.h"
8#include "GaudiKernel/Kernel.h"
9#include "GaudiKernel/Message.h"
10#include "GaudiKernel/StatusCode.h"
11#include "GaudiKernel/System.h"
15
16#include "ers/ers.h"
17
18#include <fstream>
19#include <iostream>
20#include <sstream>
21
22// Declare ERS issue type
23ERS_DECLARE_ISSUE(ers, HLTMessage, , )
24
25static const std::string levelNames[MSG::NUM_LEVELS] = {"NIL", "VERBOSE", "DEBUG", "INFO",
26 "WARNING", "ERROR", "FATAL", "ALWAYS"};
27
28namespace {
30 size_t msgHash(const Message& msg)
31 {
32 std::string s = msg.getSource() + msg.getMessage();
33 s.erase(std::remove_if(s.begin(), s.end(), [](char c) { return std::isdigit(c); }), s.end());
34 return std::hash<std::string>()(s);
35 }
36} // namespace
37
38TrigMessageSvc::TrigMessageSvc(const std::string& name, ISvcLocator* svcloc) :
39 base_class(name, svcloc)
40{
41 m_outputLevel.declareUpdateHandler([svcloc](Gaudi::Details::PropertyBase&) {
42 SmartIF<IAppMgrUI> app = svcloc;
43 if (app) app->outputLevelUpdate();
44 });
45
46 for (int ic = 0; ic < MSG::NUM_LEVELS; ++ic) {
47 m_msgLimit[ic].declareUpdateHandler(&TrigMessageSvc::setupLimits, this);
48 m_thresholdProp[ic].declareUpdateHandler(&TrigMessageSvc::setupThreshold, this);
49 }
50
51 std::fill(std::begin(m_msgCount), std::end(m_msgCount), 0);
52}
53
// Presumably TrigMessageSvc::initialize() -- the signature line (original source
// line 54) is not part of this listing.
// Initializes the Service base class and subscribes this service to the
// AthenaInterprocess::UpdateAfterFork incident (handle() reacts to it by starting
// asynchronous reporting). A missing IncidentSvc only produces a warning. The
// unsupported colour option is reported on stdout.
55{
56 StatusCode sc = Service::initialize();
57 if (sc.isFailure()) return sc;
58
59 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
60 sc = incSvc.retrieve();
61 if (sc.isFailure()) {
62 reportMessage(name(), MSG::WARNING, "Cannot find IncidentSvc");
63 }
64 else {
65 incSvc->addListener(this, AthenaInterprocess::UpdateAfterFork::type());
66 }
67
// NOTE(review): original source line 68 is missing from this listing; its content
// cannot be recovered from here.
69
70 if (m_color) {
71 std::cout << "TrigMessageSvc WARNING: Colors are not supported by TrigMessageSvc" << std::endl;
72 }
// Succeeds even when the IncidentSvc could not be retrieved.
73 return StatusCode::SUCCESS;
74}
75
77{
78 m_state = Gaudi::StateMachine::OFFLINE;
79 StatusCode sc = initialize();
80 if ( sc.isSuccess() ) m_state = Gaudi::StateMachine::INITIALIZED;
81 return sc;
82}
83
// Presumably TrigMessageSvc::start() -- the signature line (original source line 84)
// is not part of this listing. Enables publishing of message statistics.
// NOTE(review): original source lines 86 and 88 are missing from this listing, so
// this is probably not the complete body.
85{
87 m_doPublish = true;
89 return StatusCode::SUCCESS;
90}
91
// Presumably TrigMessageSvc::stop() -- the signature line (original source line 92)
// is not part of this listing. Shuts down asynchronous reporting if it is active and
// disables publishing of message statistics.
93{
94 // Disable asynchronous reporting again
95 if (m_asyncReporting) {
// Queue an action that clears m_asyncReporting. The reporting thread runs it in turn
// and leaves its loop once the flag is cleared and the queue is empty.
96 m_messageActionsQueue.emplace( [this]() {m_asyncReporting = false;} );
// NOTE(review): m_asyncReporting may still read true here, in which case this message
// is queued instead of being printed directly. If the reporting thread has already
// executed the shutdown action and found the queue empty, a message queued at that
// moment would never be processed -- confirm this ordering cannot lose it.
97 reportMessage(name(), MSG::INFO, "Disabling asynchronous message reporting");
// Wait for the reporting thread to finish.
98 m_thread.join();
99 }
100
101 m_doPublish = false;
// NOTE(review): original source line 102 is missing from this listing.
103 return StatusCode::SUCCESS;
104}
105
106void TrigMessageSvc::handle(const Incident& incident)
107{
108 if (incident.type() == AthenaInterprocess::UpdateAfterFork::type()) {
109 reportMessage(name(), MSG::INFO, "Enabling asynchronous message reporting");
110 m_thread = std::thread( &TrigMessageSvc::asyncReporting, this);
111 }
112}
113
115{
116 ServiceHandle<ITHistSvc> histSvc("THistSvc", name());
117 if ( histSvc.retrieve().isFailure() ) {
118 reportMessage(name(), MSG::WARNING, "Cannot find THistSvc. Message stats will not be published.");
119 m_doPublish = false;
120 return;
121 }
122
123 // monitoring information root directory
124 const std::string path = "/EXPERT/HLTFramework/" + name() + "/";
125 const int nLevelBins = MSG::NUM_LEVELS - m_publishLevel;
126 m_msgCountHist = new TH1I("MessageCount", "Messages while RUNNING;Severity;Count",
127 nLevelBins, 0, nLevelBins);
128
129 const int nSrcBins = 1;
130 m_msgCountSrcHist = new TH2I("MessageCountBySource", "Messages while RUNNING;Severity;Source",
131 nLevelBins, 0, nLevelBins, nSrcBins, 0, nSrcBins);
132
133 for (int i=m_publishLevel; i<MSG::NUM_LEVELS; i++) {
134 m_msgCountHist->GetXaxis()->SetBinLabel(i-m_publishLevel+1, levelNames[i].c_str());
135 m_msgCountSrcHist->GetXaxis()->SetBinLabel(i-m_publishLevel+1, levelNames[i].c_str());
136 }
137
138 if ( histSvc->regHist(path + m_msgCountHist->GetName(), m_msgCountHist).isFailure() ) {
139 reportMessage(name(), MSG::WARNING, "Cannot register monitoring histogram 'MessageCount'");
140 }
141 if ( histSvc->regHist(path + m_msgCountSrcHist->GetName(), m_msgCountSrcHist).isFailure() ) {
142 reportMessage(name(), MSG::WARNING, "Cannot register monitoring histogram 'MessageCountBySource'");
143 }
144}
145
146
147void TrigMessageSvc::setupLimits(Gaudi::Details::PropertyBase& prop)
148{
149 // Just report problems in the settings of the limits and unknown limit
150 // parameters
151 if (prop.name() == "alwaysLimit") {
152 Gaudi::Property<int>* p = dynamic_cast<Gaudi::Property<int>*>(&prop);
153 if (p && p->value() != 0) {
154 std::cout << "TrigMessageSvc ERROR: cannot suppress ALWAYS messages" << std::endl;
155 p->setValue(0);
156 }
157 }
158 else if (prop.name() == "defaultLimit") {
159 for (int i = MSG::VERBOSE; i < MSG::NUM_LEVELS; ++i) {
160 if (i != MSG::ALWAYS) {
161 m_msgLimit[i] = m_msgLimit[MSG::NIL].value();
162 }
163 }
164 }
165 else if (prop.name() != "fatalLimit" && prop.name() != "errorLimit" &&
166 prop.name() != "warningLimit" && prop.name() == "infoLimit" &&
167 prop.name() == "debugLimit" && prop.name() == "verboseLimit") {
168 std::cout << "TrigMessageSvc ERROR: Unknown message limit parameter: " << prop.name()
169 << std::endl;
170 return;
171 }
172}
173
174void TrigMessageSvc::setupThreshold(Gaudi::Details::PropertyBase& prop)
175{
176 static const std::array<std::pair<const char*, MSG::Level>, 7> tbl{{{"setFatal", MSG::FATAL},
177 {"setError", MSG::ERROR},
178 {"setWarning", MSG::WARNING},
179 {"setInfo", MSG::INFO},
180 {"setDebug", MSG::DEBUG},
181 {"setVerbose", MSG::VERBOSE},
182 {"setAlways", MSG::ALWAYS}}};
183
184 auto i = std::find_if(
185 std::begin(tbl), std::end(tbl),
186 [&](const std::pair<const char*, MSG::Level>& t) { return prop.name() == t.first; });
187 if (i == std::end(tbl)) {
188 std::cerr << "TrigMessageSvc ERROR: Unknown message threshold parameter: " << prop.name()
189 << std::endl;
190 return;
191 }
192 int ic = i->second;
193
194 Gaudi::Property<std::vector<std::string>>* sap =
195 dynamic_cast<Gaudi::Property<std::vector<std::string>>*>(&prop);
196 if (!sap) {
197 std::cerr << "could not dcast " << prop.name()
198 << " to a Gaudi::Property<std::vector<std::string>> (which it "
199 "should be!)"
200 << std::endl;
201 }
202 else {
203 for (auto& i : sap->value()) setOutputLevel(i, ic);
204 }
205}
206
208{
209 m_suppress = false;
210 std::ostringstream os;
211
212 if (m_stats) {
213 os << "Summarizing all message counts"
214 << " (severity >= " << levelNames[m_statLevel] << ")" << std::endl;
215 }
216 else {
217 os << "Listing sources of suppressed message: " << std::endl;
218 }
219
220 os << "=====================================================" << std::endl;
221 os << " Message Source | Level | Count" << std::endl;
222 os << "-----------------------------+---------+-------------" << std::endl;
223
224 bool found(false);
225
226 for (auto itr = m_sourceMap.begin(); itr != m_sourceMap.end(); ++itr) {
227 for (unsigned int ic = 0; ic < MSG::NUM_LEVELS; ++ic) {
228 if ((m_suppress && itr->second.msg[ic] >= abs(m_msgLimit[ic]) && m_msgLimit[ic] != 0) ||
229 (m_stats && itr->second.msg[ic] > 0 && ic >= m_statLevel.value())) {
230 os << " ";
231 os.width(28);
232 os.setf(std::ios_base::left, std::ios_base::adjustfield);
233 os << itr->first;
234 os << "|";
235
236 os.width(8);
237 os.setf(std::ios_base::right, std::ios_base::adjustfield);
238 os << levelNames[ic];
239 os << " |";
240
241 os.width(9);
242 os << itr->second.msg[ic];
243 os << std::endl;
244
245 found = true;
246 }
247 }
248 }
249 os << "=====================================================" << std::endl;
250 if (found || m_stats) {
251 reportMessage(name(), MSG::INFO, os.str());
252 }
253
254 return StatusCode::SUCCESS;
255}
256
// Presumably TrigMessageSvc::reportMessage(const Message& msg, int outputLevel) -- the
// signature line (original source line 257) is not part of this listing.
// While asynchronous reporting is active, the message is queued for the reporting
// thread (which calls i_reportMessage); otherwise it is meant to be handled
// synchronously.
258{
259 if (m_asyncReporting) {
260 // msg has to be copied as the reference may become invalid by the time it is used
261 m_messageActionsQueue.emplace([this, m=Message(msg), outputLevel]() {
262 this->i_reportMessage(m, outputLevel); });
263 }
264 else {
// NOTE(review): original source line 265 -- presumably the synchronous call
// i_reportMessage(msg, outputLevel) -- is missing from this listing.
266 }
267}
268
270{
271 m_asyncReporting = true;
272 std::function<void()> action;
273 while ( m_asyncReporting || !m_messageActionsQueue.empty() ) {
274 m_messageActionsQueue.pop(action);
275 action();
276 }
277}
278
279
284{
285 const int key = msg.getType();
286 ++m_msgCount[key];
287
288 const Message* cmsg = &msg;
289 bool doPrint = true;
290 std::unique_ptr<Message> newMessage;
291
292 if (m_doSuppress || m_stats.value()) {
293
294 // Increase the counter of 'key' type of messages for the source and
295 // get the new value.
296 int nmsg = ++(m_sourceMap[msg.getSource()].msg[key]);
297
298 const int msgLimit = m_msgLimit[key].value();
299 if (m_doSuppress) {
300 if (msgLimit > 0) { // regular suppression
301 if (nmsg > msgLimit) doPrint = false;
302 if (nmsg == msgLimit) {
303 std::string txt = levelNames[key] + " message limit (" + std::to_string(msgLimit) +
304 ") reached for " + msg.getSource() + ". Suppressing further output.";
305 newMessage = std::make_unique<Message>(msg.getSource(), MSG::WARNING, std::move(txt));
306 cmsg = newMessage.get();
307 }
308 }
309 }
310 else if (msgLimit < 0) { // logarithmic suppression
311 // Calculate message hash
312 const unsigned int mh = msgHash(*cmsg);
313 // Check if we saw this message already and increase counter
314 auto m = m_msgHashCount.find(mh);
315 if (m != m_msgHashCount.end()) {
316 nmsg = ++m->second;
317 }
318 else {
319 nmsg = m_msgHashCount[mh] = 1;
320 }
321 if (nmsg == abs(msgLimit)) {
322 std::ostringstream os;
323 os << msg.getMessage() << " [Message limit (" << abs(msgLimit)
324 << ") reached. Log-suppression of further output.]";
325 newMessage = std::make_unique<Message>(msg.getSource(), msg.getType(), os.str());
326 cmsg = newMessage.get();
327 }
328 else if (nmsg > abs(msgLimit)) {
329 const int everyNth = (int)exp10((int)log10(nmsg));
330 if ((nmsg % everyNth) == 0) {
331 std::ostringstream os;
332 os << msg.getMessage() << " [suppressed " << everyNth << " similar messages]";
333 newMessage = std::make_unique<Message>(msg.getSource(), msg.getType(), os.str());
334 cmsg = newMessage.get();
335 }
336 }
337 }
338 }
339
340 // Print the message
341 if (doPrint && key >= outputLevel) {
342 if (m_eventIDLevel != MSG::NIL && key >= static_cast<int>(m_eventIDLevel)) {
343 cmsg->setFormat(m_defaultFormat + " %E");
344 }
345 else {
346 cmsg->setFormat(m_defaultFormat);
347 }
348 cmsg->setTimeFormat(m_defaultTimeFormat);
349 (*m_defaultStream) << *cmsg << std::endl << std::flush;
350
351 // ERS forwarding
352 if (passErsFilter(cmsg->getSource(), m_useERS[key]) && passErsLimit(*cmsg)) {
353 i_reportERS(*cmsg);
354 }
355 }
356
357 // Publish message statistics if enabled and only while RUNNING
358 if ( m_doPublish && key>=static_cast<int>(m_publishLevel) ) {
359 m_msgCountHist->Fill(key-m_publishLevel, 1);
360 if (ATH_UNLIKELY(m_msgCountSrcHist->GetYaxis()->FindFixBin(msg.getSource().c_str())<0)) {
361 // Adding bins on the fly needs to be protected by mutex
363 m_msgCountSrcHist->Fill(key-m_publishLevel, msg.getSource().c_str(), 1);
364 m_msgCountSrcHist->LabelsDeflate("Y");
365 }
366 else {
367 m_msgCountSrcHist->Fill(key-m_publishLevel, msg.getSource().c_str(), 1);
368 }
369 }
370}
371
375void TrigMessageSvc::i_reportERS(const Message& msg) const
376{
377 /*
378 * Create ERS context object
379 *
380 * The (cross-node) MRS throttling is based on filename+line_number, i.e.
381 * ignoring the message text itself. We therefor use the message source as
382 * filename and the message hash as line_number. That way the same message
383 * from different nodes gets properly throttled by MRS.
384 */
385 const char* filename = msg.getSource().c_str();
386 const char* function_name = "";
387 const int line_number = msgHash(msg);
388 const char* package_name = "HLT";
389 ers::LocalContext hlt_context_info(package_name, filename, line_number, function_name);
390
391 // Create ERS issue object
392 Message m(msg);
393 if (m_eventIDLevel != MSG::NIL && msg.getType() >= static_cast<int>(m_eventIDLevel)) {
394 m.setFormat(m_ersFormat + " %E");
395 }
396 else {
397 m.setFormat(m_ersFormat);
398 }
399 std::ostringstream oss;
400 oss << m;
401 ers::HLTMessage ersMsg(hlt_context_info, oss.str());
402 ersMsg.add_qualifier("HLT"); // useful for filtering
403
404 // Forward Message to ERS
405 switch (msg.getType()) {
406 case MSG::NIL: break;
407 case MSG::VERBOSE: ers::debug(ersMsg, 2); break;
408 case MSG::DEBUG: ers::debug(ersMsg, 1); break;
409 case MSG::INFO: ers::info(ersMsg); break;
410 case MSG::WARNING: ers::warning(ersMsg); break;
411 case MSG::ERROR: ers::error(ersMsg); break;
412 case MSG::FATAL: ers::fatal(ersMsg); break;
413 default:
414 std::ostringstream oss;
415 oss << "Unknown message severity level: " << msg.getType() << " Original message was: " << m;
416 ers::error(ers::HLTMessage(ERS_HERE, oss.str()));
417 }
418}
419
421{
422 reportMessage(msg, outputLevel(msg.getSource()));
423}
424
425void TrigMessageSvc::reportMessage(std::string source, int type, std::string message)
426{
427 reportMessage(Message{std::move(source), type, std::move(message)});
428}
429
431{
432 return m_outputLevel;
433}
434
435int TrigMessageSvc::outputLevel(std::string_view source) const
436{
437 std::unique_lock<std::recursive_mutex> lock(m_thresholdMapMutex);
438 auto it = m_thresholdMap.find(source);
439 return it != m_thresholdMap.end() ? it->second : m_outputLevel.value();
440}
441
// Presumably TrigMessageSvc::setOutputLevel(int new_level) -- the signature line is
// not part of this listing. Sets the service-wide default output threshold.
// NOTE(review): assigning the property presumably also runs the update handler that
// the constructor declares on m_outputLevel -- confirm against Gaudi::Property.
443{
444 m_outputLevel = new_level;
445}
446
447void TrigMessageSvc::setOutputLevel(std::string_view source, int level)
448{
449 std::unique_lock<std::recursive_mutex> lock(m_thresholdMapMutex);
450
451 // only write if we really have to...
452 auto i = m_thresholdMap.find( source );
453 if ( i == m_thresholdMap.end() ) {
454 m_thresholdMap.emplace( source, level );
455 } else if ( i->second != level ) {
456 i->second = level;
457 }
458}
459
460int TrigMessageSvc::messageCount(MSG::Level level) const
461{
462 return m_msgCount[level];
463}
464
465bool TrigMessageSvc::passErsFilter(const std::string& source,
466 const std::vector<std::string>& filter) const
467{
468 if (filter.empty()) return false; // forward none
469 auto it = filter.begin();
470 if (filter.size() == 1 && (*it) == "*") return true; // forward all
471
472 bool pass(false);
473 for (; it != filter.end(); ++it) {
474 if ((*it) == "*") pass = true; // forward except if there is a veto later
475 if (source == (*it)) return true; // forward specific source
476 if ("!" + source == (*it)) return false; // veto specific source
477 }
478 return pass;
479}
480
482{
483 if (m_ersEventLimit < 0) return true;
484
485 const EventContext::ContextID_t slot = msg.getEventSlot();
486 const EventContext::ContextEvt_t evt = msg.getEventNumber();
487 // get or create message statistics for this slot
488 auto [itr, inserted] = m_slotMsgCount.insert( {slot, {evt, MsgAry()} } );
489
490 // if new event in slot then reset counters
491 if ( itr->second.first != evt ) {
492 itr->second = {evt, MsgAry()};
493 }
494
495 // increment number of messages for this level and slot
496 const int N = ++itr->second.second.msg[msg.getType()];
497
498 return N <= m_ersEventLimit;
499}
#define ATH_UNLIKELY(x)
static const std::string levelNames[MSG::NUM_LEVELS]
static Double_t sc
OH histogram lock header file.
static const std::string & type()
Incident type.
Definition Incidents.h:49
bool passErsFilter(const std::string &source, const std::vector< std::string > &filter) const
Gaudi::Property< std::string > m_ersFormat
tbb::concurrent_bounded_queue< std::function< void()> > m_messageActionsQueue
void setupLimits(Gaudi::Details::PropertyBase &prop)
virtual int outputLevel() const override
virtual StatusCode finalize() override
void i_reportMessage(const Message &msg, int outputLevel)
Internal implementation of reportMessage(const Message&,int) without lock.
bool passErsLimit(const Message &msg)
std::array< Gaudi::Property< int >, MSG::NUM_LEVELS > m_msgLimit
virtual StatusCode start() override
Gaudi::Property< std::string > m_defaultTimeFormat
std::map< std::string, MsgAry > m_sourceMap
counts per source
std::map< size_t, unsigned int > m_msgHashCount
counts per message hash
std::array< Gaudi::Property< std::vector< std::string > >, MSG::NUM_LEVELS > m_thresholdProp
std::unordered_map< EventContext::ContextID_t, std::pair< EventContext::ContextEvt_t, MsgAry > > m_slotMsgCount
counts per slot and level
std::array< Gaudi::Property< std::vector< std::string > >, MSG::NUM_LEVELS > m_useERS
Special properties to control output to ERS of individual sources.
Gaudi::Property< bool > m_color
std::recursive_mutex m_thresholdMapMutex
virtual void reportMessage(const Message &message) override
TH1I * m_msgCountHist
Message counting per level histogram.
void i_reportERS(const Message &msg) const
Report message to online messaging system (ERS)
bool m_doPublish
are we publishing message statistics?
Gaudi::Property< std::string > m_defaultFormat
ThresholdMap m_thresholdMap
Output level threshold map.
void setupThreshold(Gaudi::Details::PropertyBase &prop)
Gaudi::Property< bool > m_stats
virtual StatusCode stop() override
Gaudi::Property< bool > m_suppressRunningOnly
virtual int messageCount(MSG::Level logLevel) const override
bool m_doSuppress
is suppression currently enabled?
TH2I * m_msgCountSrcHist
Message counting per message source.
Gaudi::Property< int > m_ersEventLimit
Gaudi::Property< unsigned int > m_statLevel
virtual void setOutputLevel(int new_level) override
std::array< int, MSG::NUM_LEVELS > m_msgCount
counts per level
virtual StatusCode reinitialize() override
virtual void handle(const Incident &incident) override
Gaudi::Property< unsigned int > m_eventIDLevel
Gaudi::Property< bool > m_suppress
std::thread m_thread
Thread for asynchronous reporting.
Gaudi::Property< unsigned int > m_publishLevel
virtual StatusCode initialize() override
TrigMessageSvc(const std::string &name, ISvcLocator *svcloc)
Scoped lock to be used for threaded histogram operations.
static char * package_name
Definition cmdline.cxx:754
Definition MsgLevel.h:28
STL namespace.
DataModel_detail::iterator< DVL > remove_if(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end, Predicate pred)
Specialization of remove_if for DataVector/List.
Private helper class to keep the count of messages of a type (MSG::LEVEL).
MsgStream & msg
Definition testRead.cxx:32
ERS_DECLARE_ISSUE(offline_EventStorage_v5, CompressionIssue, ERS_EMPTY, ERS_EMPTY) ERS_DECLARE_ISSUE_BASE(offline_EventStorage_v5