ATLAS Offline Software
Loading...
Searching...
No Matches
TrigCostSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
8
9#include "TrigCostSvc.h"
10
11#include <mutex> // For std::unique_lock
12
14
// Service constructor: forwards the name and service locator to base_class and emits a DEBUG message.
// NOTE(review): the initialiser list is incomplete in this listing (the lines between the
// trailing comma after base_class(...) and the opening brace are absent), so any further
// member initialisation cannot be seen from here.
15TrigCostSvc::TrigCostSvc(const std::string& name, ISvcLocator* pSvcLocator) :
16base_class(name, pSvcLocator), // base_class = AthService
26{
27 ATH_MSG_DEBUG("TrigCostSvc regular constructor");
28}
29
30// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
31
33 // delete[] m_eventMonitored;
34 ATH_MSG_DEBUG("TrigCostSvc destructor()");
35}
36
37// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
38
39
41 ATH_MSG_DEBUG("TrigCostSvc initialize()");
43 // TODO Remove this when the configuration is correctly propagated in config-then-run jobs
44 if (!m_eventSlots) {
45 ATH_MSG_WARNING("numConcurrentEvents() == 0. This is a misconfiguration, probably coming from running from pickle. "
46 "Setting local m_eventSlots to a 'large' number until this is fixed to allow the job to proceed.");
47 m_eventSlots = 100;
48 }
49 ATH_MSG_INFO("Initializing TrigCostSvc with " << m_eventSlots << " event slots");
50
51 // We cannot have a vector here as atomics are not movable nor copyable. Unique heap arrays are supported by C++
52 m_eventMonitored = std::make_unique< std::atomic<bool>[] >( m_eventSlots );
53 m_slotMutex = std::make_unique< std::shared_mutex[] >( m_eventSlots );
54
55 for (size_t i = 0; i < m_eventSlots; ++i) m_eventMonitored[i] = false;
56
59 ATH_CHECK(m_rosData.initialize(m_eventSlots));
60
61 return StatusCode::SUCCESS;
62}
63
64// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
65
67 ATH_MSG_DEBUG("TrigCostSvc finalize()");
68 if (m_saveHashes) {
70 ATH_MSG_INFO("Calling hashes2file, saving dump of job's HLT hashing dictionary to disk.");
71 }
72 return StatusCode::SUCCESS;
73}
74
75// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
76
77StatusCode TrigCostSvc::startEvent(const EventContext& context, const bool enableMonitoring) {
78 const bool monitoredEvent = (enableMonitoring || m_monitorAllEvents);
79 ATH_CHECK(checkSlot(context));
80
81 m_eventMonitored[ context.slot() ] = false;
82
83 {
84 // "clear" is a whole table operation, we need it all to ourselves
85 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
86 if (monitoredEvent) {
87 // Empty transient thread-safe stores in preparation for recording this event's cost data
88 ATH_CHECK(m_algStartInfo.clear(context, msg()));
89 ATH_CHECK(m_algStopTime.clear(context, msg()));
90 ATH_CHECK(m_rosData.clear(context, msg()));
91 }
92
93 // Enable collection of data in this slot for monitoredEvents
94 m_eventMonitored[ context.slot() ] = monitoredEvent;
95 }
96
97 // As we missed the AuditType::Before of the TrigCostSupervisorAlg (which is calling this TrigCostSvc::startEvent), let's add it now.
98 // This will be our canonical initial timestamps for measuring this event. Similar will be done for DecisionSummaryMakerAlg at the end
99 ATH_CHECK(processAlg(context, m_costSupervisorAlgName, AuditType::Before));
100
101 return StatusCode::SUCCESS;
102}
103
104// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
105
// Implementation of ITrigCostSvc::processAlg.
//
// Takes a timestamp and, if the event in this context's slot is monitored, forwards the
// (caller, timestamp, audit type) record to monitor() under a shared lock on that slot.
// In multi-slot mode, calls coming from any slot other than m_masterSlot are additionally
// recorded under a shared lock on the master slot, provided the master slot is monitored.
//
// Returns FAILURE if the slot check, the identifier validity check or monitor() fails.
//
// NOTE(review): the two statements that declare `ai` (one per branch) are absent from this
// listing, so how each identifier is built cannot be seen here.
// NOTE(review): m_masterSlot is used as an array index without a range check in this
// function - confirm it is validated against m_eventSlots elsewhere.
106StatusCode TrigCostSvc::processAlg(const EventContext& context, const std::string& caller, const AuditType type) {
107 ATH_CHECK(checkSlot(context));
108
// Timestamp is taken once, before any lock is acquired, and shared by both branches below
109 TrigTimeStamp now;
110
111 // Do per-event within-slot monitoring
112 if (m_eventMonitored[ context.slot() ]) {
113 // Multiple simultaneous calls allowed here, adding their data to the concurrent map.
114 std::shared_lock lockShared( m_slotMutex[ context.slot() ] );
115
117 ATH_CHECK( ai.isValid() );
118
119 ATH_CHECK(monitor(context, ai, now, type));
120
121 ATH_MSG_VERBOSE("Caller '" << caller << "', '" << ai.m_store << "', slot:" << context.slot() << " "
122 << (type == AuditType::Before ? "BEGAN" : "ENDED") << " at " << now.microsecondsSinceEpoch());
123 }
124
125 // MultiSlot mode: do per-event monitoring of all slots, but saving the data within the master-slot
126 if (m_enableMultiSlot && context.slot() != m_masterSlot && m_eventMonitored[ m_masterSlot ]) {
127 std::shared_lock lockShared( m_slotMutex[ m_masterSlot ] );
128
129 // Note: we override the storage location of these data from all other slots to be saved in the MasterSlot
131 ATH_CHECK( ai.isValid() );
132
133 ATH_CHECK(monitor(context, ai, now, type));
134 }
135
136 return StatusCode::SUCCESS;
137}
138
139// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
140
// Internal call to save monitoring data for a given AlgorithmIdentifier.
//
// AuditType::Before : inserts a start payload (timestamp, calling thread id, ROI id, slot)
//                     into m_algStartInfo and, when the identifier's real slot equals the slot
//                     it is saved into, remembers the identifier in m_threadToAlgMap under the
//                     calling thread's id.
// AuditType::After  : inserts the timestamp into m_algStopTime.
// anything else     : logs an ERROR and returns FAILURE.
//
// No lock is taken in this function itself.
// NOTE(review): the line that opens the declaration of `ap` is absent from this listing;
// only its brace-initialiser arguments are visible.
141StatusCode TrigCostSvc::monitor(const EventContext& context, const AlgorithmIdentifier& ai, const TrigTimeStamp& now, const AuditType type) {
142
143 if (type == AuditType::Before) {
144
146 now,
147 std::this_thread::get_id(),
148 getROIID(context),
149 static_cast<uint32_t>(context.slot())
150 };
151 ATH_CHECK( m_algStartInfo.insert(ai, ap, msg()) );
152
153 // Cache the AlgorithmIdentifier which has just started executing on this thread
// Only done for records stored in their own slot; master-slot copies from other slots are not cached
154 if (ai.m_realSlot == ai.m_slotToSaveInto) {
155 tbb::concurrent_hash_map<std::thread::id, AlgorithmIdentifier, ThreadHashCompare>::accessor acc;
156 m_threadToAlgMap.insert(acc, ap.m_algThreadID);
157 acc->second = ai;
158 }
159
160 } else if (type == AuditType::After) {
161
162 ATH_CHECK( m_algStopTime.insert(ai, now, msg()) );
163
164 } else {
165
166 ATH_MSG_ERROR("Only expecting AuditType::Before or AuditType::After");
167 return StatusCode::FAILURE;
168
169 }
170
171 return StatusCode::SUCCESS;
172}
173
174
175// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
176
177StatusCode TrigCostSvc::monitorROS(const EventContext& context, robmonitor::ROBDataMonitorStruct payload){
178 ATH_CHECK(checkSlot(context));
179 ATH_MSG_DEBUG( "Received ROB payload " << payload );
180
181 // Associate payload with an algorithm
182 AlgorithmIdentifier theAlg;
183 {
184 tbb::concurrent_hash_map<std::thread::id, AlgorithmIdentifier, ThreadHashCompare>::const_accessor acc;
185 bool result = m_threadToAlgMap.find(acc, std::this_thread::get_id());
186 //checking the return type 'result' is sufficient to know whether acc is bound
187 if (!result){
188 ATH_MSG_WARNING( "Cannot find algorithm on this thread (id=" << std::this_thread::get_id() << "). Request "<< payload <<" won't be monitored");
189 return StatusCode::SUCCESS;
190 }
191 //coverity[FORWARD_NULL:FALSE]
192 theAlg = acc->second;
193 }
194
195 // Record data in TrigCostDataStore
196 ATH_MSG_DEBUG( "Adding ROBs from" << payload.requestor_name << " to " << theAlg.m_hash );
197 {
198 std::shared_lock lockShared( m_slotMutex[ context.slot() ] );
199 ATH_CHECK( m_rosData.push_back(theAlg, std::move(payload), msg()) );
200 }
201
202 return StatusCode::SUCCESS;
203}
204
205
206// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
207
// Implementation of ITrigCostSvc::endEvent.
//
// For a monitored event: records the final AuditType::After entry, disables further collection
// for the slot, takes the slot lock exclusively and then converts the contents of the transient
// stores into xAOD::TrigComposite objects:
//   - one entry per algorithm start record is pushed to costOutputHandle,
//   - one entry per ROB request record is pushed to rosOutputHandle, linked to its algorithm
//     through the "alg_idx" detail (index into costOutputHandle).
// For an event which is not monitored the function returns SUCCESS immediately.
//
// NOTE(review): four declaration lines are absent from this listing (those introducing `myAi`,
// `hltSeedingAi` and the two `tc` objects), so how those objects are created cannot be seen here.
208StatusCode TrigCostSvc::endEvent(const EventContext& context, SG::WriteHandle<xAOD::TrigCompositeContainer>& costOutputHandle, SG::WriteHandle<xAOD::TrigCompositeContainer>& rosOutputHandle) {
209 ATH_CHECK(checkSlot(context));
210 if (m_eventMonitored[ context.slot() ] == false) {
211 // This event was not monitored - nothing to do.
212 ATH_MSG_DEBUG("Not a monitored event.");
213 return StatusCode::SUCCESS;
214 }
215
216 // As we will miss the AuditType::After of the TrigCostFinalizeAlg (which is calling this TrigCostSvc::endEvent), let's add it now.
217 // This will be our canonical final timestamps for measuring this event. Similar was done for HLTSeeding at the start
218 ATH_CHECK(processAlg(context, m_costFinalizeAlgName, AuditType::After));
219
220 // Reset eventMonitored flags
221 m_eventMonitored[ context.slot() ] = false;
222
223 // Now that this atomic is set to FALSE, additional algs in this instance which trigger this service will
224 // not be able to call TrigCostSvc::monitor
225
226 // ... but processAlg might already be running in other threads...
227 // Wait to obtain an exclusive lock.
228 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
229
230 // we can now perform whole-map inspection of this event's TrigCostDataStores without the danger that it will be changed further
231
232 // Let's start by getting the global STOP time we just wrote
// eventStopTime stays 0 if the retrieve below fails (only an ERROR is logged, no early return)
233 uint64_t eventStopTime = 0;
234 {
236 ATH_CHECK( myAi.isValid() );
237 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
238 if (m_algStopTime.retrieve(myAi, stopTimeAcessor, msg()).isFailure()) {
239 ATH_MSG_ERROR("No end time for '" << myAi.m_caller << "', '" << myAi.m_store << "'"); // Error as we JUST entered this info!
240 } else { // retrieve was a success
241 //coverity[FORWARD_NULL:FALSE]
242 eventStopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
243 }
244 }
245
246 // And the global START time for the event
// eventStartTime likewise stays 0 if the retrieve below fails
247 uint64_t eventStartTime = 0;
248 {
250 ATH_CHECK( hltSeedingAi.isValid() );
251 tbb::concurrent_hash_map<AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_accessor startAcessor;
252 if (m_algStartInfo.retrieve(hltSeedingAi, startAcessor, msg()).isFailure()) {
253 ATH_MSG_ERROR("No alg info for '" << hltSeedingAi.m_caller << "', '" << hltSeedingAi.m_store << "'"); // Error as we know this info must be present
254 } else { // retrieve was a success
255 //coverity[FORWARD_NULL:FALSE]
256 eventStartTime = startAcessor->second.m_algStartTime.microsecondsSinceEpoch();
257 }
258 }
259
260 // Read payloads. Write to persistent format
261 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator beginIt;
262 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator endIt;
263 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator it;
264 ATH_CHECK(m_algStartInfo.getIterators(context, msg(), beginIt, endIt));
265
266 ATH_MSG_DEBUG("Monitored event with " << std::distance(beginIt, endIt) << " AlgorithmPayload objects.");
267
// Maps AlgorithmIdentifier::m_hash to the index of the matching entry in costOutputHandle
268 std::map<size_t, size_t> aiToHandleIndex;
269 for (it = beginIt; it != endIt; ++it) {
270 const AlgorithmIdentifier& ai = it->first;
271 const AlgorithmPayload& ap = it->second;
272 uint64_t startTime = ap.m_algStartTime.microsecondsSinceEpoch();
273
274 // Can we find the end time for this alg? If not, it is probably still running. Hence we use eventStopTime as the default time.
275 uint64_t stopTime = eventStopTime;
276 {
277 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
278 if (m_algStopTime.retrieve(ai, stopTimeAcessor, msg()).isFailure()) {
279 ATH_MSG_DEBUG("No end time for '" << ai.m_caller << "', '" << ai.m_store << "'");
280 } else { // retrieve was a success
281 stopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
282 }
283 // stopTimeAcessor goes out of scope - lock released
284 }
285
286 // It is possible (when in the master-slot) to catch just the END of an Alg's exec from another slot, and then the START of the same
287 // alg executing in the next event in that same other-slot.
288 // This gives us an end time which is before the start time. Disregard these entries.
289 if (startTime > stopTime) {
290 ATH_MSG_VERBOSE("Disregard start-time:" << startTime << " > stop-time:" << stopTime
291 << " for " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " in slot " << ap.m_slot << ", this is slot " << context.slot());
292 continue;
293 }
294
295 // Lock the start and stop times to be no later than eventStopTime.
296 // E.g. it's possible for an alg in another slot to start or stop running after 'processAlg(context, m_costFinalizeAlgName, AuditType::After))'
297 // but before 'lockUnique( m_slotMutex[ context.slot() ] )', creating a timestamp after the nominal end point for this event.
298 // If the alg starts afterwards, we disregard it in lieu of setting to have zero walltime.
299 // If the alg stops afterwards, we truncate its stop time to be no later than eventStopTime
300 if (startTime > eventStopTime) {
301 ATH_MSG_VERBOSE("Disregard " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " as it started after endEvent() was finished being called" );
302 continue;
303 }
304 if (stopTime > eventStopTime) {
305 ATH_MSG_VERBOSE(TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " stopped after endEvent() was called, but before the cost container was locked,"
306 << " truncating its ending time stamp from " << stopTime << " to " << eventStopTime);
307 stopTime = eventStopTime;
308 }
309
310 // Do the same, locking the start and stop times to be no earlier than eventStartTime
311 // If the alg stops before eventStartTime, we disregard it in lieu of setting it to have zero walltime
312 // If the alg starts before eventStartTime, we truncate its start time to be no earlier than eventStartTime
313 if (stopTime < eventStartTime) {
314 ATH_MSG_VERBOSE("Disregard " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " as it stopped before startEvent() was finished being called" );
315 continue;
316 }
317 if (startTime < eventStartTime) {
318 ATH_MSG_VERBOSE(TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " started just after the cost container was unlocked, but before the HLTSeeding record was written."
319 << " truncating its starting time stamp from " << startTime << " to " << eventStartTime);
320 startTime = eventStartTime;
321 }
322
323 // Make a new TrigComposite to persist monitoring payload for this alg
325 costOutputHandle->push_back( tc );
326 // tc is now owned by storegate, and has an aux store provided by the TrigCompositeCollection
327
// The thread-id hash is narrowed to 32 bits here; it is the key of m_threadToCounterMap
328 const uint32_t threadID = static_cast<uint32_t>( std::hash< std::thread::id >()(ap.m_algThreadID) );
329 uint32_t threadEnumerator = 0;
330 {
331 // We can have multiple slots get here at the same time
332 std::lock_guard<std::mutex> lock(m_globalMutex);
333 const std::unordered_map<uint32_t, uint32_t>::const_iterator mapIt = m_threadToCounterMap.find(threadID);
334 if (mapIt == m_threadToCounterMap.end()) {
335 threadEnumerator = m_threadCounter;
336 m_threadToCounterMap.insert( std::make_pair(threadID, m_threadCounter++) );
337 } else {
338 threadEnumerator = mapIt->second;
339 }
340 }
341
342 bool result = true;
343 result &= tc->setDetail("alg", ai.callerHash(msg()));
344 result &= tc->setDetail("store", ai.storeHash(msg()));
345 result &= tc->setDetail("view", ai.m_viewID);
346 result &= tc->setDetail("thread", threadEnumerator);
347 result &= tc->setDetail("thash", threadID);
348 result &= tc->setDetail("slot", ap.m_slot);
349 result &= tc->setDetail("roi", ap.m_algROIID);
350 result &= tc->setDetail("start", startTime);
351 result &= tc->setDetail("stop", stopTime);
352 if (!result) ATH_MSG_WARNING("Failed to append one or more details to trigger cost TC");
353
354 aiToHandleIndex[ai.m_hash] = costOutputHandle->size() - 1;
355 }
356
357 typedef tbb::concurrent_hash_map< AlgorithmIdentifier, std::vector<robmonitor::ROBDataMonitorStruct>, AlgorithmIdentifierHashCompare>::const_iterator ROBConstIt;
358 ROBConstIt beginRob;
359 ROBConstIt endRob;
360
361 ATH_CHECK(m_rosData.getIterators(context, msg(), beginRob, endRob));
362
363 for (ROBConstIt it = beginRob; it != endRob; ++it) {
364 size_t aiHash = it->first.m_hash;
365
// NOTE(review): after this warning the loop carries on, and the later aiToHandleIndex[aiHash]
// (std::map::operator[]) inserts a value-initialised 0 for the unknown hash, so such ROS records
// end up with alg_idx 0 - confirm whether they should be skipped instead.
366 if (aiToHandleIndex.count(aiHash) == 0) {
367 ATH_MSG_WARNING("Algorithm with hash " << aiHash << " not found!");
368 }
369
370 // Save ROB data via TrigComposite
371 for (const robmonitor::ROBDataMonitorStruct& robData : it->second) {
373 rosOutputHandle->push_back(tc);
374
375 // Retrieve ROB requests data into primitives vectors
376 std::vector<uint32_t> robs_id;
377 std::vector<uint32_t> robs_size;
378 std::vector<unsigned> robs_history;
379 std::vector<unsigned short> robs_status;
380
381 robs_id.reserve(robData.requested_ROBs.size());
382 robs_size.reserve(robData.requested_ROBs.size());
383 robs_history.reserve(robData.requested_ROBs.size());
384 robs_status.reserve(robData.requested_ROBs.size());
385
386 for (const auto& rob : robData.requested_ROBs) {
387 robs_id.push_back(rob.second.rob_id);
388 robs_size.push_back(rob.second.rob_size);
389 robs_history.push_back(rob.second.rob_history);
390 robs_status.push_back(rob.second.isStatusOk());
391 }
392
393 bool result = true;
394 result &= tc->setDetail("alg_idx", aiToHandleIndex[aiHash]);
395 result &= tc->setDetail("lvl1ID", robData.lvl1ID);
396 result &= tc->setDetail<std::vector<uint32_t>>("robs_id", robs_id);
397 result &= tc->setDetail<std::vector<uint32_t>>("robs_size", robs_size);
398 result &= tc->setDetail<std::vector<unsigned>>("robs_history", robs_history);
399 result &= tc->setDetail<std::vector<unsigned short>>("robs_status", robs_status);
400 result &= tc->setDetail("start", robData.start_time);
401 result &= tc->setDetail("stop", robData.end_time);
402
403 if (!result) ATH_MSG_WARNING("Failed to append one or more details to trigger cost ROS TC");
404 }
405 }
406
// Dump of everything written to costOutputHandle; the level test avoids the loop when VERBOSE is off
407 if (msg().level() <= MSG::VERBOSE) {
408 ATH_MSG_VERBOSE("--- Trig Cost Event Summary ---");
409 for ( const xAOD::TrigComposite* tc : *costOutputHandle ) {
410 ATH_MSG_VERBOSE("Algorithm:'" << TrigConf::HLTUtils::hash2string( tc->getDetail<TrigConf::HLTHash>("alg"), "ALG") << "'");
411 ATH_MSG_VERBOSE(" Store:'" << TrigConf::HLTUtils::hash2string( tc->getDetail<TrigConf::HLTHash>("store"), "STORE") << "'");
412 ATH_MSG_VERBOSE(" View ID:" << tc->getDetail<int16_t>("view"));
413 ATH_MSG_VERBOSE(" Thread #:" << tc->getDetail<uint32_t>("thread") );
414 ATH_MSG_VERBOSE(" Thread ID Hash:" << tc->getDetail<uint32_t>("thash") );
415 ATH_MSG_VERBOSE(" Slot:" << tc->getDetail<uint32_t>("slot") );
416 ATH_MSG_VERBOSE(" RoI ID Hash:" << tc->getDetail<int32_t>("roi") );
417 ATH_MSG_VERBOSE(" Start Time:" << tc->getDetail<uint64_t>("start") << " mu s");
418 ATH_MSG_VERBOSE(" Stop Time:" << tc->getDetail<uint64_t>("stop") << " mu s");
419 }
420 }
421
422 return StatusCode::SUCCESS;
423}
424
425// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
426
427StatusCode TrigCostSvc::generateTimeoutReport(const EventContext& context, std::string& report) {
428
429 ATH_CHECK(checkSlot(context));
430 if (!m_eventMonitored[context.slot()]) {
431 ATH_MSG_DEBUG("Not a monitored event.");
432 report = "";
433 return StatusCode::SUCCESS;
434 }
435
436 std::unique_lock lockUnique(m_slotMutex[context.slot()]);
437
438 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator beginIt;
439 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator endIt;
440 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator it;
441 ATH_CHECK(m_algStartInfo.getIterators(context, msg(), beginIt, endIt));
442
443 // Create map that sorts in descending order
444 std::map<uint64_t, std::string, std::greater<uint64_t>> timeToAlgMap;
445
446 for (it = beginIt; it != endIt; ++it) {
447 const AlgorithmIdentifier& ai = it->first;
448 const AlgorithmPayload& ap = it->second;
449
450 // Don't look at any records from other slots
451 if (ai.m_realSlot != context.slot()) continue;
452
453 uint64_t startTime = ap.m_algStartTime.microsecondsSinceEpoch();
454 uint64_t stopTime = 0;
455 {
456 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
457 if (m_algStopTime.retrieve(ai, stopTimeAcessor, msg()).isFailure()) {
458 ATH_MSG_DEBUG("No end time for '" << ai.m_caller << "', '" << ai.m_store << "'");
459 } else { // retrieve was a success
460 //coverity[FORWARD_NULL:FALSE]
461 stopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
462 }
463 // stopTimeAcessor goes out of scope - lock released
464 }
465
466 if (stopTime == 0) continue;
467
468 timeToAlgMap[stopTime-startTime] = ai.m_caller;
469 }
470
471 // Save top 5 times to the report
472 report = "Timeout detected with the following algorithms consuming the most time: ";
473 int algCounter = 0;
474 for(const std::pair<const uint64_t, std::string>& p : timeToAlgMap){
475 // Save time in miliseconds instead of microseconds
476 report += p.second + " (" + std::to_string(std::lround(p.first/1e3)) + " ms)";
477 ++algCounter;
478 if (algCounter >= 5){
479 break;
480 }
481 report += ", ";
482 }
483
484 return StatusCode::SUCCESS;
485}
486
487// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
488
489StatusCode TrigCostSvc::discardEvent(const EventContext& context) {
490
491 if (m_monitorAllEvents) {
492 ATH_MSG_DEBUG("All events are monitored - event will not be discarded");
493 return StatusCode::SUCCESS;
494 }
495
496 ATH_MSG_DEBUG("Cost Event will be discarded");
497 ATH_CHECK(checkSlot(context));
498 {
499 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
500
501 // Reset eventMonitored flags
502 m_eventMonitored[ context.slot() ] = false;
503
504 // tables are cleared at the start of the event
505 }
506 return StatusCode::SUCCESS;
507}
508
509// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
510
511StatusCode TrigCostSvc::checkSlot(const EventContext& context) const {
512 if (context.slot() >= m_eventSlots) {
513 ATH_MSG_FATAL("Job is using event slot #" << context.slot() << ", but we only reserved space for: " << m_eventSlots);
514 return StatusCode::FAILURE;
515 }
516 return StatusCode::SUCCESS;
517}
518
519// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
520
// Internal helper returning an ROI identifier for the given event context.
// When the context carries an extended event context and `roi` is non-null, the value of
// roi->roiId() is returned, converted to int32_t.
// NOTE(review): the statement which obtains `roi` and the fallback return statement are absent
// from this listing, so the value returned in all other cases cannot be seen from here.
521int32_t TrigCostSvc::getROIID(const EventContext& context) {
522 if (Atlas::hasExtendedEventContext(context)) {
524 if (roi) return static_cast<int32_t>(roi->roiId());
525 }
527}
528
529// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
530
// Reports whether cost data is being collected for the event in this context's slot.
// Returns true when the slot's own monitoring flag is set. When `includeMultiSlot` is true
// and multi-slot mode is enabled a further check is made.
// NOTE(review): the body of the multi-slot branch is absent from this listing, so what it
// tests and returns cannot be seen from here.
// NOTE(review): context.slot() is used as an index without a checkSlot() call in this function.
531bool TrigCostSvc::isMonitoredEvent(const EventContext& context, const bool includeMultiSlot) const {
532 if (m_eventMonitored[ context.slot() ]) {
533 return true;
534 }
535 if (includeMultiSlot && m_enableMultiSlot) {
537 }
538 return false;
539}
540
541// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
542
543size_t TrigCostSvc::ThreadHashCompare::hash(const std::thread::id& thread) {
544 return static_cast<size_t>( std::hash< std::thread::id >()(thread) );
545}
546
547// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
548
549bool TrigCostSvc::ThreadHashCompare::equal(const std::thread::id& x, const std::thread::id& y) {
550 return (x == y);
551}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
Maintain a set of objects, one per slot.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
static Double_t tc
#define y
#define x
const IRoiDescriptor * roiDescriptor() const
Get cached pointer to View's Region of Interest Descriptor or nullptr if not describing a View.
Describes the API of the Region of Interest geometry.
virtual unsigned int roiId() const =0
identifiers
static const std::string hash2string(HLTHash, const std::string &category="TE")
hash function translating identifiers into names (via internal dictionary)
static void hashes2file(const std::string &fileName="hashes2string.txt")
debugging output of internal dictionary
virtual StatusCode processAlg(const EventContext &context, const std::string &caller, const AuditType type) override
Implementation of ITrigCostSvc::processAlg.
Gaudi::Property< bool > m_monitorAllEvents
Gaudi::Property< bool > m_saveHashes
Gaudi::Property< std::string > m_costFinalizeAlgName
std::mutex m_globalMutex
Used to protect all-slot modifications.
TrigCostDataStore< AlgorithmPayload > m_algStartInfo
Thread-safe store of algorithm start payload.
StatusCode checkSlot(const EventContext &context) const
Sanity check that the job is respecting the number of slots which were declared at config time.
TrigCostDataStore< TrigTimeStamp > m_algStopTime
Thread-safe store of algorithm stop times.
Gaudi::Property< bool > m_enableMultiSlot
virtual StatusCode initialize() override
Initialise, create enough storage to store m_eventSlots.
size_t m_eventSlots
Number of concurrent processing slots.
virtual ~TrigCostSvc()
Destructor.
std::unique_ptr< std::shared_mutex[] > m_slotMutex
Used to control and protect whole-table operations.
virtual StatusCode monitorROS(const EventContext &context, robmonitor::ROBDataMonitorStruct payload) override
Implementation of ITrigCostSvc::monitorROS.
virtual bool isMonitoredEvent(const EventContext &context, const bool includeMultiSlot=true) const override
std::unique_ptr< std::atomic< bool >[] > m_eventMonitored
Used to cache if the event in a given slot is being monitored.
virtual StatusCode endEvent(const EventContext &context, SG::WriteHandle< xAOD::TrigCompositeContainer > &costOutputHandle, SG::WriteHandle< xAOD::TrigCompositeContainer > &rosOutputHandle) override
Implementation of ITrigCostSvc::endEvent.
int32_t getROIID(const EventContext &context)
@brief Internal function to return a RoI from an extended event context
virtual StatusCode discardEvent(const EventContext &context) override
Discard a cost monitored event.
Gaudi::Property< std::string > m_costSupervisorAlgName
TrigCostDataStore< std::vector< robmonitor::ROBDataMonitorStruct > > m_rosData
Thread-safe store of ROS data.
TrigCostSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard ATLAS Service constructor.
Gaudi::Property< size_t > m_masterSlot
virtual StatusCode finalize() override
Finalize, act on m_saveHashes.
virtual StatusCode generateTimeoutReport(const EventContext &context, std::string &report) override
StatusCode monitor(const EventContext &context, const AlgorithmIdentifier &ai, const TrigTimeStamp &now, const AuditType type)
Internal call to save monitoring data for a given AlgorithmIdentifier.
size_t m_threadCounter
Count how many unique thread ID we have seen.
tbb::concurrent_hash_map< std::thread::id, AlgorithmIdentifier, ThreadHashCompare > m_threadToAlgMap
Keeps track of what is running right now in each thread.
std::unordered_map< uint32_t, uint32_t > m_threadToCounterMap
Map thread's hash ID to a counting numeral.
virtual StatusCode startEvent(const EventContext &context, const bool enableMonitoring=true) override
Implementation of ITrigCostSvc::startEvent.
utility class to measure time duration in AthenaMT The pattern when it is useful: AlgA tags the begin...
The structure which is used to monitor the ROB data request in L2 It is created for every addROBData ...
uint64_t start_time
map of ROBs requested
std::map< const uint32_t, robmonitor::ROBDataStruct > requested_ROBs
name of requesting algorithm
uint64_t end_time
start time of ROB request (microsec since epoch)
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
size_t getNSlots()
Return the number of event slots.
TrigComposite_v1 TrigComposite
Declare the latest version of the class.
Static hash and equal members as required by tbb::concurrent_hash_map.
static AlgorithmIdentifier make(const EventContext &context, const std::string &caller, MsgStream &msg, const int16_t slotOverride=-1)
Construct an AlgorithmIdentifier.
Small structure to hold an algorithm's name and store, plus some details on its EventView.
std::string m_caller
Name of the algorithm.
std::string m_store
Name of the algorithm's store.
TrigConf::HLTHash callerHash(MsgStream &msg) const
size_t m_slotToSaveInto
The slot which is used for the purposes of recording data on this algorithm's execution.
static constexpr int16_t s_noView
Constant value used to express an Algorithm which is not running in a View.
TrigConf::HLTHash storeHash(MsgStream &msg) const
size_t m_realSlot
The actual slot of the algorithm.
size_t m_hash
Hash of algorithm + store + realSlot.
StatusCode isValid() const
int16_t m_viewID
If not within an event view, then m_viewID = s_noView = -1.
Small structure wrap the various values stored for an algorithm just before it starts to execute.
static bool equal(const std::thread::id &x, const std::thread::id &y)
static size_t hash(const std::thread::id &thread)
MsgStream & msg
Definition testRead.cxx:32