ATLAS Offline Software
Loading...
Searching...
No Matches
TrigCostSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
7
8#include "TrigCostSvc.h"
9
10#include <mutex> // For std::unique_lock
11
13
14TrigCostSvc::TrigCostSvc(const std::string& name, ISvcLocator* pSvcLocator) :
15base_class(name, pSvcLocator), // base_class = AthService
25{
26 ATH_MSG_DEBUG("TrigCostSvc regular constructor");
27}
28
29// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
30
32 // delete[] m_eventMonitored;
33 ATH_MSG_DEBUG("TrigCostSvc destructor()");
34}
35
36// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
37
38
40 ATH_MSG_DEBUG("TrigCostSvc initialize()");
42 // TODO Remove this when the configuration is correctly propagated in config-then-run jobs
43 if (!m_eventSlots) {
44 ATH_MSG_WARNING("numConcurrentEvents() == 0. This is a misconfiguration, probably coming from running from pickle. "
45 "Setting local m_eventSlots to a 'large' number until this is fixed to allow the job to proceed.");
46 m_eventSlots = 100;
47 }
48 ATH_MSG_INFO("Initializing TrigCostSvc with " << m_eventSlots << " event slots");
49
50 // We cannot have a vector here as atomics are not movable nor copyable. Unique heap arrays are supported by C++
51 m_eventMonitored = std::make_unique< std::atomic<bool>[] >( m_eventSlots );
52 m_slotMutex = std::make_unique< std::shared_mutex[] >( m_eventSlots );
53
54 for (size_t i = 0; i < m_eventSlots; ++i) m_eventMonitored[i] = false;
55
58 ATH_CHECK(m_rosData.initialize(m_eventSlots));
59
60 return StatusCode::SUCCESS;
61}
62
63// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
64
66 ATH_MSG_DEBUG("TrigCostSvc finalize()");
67 if (m_saveHashes) {
69 ATH_MSG_INFO("Calling hashes2file, saving dump of job's HLT hashing dictionary to disk.");
70 }
71 return StatusCode::SUCCESS;
72}
73
74// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
75
76StatusCode TrigCostSvc::startEvent(const EventContext& context, const bool enableMonitoring) {
77 const bool monitoredEvent = (enableMonitoring || m_monitorAllEvents);
78 ATH_CHECK(checkSlot(context));
79
80 m_eventMonitored[ context.slot() ] = false;
81
82 {
83 // "clear" is a whole table operation, we need it all to ourselves
84 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
85 if (monitoredEvent) {
86 // Empty transient thread-safe stores in preparation for recording this event's cost data
87 ATH_CHECK(m_algStartInfo.clear(context, msg()));
88 ATH_CHECK(m_algStopTime.clear(context, msg()));
89 ATH_CHECK(m_rosData.clear(context, msg()));
90 }
91
92 // Enable collection of data in this slot for monitoredEvents
93 m_eventMonitored[ context.slot() ] = monitoredEvent;
94 }
95
96 // As we missed the AuditType::Before of the TrigCostSupervisorAlg (which is calling this TrigCostSvc::startEvent), let's add it now.
97 // This will be our canonical initial timestamps for measuring this event. Similar will be done for DecisionSummaryMakerAlg at the end
98 ATH_CHECK(processAlg(context, m_costSupervisorAlgName, AuditType::Before));
99
100 return StatusCode::SUCCESS;
101}
102
103// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
104
// Record one audit point (AuditType::Before or AuditType::After) for algorithm `caller`.
//
// The timestamp `now` is taken once, before any lock is acquired, and is reused for both
// the per-slot and the multi-slot record.
// Returns FAILURE if the slot check, the isValid() check on `ai`, or monitor() fails.
// NOTE(review): the statements which declare `ai` (source lines 115 and 129) are missing
// from this listing - how the identifier is built cannot be confirmed from here.
105StatusCode TrigCostSvc::processAlg(const EventContext& context, const std::string& caller, const AuditType type) {
106 ATH_CHECK(checkSlot(context));
107
108 TrigTimeStamp now;
109
110 // Do per-event within-slot monitoring
111 if (m_eventMonitored[ context.slot() ]) {
112 // Multiple simultaneous calls allowed here, adding their data to the concurrent map.
113 std::shared_lock lockShared( m_slotMutex[ context.slot() ] );
114
116 ATH_CHECK( ai.isValid() );
117
118 ATH_CHECK(monitor(context, ai, now, type));
119
120 ATH_MSG_VERBOSE("Caller '" << caller << "', '" << ai.m_store << "', slot:" << context.slot() << " "
121 << (type == AuditType::Before ? "BEGAN" : "ENDED") << " at " << now.microsecondsSinceEpoch());
122 }
123
124 // MultiSlot mode: do per-event monitoring of all slots, but saving the data within the master-slot
// Only taken for calls from a slot other than the master slot, and only while the master slot
// itself is flagged as monitored. The master slot's mutex is the one that is share-locked here.
125 if (m_enableMultiSlot && context.slot() != m_masterSlot && m_eventMonitored[ m_masterSlot ]) {
126 std::shared_lock lockShared( m_slotMutex[ m_masterSlot ] );
127
128 // Note: we override the storage location of these data from all other slots to be saved in the MasterSlot
130 ATH_CHECK( ai.isValid() );
131
132 ATH_CHECK(monitor(context, ai, now, type));
133 }
134
135 return StatusCode::SUCCESS;
136}
137
138// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
139
// Store one audit record for algorithm identifier `ai`.
//
// AuditType::Before: inserts a start payload (timestamp, thread id, RoI id, slot) into
//                    m_algStartInfo and, for within-slot records, caches `ai` per thread.
// AuditType::After:  inserts `now` into m_algStopTime.
// Any other type:    logs an error and returns FAILURE.
// No lock is taken in this function itself.
// NOTE(review): the line opening the `ap` initialiser (source line 144) is missing from this
// listing; only its four initialiser values are visible below.
140StatusCode TrigCostSvc::monitor(const EventContext& context, const AlgorithmIdentifier& ai, const TrigTimeStamp& now, const AuditType type) {
141
142 if (type == AuditType::Before) {
143
145 now,
146 std::this_thread::get_id(),
147 getROIID(context),
148 static_cast<uint32_t>(context.slot())
149 };
150 ATH_CHECK( m_algStartInfo.insert(ai, ap, msg()) );
151
152 // Cache the AlgorithmIdentifier which has just started executing on this thread
// Skipped when the record is being redirected into another slot (m_realSlot != m_slotToSaveInto).
// monitorROS reads this map to attribute a ROB request to the algorithm on the calling thread.
153 if (ai.m_realSlot == ai.m_slotToSaveInto) {
154 tbb::concurrent_hash_map<std::thread::id, AlgorithmIdentifier, ThreadHashCompare>::accessor acc;
155 m_threadToAlgMap.insert(acc, ap.m_algThreadID);
156 acc->second = ai;
157 }
158
159 } else if (type == AuditType::After) {
160
161 ATH_CHECK( m_algStopTime.insert(ai, now, msg()) );
162
163 } else {
164
165 ATH_MSG_ERROR("Only expecting AuditType::Before or AuditType::After");
166 return StatusCode::FAILURE;
167
168 }
169
170 return StatusCode::SUCCESS;
171}
172
173
174// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
175
176StatusCode TrigCostSvc::monitorROS(const EventContext& context, robmonitor::ROBDataMonitorStruct payload){
177 ATH_CHECK(checkSlot(context));
178 ATH_MSG_DEBUG( "Received ROB payload " << payload );
179
180 // Associate payload with an algorithm
181 AlgorithmIdentifier theAlg;
182 {
183 tbb::concurrent_hash_map<std::thread::id, AlgorithmIdentifier, ThreadHashCompare>::const_accessor acc;
184 bool result = m_threadToAlgMap.find(acc, std::this_thread::get_id());
185 //checking the return type 'result' is sufficient to know whether acc is bound
186 if (!result){
187 ATH_MSG_WARNING( "Cannot find algorithm on this thread (id=" << std::this_thread::get_id() << "). Request "<< payload <<" won't be monitored");
188 return StatusCode::SUCCESS;
189 }
190 //coverity[FORWARD_NULL:FALSE]
191 theAlg = acc->second;
192 }
193
194 // Record data in TrigCostDataStore
195 ATH_MSG_DEBUG( "Adding ROBs from" << payload.requestor_name << " to " << theAlg.m_hash );
196 {
197 std::shared_lock lockShared( m_slotMutex[ context.slot() ] );
198 ATH_CHECK( m_rosData.push_back(theAlg, std::move(payload), msg()) );
199 }
200
201 return StatusCode::SUCCESS;
202}
203
204
205// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
206
// Close a monitored event and write its cost data to the two output containers.
//
// Steps, in order:
//  1. Return SUCCESS straight away if this slot is not flagged as monitored.
//  2. Record the AuditType::After entry for m_costFinalizeAlgName, clear the slot's monitored flag
//     and take the slot mutex exclusively.
//  3. Read the event-wide stop and start times from the entries of the two bracketing algorithms.
//  4. For every start payload: look up its stop time, clamp both times into the event window and
//     append one TrigComposite with the details to costOutputHandle.
//  5. For every stored ROB request: append one TrigComposite to rosOutputHandle, pointing back to
//     its algorithm through the index stored in "alg_idx".
//  6. At VERBOSE level, print a summary of costOutputHandle.
//
// NOTE(review): four source lines are missing from this listing (234, 248, 323 and 371). They
// must hold the declarations of `myAi`, `hltSeedingAi` and the two `tc` objects, none of which
// can be confirmed from here.
207StatusCode TrigCostSvc::endEvent(const EventContext& context, SG::WriteHandle<xAOD::TrigCompositeContainer>& costOutputHandle, SG::WriteHandle<xAOD::TrigCompositeContainer>& rosOutputHandle) {
208 ATH_CHECK(checkSlot(context));
209 if (m_eventMonitored[ context.slot() ] == false) {
210 // This event was not monitored - nothing to do.
211 ATH_MSG_DEBUG("Not a monitored event.");
212 return StatusCode::SUCCESS;
213 }
214
215 // As we will miss the AuditType::After of the TrigCostFinalizeAlg (which is calling this TrigCostSvc::endEvent), let's add it now.
216 // This will be our canonical final timestamps for measuring this event. Similar was done for HLTSeeding at the start
217 ATH_CHECK(processAlg(context, m_costFinalizeAlgName, AuditType::After));
218
219 // Reset eventMonitored flags
220 m_eventMonitored[ context.slot() ] = false;
221
222 // Now that this atomic is set to FALSE, additional algs in this instance which trigger this service will
223 // not be able to call TrigCostSvc::monitor
224
225 // ... but processAlg might already be running in other threads...
226 // Wait to obtain an exclusive lock.
227 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
228
229 // we can now perform whole-map inspection of this event's TrigCostDataStores without the danger that it will be changed further
230
231 // Let's start by getting the global STOP time we just wrote
// eventStopTime stays 0 if the retrieve below fails; that case is only reported, not returned as a failure.
232 uint64_t eventStopTime = 0;
233 {
235 ATH_CHECK( myAi.isValid() );
236 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
237 if (m_algStopTime.retrieve(myAi, stopTimeAcessor, msg()).isFailure()) {
238 ATH_MSG_ERROR("No end time for '" << myAi.m_caller << "', '" << myAi.m_store << "'"); // Error as we JUST entered this info!
239 } else { // retrieve was a success
240 //coverity[FORWARD_NULL:FALSE]
241 eventStopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
242 }
243 }
244
245 // And the global START time for the event
// eventStartTime likewise stays 0 if the retrieve below fails.
246 uint64_t eventStartTime = 0;
247 {
249 ATH_CHECK( hltSeedingAi.isValid() );
250 tbb::concurrent_hash_map<AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_accessor startAcessor;
251 if (m_algStartInfo.retrieve(hltSeedingAi, startAcessor, msg()).isFailure()) {
252 ATH_MSG_ERROR("No alg info for '" << hltSeedingAi.m_caller << "', '" << hltSeedingAi.m_store << "'"); // Error as we know this info must be present
253 } else { // retrieve was a success
254 //coverity[FORWARD_NULL:FALSE]
255 eventStartTime = startAcessor->second.m_algStartTime.microsecondsSinceEpoch();
256 }
257 }
258
259 // Read payloads. Write to persistent format
260 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator beginIt;
261 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator endIt;
262 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator it;
263 ATH_CHECK(m_algStartInfo.getIterators(context, msg(), beginIt, endIt));
264
265 ATH_MSG_DEBUG("Monitored event with " << std::distance(beginIt, endIt) << " AlgorithmPayload objects.");
266
// Maps AlgorithmIdentifier::m_hash to the index of that algorithm's entry in costOutputHandle.
267 std::map<size_t, size_t> aiToHandleIndex;
268 for (it = beginIt; it != endIt; ++it) {
269 const AlgorithmIdentifier& ai = it->first;
270 const AlgorithmPayload& ap = it->second;
271 uint64_t startTime = ap.m_algStartTime.microsecondsSinceEpoch();
272
273 // Can we find the end time for this alg? If not, it is probably still running. Hence we use the event's global stop time as the default.
274 uint64_t stopTime = eventStopTime;
275 {
276 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
277 if (m_algStopTime.retrieve(ai, stopTimeAcessor, msg()).isFailure()) {
278 ATH_MSG_DEBUG("No end time for '" << ai.m_caller << "', '" << ai.m_store << "'");
279 } else { // retrieve was a success
280 stopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
281 }
282 // stopTimeAcessor goes out of scope - lock released
283 }
284
285 // It is possible (when in the master-slot) to catch just the END of an Alg's exec from another slot, and then the START of the same
286 // alg executing in the next event in that same other-slot.
287 // This gives us an end time which is before the start time. Disregard these entries.
288 if (startTime > stopTime) {
289 ATH_MSG_VERBOSE("Disregard start-time:" << startTime << " > stop-time:" << stopTime
290 << " for " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " in slot " << ap.m_slot << ", this is slot " << context.slot());
291 continue;
292 }
293
294 // Lock the start and stop times to be no later than eventStopTime.
295 // E.g. it's possible for an alg in another slot to start or stop running after 'processAlg(context, m_costFinalizeAlgName, AuditType::After))'
296 // but before 'lockUnique( m_slotMutex[ context.slot() ] )', creating a timestamp after the nominal end point for this event.
297 // If the alg starts afterwards, we disregard it in lieu of setting to have zero walltime.
298 // If the alg stops afterwards, we truncate its stop time to be no later than eventStopTime
299 if (startTime > eventStopTime) {
300 ATH_MSG_VERBOSE("Disregard " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " as it started after endEvent() was finished being called" );
301 continue;
302 }
303 if (stopTime > eventStopTime) {
304 ATH_MSG_VERBOSE(TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " stopped after endEvent() was called, but before the cost container was locked,"
305 << " truncating its ending time stamp from " << stopTime << " to " << eventStopTime);
306 stopTime = eventStopTime;
307 }
308
309 // Do the same, locking the start and stop times to be no earlier than eventStartTime
310 // If the alg stops before eventStartTime, we disregard it in lieu of setting it to have zero walltime
311 // If the alg starts before eventStartTime, we truncate its start time to be no earlier than eventStartTime
312 if (stopTime < eventStartTime) {
313 ATH_MSG_VERBOSE("Disregard " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " as it stopped before startEvent() was finished being called" );
314 continue;
315 }
316 if (startTime < eventStartTime) {
317 ATH_MSG_VERBOSE(TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " started just after the cost container was unlocked, but before the HLTSeeding record was written."
318 << " truncating its starting time stamp from " << startTime << " to " << eventStartTime);
319 startTime = eventStartTime;
320 }
321
322 // Make a new TrigComposite to persist monitoring payload for this alg
324 costOutputHandle->push_back( tc );
325 // tc is now owned by storegate, and has an aux store provided by the TrigCompositeCollection
326
// The std::hash result is narrowed to 32 bits before it is used as a key and stored as "thash".
327 const uint32_t threadID = static_cast<uint32_t>( std::hash< std::thread::id >()(ap.m_algThreadID) );
328 uint32_t threadEnumerator = 0;
329 {
330 // We can have multiple slots get here at the same time
331 std::lock_guard<std::mutex> lock(m_globalMutex);
332 const std::unordered_map<uint32_t, uint32_t>::const_iterator mapIt = m_threadToCounterMap.find(threadID);
333 if (mapIt == m_threadToCounterMap.end()) {
334 threadEnumerator = m_threadCounter;
335 m_threadToCounterMap.insert( std::make_pair(threadID, m_threadCounter++) );
336 } else {
337 threadEnumerator = mapIt->second;
338 }
339 }
340
341 bool result = true;
342 result &= tc->setDetail("alg", ai.callerHash(msg()));
343 result &= tc->setDetail("store", ai.storeHash(msg()));
344 result &= tc->setDetail("view", ai.m_viewID);
345 result &= tc->setDetail("thread", threadEnumerator);
346 result &= tc->setDetail("thash", threadID);
347 result &= tc->setDetail("slot", ap.m_slot);
348 result &= tc->setDetail("roi", ap.m_algROIID);
349 result &= tc->setDetail("start", startTime);
350 result &= tc->setDetail("stop", stopTime);
351 if (!result) ATH_MSG_WARNING("Failed to append one or more details to trigger cost TC");
352
// Remember where this algorithm's entry sits so that the ROB records below can refer to it.
353 aiToHandleIndex[ai.m_hash] = costOutputHandle->size() - 1;
354 }
355
356 typedef tbb::concurrent_hash_map< AlgorithmIdentifier, std::vector<robmonitor::ROBDataMonitorStruct>, AlgorithmIdentifierHashCompare>::const_iterator ROBConstIt;
357 ROBConstIt beginRob;
358 ROBConstIt endRob;
359
360 ATH_CHECK(m_rosData.getIterators(context, msg(), beginRob, endRob));
361
362 for (ROBConstIt it = beginRob; it != endRob; ++it) {
363 size_t aiHash = it->first.m_hash;
364
// NOTE(review): a missing hash is only warned about, the loop carries on. The
// aiToHandleIndex[aiHash] lookup further down then default-inserts the value 0, so these ROB
// records are written with "alg_idx" 0, i.e. attributed to the first cost entry.
365 if (aiToHandleIndex.count(aiHash) == 0) {
366 ATH_MSG_WARNING("Algorithm with hash " << aiHash << " not found!");
367 }
368
369 // Save ROB data via TrigComposite
370 for (const robmonitor::ROBDataMonitorStruct& robData : it->second) {
372 rosOutputHandle->push_back(tc);
373
374 // Retrieve ROB requests data into primitives vectors
375 std::vector<uint32_t> robs_id;
376 std::vector<uint32_t> robs_size;
377 std::vector<unsigned> robs_history;
378 std::vector<unsigned short> robs_status;
379
380 robs_id.reserve(robData.requested_ROBs.size());
381 robs_size.reserve(robData.requested_ROBs.size());
382 robs_history.reserve(robData.requested_ROBs.size());
383 robs_status.reserve(robData.requested_ROBs.size());
384
385 for (const auto& rob : robData.requested_ROBs) {
386 robs_id.push_back(rob.second.rob_id);
387 robs_size.push_back(rob.second.rob_size);
388 robs_history.push_back(rob.second.rob_history);
389 robs_status.push_back(rob.second.isStatusOk());
390 }
391
392 bool result = true;
393 result &= tc->setDetail("alg_idx", aiToHandleIndex[aiHash]);
394 result &= tc->setDetail("lvl1ID", robData.lvl1ID);
395 result &= tc->setDetail<std::vector<uint32_t>>("robs_id", robs_id);
396 result &= tc->setDetail<std::vector<uint32_t>>("robs_size", robs_size);
397 result &= tc->setDetail<std::vector<unsigned>>("robs_history", robs_history);
398 result &= tc->setDetail<std::vector<unsigned short>>("robs_status", robs_status);
399 result &= tc->setDetail("start", robData.start_time);
400 result &= tc->setDetail("stop", robData.end_time);
401
402 if (!result) ATH_MSG_WARNING("Failed to append one or more details to trigger cost ROS TC");
403 }
404 }
405
// The level test keeps the summary loop from running at all unless VERBOSE output is enabled.
406 if (msg().level() <= MSG::VERBOSE) {
407 ATH_MSG_VERBOSE("--- Trig Cost Event Summary ---");
408 for ( const xAOD::TrigComposite* tc : *costOutputHandle ) {
409 ATH_MSG_VERBOSE("Algorithm:'" << TrigConf::HLTUtils::hash2string( tc->getDetail<TrigConf::HLTHash>("alg"), "ALG") << "'");
410 ATH_MSG_VERBOSE(" Store:'" << TrigConf::HLTUtils::hash2string( tc->getDetail<TrigConf::HLTHash>("store"), "STORE") << "'");
411 ATH_MSG_VERBOSE(" View ID:" << tc->getDetail<int16_t>("view"));
412 ATH_MSG_VERBOSE(" Thread #:" << tc->getDetail<uint32_t>("thread") );
413 ATH_MSG_VERBOSE(" Thread ID Hash:" << tc->getDetail<uint32_t>("thash") );
414 ATH_MSG_VERBOSE(" Slot:" << tc->getDetail<uint32_t>("slot") );
415 ATH_MSG_VERBOSE(" RoI ID Hash:" << tc->getDetail<int32_t>("roi") );
416 ATH_MSG_VERBOSE(" Start Time:" << tc->getDetail<uint64_t>("start") << " mu s");
417 ATH_MSG_VERBOSE(" Stop Time:" << tc->getDetail<uint64_t>("stop") << " mu s");
418 }
419 }
420
421 return StatusCode::SUCCESS;
422}
423
424// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
425
426StatusCode TrigCostSvc::generateTimeoutReport(const EventContext& context, std::string& report) {
427
428 ATH_CHECK(checkSlot(context));
429 if (!m_eventMonitored[context.slot()]) {
430 ATH_MSG_DEBUG("Not a monitored event.");
431 report = "";
432 return StatusCode::SUCCESS;
433 }
434
435 std::unique_lock lockUnique(m_slotMutex[context.slot()]);
436
437 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator beginIt;
438 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator endIt;
439 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator it;
440 ATH_CHECK(m_algStartInfo.getIterators(context, msg(), beginIt, endIt));
441
442 // Create map that sorts in descending order
443 std::map<uint64_t, std::string, std::greater<uint64_t>> timeToAlgMap;
444
445 for (it = beginIt; it != endIt; ++it) {
446 const AlgorithmIdentifier& ai = it->first;
447 const AlgorithmPayload& ap = it->second;
448
449 // Don't look at any records from other slots
450 if (ai.m_realSlot != context.slot()) continue;
451
452 uint64_t startTime = ap.m_algStartTime.microsecondsSinceEpoch();
453 uint64_t stopTime = 0;
454 {
455 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
456 if (m_algStopTime.retrieve(ai, stopTimeAcessor, msg()).isFailure()) {
457 ATH_MSG_DEBUG("No end time for '" << ai.m_caller << "', '" << ai.m_store << "'");
458 } else { // retrieve was a success
459 //coverity[FORWARD_NULL:FALSE]
460 stopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
461 }
462 // stopTimeAcessor goes out of scope - lock released
463 }
464
465 if (stopTime == 0) continue;
466
467 timeToAlgMap[stopTime-startTime] = ai.m_caller;
468 }
469
470 // Save top 5 times to the report
471 report = "Timeout detected with the following algorithms consuming the most time: ";
472 int algCounter = 0;
473 for(const std::pair<const uint64_t, std::string>& p : timeToAlgMap){
474 // Save time in miliseconds instead of microseconds
475 report += p.second + " (" + std::to_string(std::lround(p.first/1e3)) + " ms)";
476 ++algCounter;
477 if (algCounter >= 5){
478 break;
479 }
480 report += ", ";
481 }
482
483 return StatusCode::SUCCESS;
484}
485
486// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
487
488StatusCode TrigCostSvc::discardEvent(const EventContext& context) {
489
490 if (m_monitorAllEvents) {
491 ATH_MSG_DEBUG("All events are monitored - event will not be discarded");
492 return StatusCode::SUCCESS;
493 }
494
495 ATH_MSG_DEBUG("Cost Event will be discarded");
496 ATH_CHECK(checkSlot(context));
497 {
498 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
499
500 // Reset eventMonitored flags
501 m_eventMonitored[ context.slot() ] = false;
502
503 // tables are cleared at the start of the event
504 }
505 return StatusCode::SUCCESS;
506}
507
508// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
509
510StatusCode TrigCostSvc::checkSlot(const EventContext& context) const {
511 if (context.slot() >= m_eventSlots) {
512 ATH_MSG_FATAL("Job is using event slot #" << context.slot() << ", but we only reserved space for: " << m_eventSlots);
513 return StatusCode::FAILURE;
514 }
515 return StatusCode::SUCCESS;
516}
517
518// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
519
// Return the RoI identifier associated with `context`, cast to int32_t.
// An ID is only returned from inside the branch below when the context has an extended event
// context and `roi` is non-null.
// NOTE(review): source lines 522 (where `roi` is obtained) and 525 (the fall-through return)
// are missing from this listing, so the value returned when no RoI is found cannot be
// confirmed from here.
520int32_t TrigCostSvc::getROIID(const EventContext& context) {
521 if (Atlas::hasExtendedEventContext(context)) {
523 if (roi) return static_cast<int32_t>(roi->roiId());
524 }
526}
527
528// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
529
// Tell whether cost data is being collected for the event in the slot of `context`.
// Returns true straight away if the slot's own monitored flag is set.
// NOTE(review): unlike the other entry points, no checkSlot() call guards the array access here.
530bool TrigCostSvc::isMonitoredEvent(const EventContext& context, const bool includeMultiSlot) const {
531 if (m_eventMonitored[ context.slot() ]) {
532 return true;
533 }
// NOTE(review): the body of this branch (source line 535) is missing from this listing, so
// what is returned in multi-slot mode cannot be confirmed from here.
534 if (includeMultiSlot && m_enableMultiSlot) {
536 }
537 return false;
538}
539
540// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
541
542size_t TrigCostSvc::ThreadHashCompare::hash(const std::thread::id& thread) {
543 return static_cast<size_t>( std::hash< std::thread::id >()(thread) );
544}
545
546// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
547
548bool TrigCostSvc::ThreadHashCompare::equal(const std::thread::id& x, const std::thread::id& y) {
549 return (x == y);
550}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
Maintain a set of objects, one per slot.
static Double_t tc
#define y
#define x
const TrigRoiDescriptor * roiDescriptor() const
Get cached pointer to View's Region of Interest Descriptor or nullptr if not describing a View.
static const std::string hash2string(HLTHash, const std::string &category="TE")
hash function translating identifiers into names (via internal dictionary)
static void hashes2file(const std::string &fileName="hashes2string.txt")
debugging output of internal dictionary
virtual StatusCode processAlg(const EventContext &context, const std::string &caller, const AuditType type) override
Implementation of ITrigCostSvc::processAlg.
Gaudi::Property< bool > m_monitorAllEvents
Gaudi::Property< bool > m_saveHashes
Gaudi::Property< std::string > m_costFinalizeAlgName
std::mutex m_globalMutex
Used to protect all-slot modifications.
TrigCostDataStore< AlgorithmPayload > m_algStartInfo
Thread-safe store of algorithm start payload.
StatusCode checkSlot(const EventContext &context) const
Sanity check that the job is respecting the number of slots which were declared at config time.
TrigCostDataStore< TrigTimeStamp > m_algStopTime
Thread-safe store of algorithm stop times.
Gaudi::Property< bool > m_enableMultiSlot
virtual StatusCode initialize() override
Initialise, create enough storage to store m_eventSlots.
size_t m_eventSlots
Number of concurrent processing slots.
virtual ~TrigCostSvc()
Destructor.
std::unique_ptr< std::shared_mutex[] > m_slotMutex
Used to control and protect whole-table operations.
virtual StatusCode monitorROS(const EventContext &context, robmonitor::ROBDataMonitorStruct payload) override
Implementation of ITrigCostSvc::monitorROS.
virtual bool isMonitoredEvent(const EventContext &context, const bool includeMultiSlot=true) const override
std::unique_ptr< std::atomic< bool >[] > m_eventMonitored
Used to cache if the event in a given slot is being monitored.
virtual StatusCode endEvent(const EventContext &context, SG::WriteHandle< xAOD::TrigCompositeContainer > &costOutputHandle, SG::WriteHandle< xAOD::TrigCompositeContainer > &rosOutputHandle) override
Implementation of ITrigCostSvc::endEvent.
int32_t getROIID(const EventContext &context)
@brief Internal function to return a RoI from an extended event context
virtual StatusCode discardEvent(const EventContext &context) override
Discard a cost monitored event.
Gaudi::Property< std::string > m_costSupervisorAlgName
TrigCostDataStore< std::vector< robmonitor::ROBDataMonitorStruct > > m_rosData
Thread-safe store of ROS data.
TrigCostSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard ATLAS Service constructor.
Gaudi::Property< size_t > m_masterSlot
virtual StatusCode finalize() override
Finalize, act on m_saveHashes.
virtual StatusCode generateTimeoutReport(const EventContext &context, std::string &report) override
StatusCode monitor(const EventContext &context, const AlgorithmIdentifier &ai, const TrigTimeStamp &now, const AuditType type)
Internal call to save monitoring data for a given AlgorithmIdentifier.
size_t m_threadCounter
Count how many unique thread ID we have seen.
tbb::concurrent_hash_map< std::thread::id, AlgorithmIdentifier, ThreadHashCompare > m_threadToAlgMap
Keeps track of what is running right now in each thread.
std::unordered_map< uint32_t, uint32_t > m_threadToCounterMap
Map thread's hash ID to a counting numeral.
virtual StatusCode startEvent(const EventContext &context, const bool enableMonitoring=true) override
Implementation of ITrigCostSvc::startEvent.
nope - should be used for standalone also, perhaps need to protect the class def bits ifndef XAOD_ANA...
virtual unsigned int roiId() const override final
these quantities probably don't need to be used any more
utility class to measure time duration in AthenaMT The pattern when it is useful: AlgA tags the begin...
The structure which is used to monitor the ROB data request in L2 It is created for every addROBData ...
uint64_t start_time
map of ROBs requested
std::map< const uint32_t, robmonitor::ROBDataStruct > requested_ROBs
name of requesting algorithm
uint64_t end_time
start time of ROB request (microsec since epoch)
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
size_t getNSlots()
Return the number of event slots.
TrigComposite_v1 TrigComposite
Declare the latest version of the class.
Static hash and equal members as required by tbb::concurrent_hash_map.
static AlgorithmIdentifier make(const EventContext &context, const std::string &caller, MsgStream &msg, const int16_t slotOverride=-1)
Construct an AlgorithmIdentifier.
Small structure to hold an algorithm's name and store, plus some details on its EventView.
std::string m_caller
Name of the algorithm.
std::string m_store
Name of the algorithm's store.
TrigConf::HLTHash callerHash(MsgStream &msg) const
size_t m_slotToSaveInto
The slot which is used for the purposes of recording data on this algorithm's execution.
static constexpr int16_t s_noView
Constant value used to express an Algorithm which is not running in a View.
TrigConf::HLTHash storeHash(MsgStream &msg) const
size_t m_realSlot
The actual slot of the algorithm.
size_t m_hash
Hash of algorithm + store + realSlot.
StatusCode isValid() const
int16_t m_viewID
If not within an event view, then the m_viewID = s_noView = -1.
Small structure wrap the various values stored for an algorithm just before it starts to execute.
static bool equal(const std::thread::id &x, const std::thread::id &y)
static size_t hash(const std::thread::id &thread)
MsgStream & msg
Definition testRead.cxx:32