ATLAS Offline Software
Loading...
Searching...
No Matches
TrigCostSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
3*/
4
7
8#include "TrigCostSvc.h"
9
10#include <mutex> // For std::unique_lock
11
13
// Service constructor: forwards the service name and locator to the base class
// and emits a debug message once construction has completed.
// NOTE(review): listing lines 16-24 are not shown in this extract; the trailing
// comma after base_class(...) suggests further member initialisers follow - confirm
// against the real source file.
14TrigCostSvc::TrigCostSvc(const std::string& name, ISvcLocator* pSvcLocator) :
15base_class(name, pSvcLocator), // base_class = AthService
25{
26 ATH_MSG_DEBUG("TrigCostSvc regular constructor");
27}
28
29// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
30
32 // delete[] m_eventMonitored;
33 ATH_MSG_DEBUG("TrigCostSvc destructor()");
34}
35
36// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
37
38
40 ATH_MSG_DEBUG("TrigCostSvc initialize()");
42 // TODO Remove this when the configuration is correctly propagated in config-then-run jobs
43 if (!m_eventSlots) {
44 ATH_MSG_WARNING("numConcurrentEvents() == 0. This is a misconfiguration, probably coming from running from pickle. "
45 "Setting local m_eventSlots to a 'large' number until this is fixed to allow the job to proceed.");
46 m_eventSlots = 100;
47 }
48 ATH_MSG_INFO("Initializing TrigCostSvc with " << m_eventSlots << " event slots");
49
50 // We cannot have a vector here as atomics are not movable nor copyable. Unique heap arrays are supported by C++
51 m_eventMonitored = std::make_unique< std::atomic<bool>[] >( m_eventSlots );
52 m_slotMutex = std::make_unique< std::shared_mutex[] >( m_eventSlots );
53
54 for (size_t i = 0; i < m_eventSlots; ++i) m_eventMonitored[i] = false;
55
58 ATH_CHECK(m_rosData.initialize(m_eventSlots));
59
60 return StatusCode::SUCCESS;
61}
62
63// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
64
66 ATH_MSG_DEBUG("TrigCostSvc finalize()");
67 if (m_saveHashes) {
69 ATH_MSG_INFO("Calling hashes2file, saving dump of job's HLT hashing dictionary to disk.");
70 }
71 return StatusCode::SUCCESS;
72}
73
74// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
75
76StatusCode TrigCostSvc::startEvent(const EventContext& context, const bool enableMonitoring) {
77 const bool monitoredEvent = (enableMonitoring || m_monitorAllEvents);
78 ATH_CHECK(checkSlot(context));
79
80 m_eventMonitored[ context.slot() ] = false;
81
82 {
83 // "clear" is a whole table operation, we need it all to ourselves
84 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
85 if (monitoredEvent) {
86 // Empty transient thread-safe stores in preparation for recording this event's cost data
87 ATH_CHECK(m_algStartInfo.clear(context, msg()));
88 ATH_CHECK(m_algStopTime.clear(context, msg()));
89 ATH_CHECK(m_rosData.clear(context, msg()));
90 }
91
92 // Enable collection of data in this slot for monitoredEvents
93 m_eventMonitored[ context.slot() ] = monitoredEvent;
94 }
95
96 // As we missed the AuditType::Before of the TrigCostSupervisorAlg (which is calling this TrigCostSvc::startEvent), let's add it now.
97 // This will be our canonical initial timestamps for measuring this event. Similar will be done for DecisionSummaryMakerAlg at the end
98 ATH_CHECK(processAlg(context, m_costSupervisorAlgName, AuditType::Before));
99
100 return StatusCode::SUCCESS;
101}
102
103// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
104
// Record the start (AuditType::Before) or stop (AuditType::After) of algorithm `caller`.
// A timestamp is taken once on entry and passed to monitor() for every store it is saved into:
//  - the slot of `context`, if that slot is currently flagged as monitored;
//  - the master slot, if multi-slot mode is enabled, this is not the master slot
//    and the master slot is currently flagged as monitored.
// Returns FAILURE if the slot check, the identifier validity check or monitor() fails.
// NOTE(review): the declarations of `ai` (listing lines 115 and 129) are not shown in this extract.
105StatusCode TrigCostSvc::processAlg(const EventContext& context, const std::string& caller, const AuditType type) {
106 ATH_CHECK(checkSlot(context));
107
 // Constructed before any lock is taken; the same timestamp is used for both branches below
108 TrigTimeStamp now;
109
110 // Do per-event within-slot monitoring
111 if (m_eventMonitored[ context.slot() ]) {
112 // Multiple simultaneous calls allowed here, adding their data to the concurrent map.
113 std::shared_lock lockShared( m_slotMutex[ context.slot() ] );
114
116 ATH_CHECK( ai.isValid() );
117
118 ATH_CHECK(monitor(context, ai, now, type));
119
120 ATH_MSG_VERBOSE("Caller '" << caller << "', '" << ai.m_store << "', slot:" << context.slot() << " "
121 << (type == AuditType::Before ? "BEGAN" : "ENDED") << " at " << now.microsecondsSinceEpoch());
122 }
123
124 // MultiSlot mode: do per-event monitoring of all slots, but saving the data within the master-slot
125 if (m_enableMultiSlot && context.slot() != m_masterSlot && m_eventMonitored[ m_masterSlot ]) {
 // Shared lock on the master slot's mutex, not on the mutex of this context's slot
126 std::shared_lock lockShared( m_slotMutex[ m_masterSlot ] );
127
128 // Note: we override the storage location of these data from all other slots to be saved in the MasterSlot
130 ATH_CHECK( ai.isValid() );
131
132 ATH_CHECK(monitor(context, ai, now, type));
133 }
134
135 return StatusCode::SUCCESS;
136}
137
138// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
139
// Store one monitoring record for the algorithm identified by `ai`.
//  - AuditType::Before: inserts a start payload (timestamp, thread id, RoI id, slot) into
//    m_algStartInfo and, when the algorithm is saved into its own slot, remembers it as the
//    algorithm currently running on this thread.
//  - AuditType::After: inserts the stop timestamp into m_algStopTime.
//  - any other type: logs an error and returns FAILURE.
// Returns FAILURE as well if either store insertion fails.
// NOTE(review): the first line of the `ap` declaration (listing line 144) is not shown in this extract.
140StatusCode TrigCostSvc::monitor(const EventContext& context, const AlgorithmIdentifier& ai, const TrigTimeStamp& now, const AuditType type) {
141
142 if (type == AuditType::Before) {
143
145 now,
146 std::this_thread::get_id(),
147 getROIID(context),
148 static_cast<uint32_t>(context.slot())
149 };
150 ATH_CHECK( m_algStartInfo.insert(ai, ap, msg()) );
151
152 // Cache the AlgorithmIdentifier which has just started executing on this thread
 // Skipped when the record is being redirected into another slot (real slot != slot to save into)
153 if (ai.m_realSlot == ai.m_slotToSaveInto) {
154 tbb::concurrent_hash_map<std::thread::id, AlgorithmIdentifier, ThreadHashCompare>::accessor acc;
 // The result of insert() is not checked: the mapped value is overwritten in either case
155 m_threadToAlgMap.insert(acc, ap.m_algThreadID);
156 acc->second = ai;
157 }
158
159 } else if (type == AuditType::After) {
160
161 ATH_CHECK( m_algStopTime.insert(ai, now, msg()) );
162
163 } else {
164
165 ATH_MSG_ERROR("Only expecting AuditType::Before or AuditType::After");
166 return StatusCode::FAILURE;
167
168 }
169
170 return StatusCode::SUCCESS;
171}
172
173
174// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
175
176StatusCode TrigCostSvc::monitorROS(const EventContext& context, robmonitor::ROBDataMonitorStruct payload){
177 ATH_CHECK(checkSlot(context));
178 ATH_MSG_DEBUG( "Received ROB payload " << payload );
179
180 // Associate payload with an algorithm
181 AlgorithmIdentifier theAlg;
182 {
183 tbb::concurrent_hash_map<std::thread::id, AlgorithmIdentifier, ThreadHashCompare>::const_accessor acc;
184 bool result = m_threadToAlgMap.find(acc, std::this_thread::get_id());
185 if (!result){
186 ATH_MSG_WARNING( "Cannot find algorithm on this thread (id=" << std::this_thread::get_id() << "). Request "<< payload <<" won't be monitored");
187 return StatusCode::SUCCESS;
188 }
189
190 theAlg = acc->second;
191 }
192
193 // Record data in TrigCostDataStore
194 ATH_MSG_DEBUG( "Adding ROBs from" << payload.requestor_name << " to " << theAlg.m_hash );
195 {
196 std::shared_lock lockShared( m_slotMutex[ context.slot() ] );
197 ATH_CHECK( m_rosData.push_back(theAlg, std::move(payload), msg()) );
198 }
199
200 return StatusCode::SUCCESS;
201}
202
203
204// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
205
// Close the monitoring of the event in this context's slot and write its data to the output handles.
// Steps, in order:
//  1. slot range check; return SUCCESS immediately if the slot is not flagged as monitored;
//  2. record the AuditType::After of m_costFinalizeAlgName, clear the slot's monitored flag and
//     take the slot mutex exclusively;
//  3. retrieve the event-wide stop and start timestamps from the stores;
//  4. for every entry of m_algStartInfo push one object to costOutputHandle, with start/stop times
//     clamped to the event-wide window (entries outside the window are dropped);
//  5. for every entry of m_rosData push one object per ROB request to rosOutputHandle;
//  6. at VERBOSE level, dump every object of costOutputHandle.
// Returns FAILURE if the slot check, an identifier validity check or a store iterator request fails.
// NOTE(review): listing lines 233, 246, 320 and 368 (the declarations of `myAi`, `hltSeedingAi` and
// of both `tc` objects) are not shown in this extract.
206StatusCode TrigCostSvc::endEvent(const EventContext& context, SG::WriteHandle<xAOD::TrigCompositeContainer>& costOutputHandle, SG::WriteHandle<xAOD::TrigCompositeContainer>& rosOutputHandle) {
207 ATH_CHECK(checkSlot(context));
208 if (m_eventMonitored[ context.slot() ] == false) {
209 // This event was not monitored - nothing to do.
210 ATH_MSG_DEBUG("Not a monitored event.");
211 return StatusCode::SUCCESS;
212 }
213
214 // As we will miss the AuditType::After of the TrigCostFinalizeAlg (which is calling this TrigCostSvc::endEvent), let's add it now.
215 // This will be our canonical final timestamps for measuring this event. Similar was done for HLTSeeding at the start
216 ATH_CHECK(processAlg(context, m_costFinalizeAlgName, AuditType::After));
217
218 // Reset eventMonitored flags
219 m_eventMonitored[ context.slot() ] = false;
220
221 // Now that this atomic is set to FALSE, additional algs in this instance which trigger this service will
222 // not be able to call TrigCostSvc::monitor
223
224 // ... but processAlg might already be running in other threads...
225 // Wait to obtain an exclusive lock.
226 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
227
228 // we can now perform whole-map inspection of this event's TrigCostDataStores without the danger that it will be changed further
229
230 // Let's start by getting the global STOP time we just wrote
 // Stays 0 if the retrieval below fails (an error is logged, but processing continues)
231 uint64_t eventStopTime = 0;
232 {
234 ATH_CHECK( myAi.isValid() );
235 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
236 if (m_algStopTime.retrieve(myAi, stopTimeAcessor, msg()).isFailure()) {
237 ATH_MSG_ERROR("No end time for '" << myAi.m_caller << "', '" << myAi.m_store << "'"); // Error as we JUST entered this info!
238 } else { // retrieve was a success
239 eventStopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
240 }
241 }
242
243 // And the global START time for the event
 // Stays 0 if the retrieval below fails (an error is logged, but processing continues)
244 uint64_t eventStartTime = 0;
245 {
247 ATH_CHECK( hltSeedingAi.isValid() );
248 tbb::concurrent_hash_map<AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_accessor startAcessor;
249 if (m_algStartInfo.retrieve(hltSeedingAi, startAcessor, msg()).isFailure()) {
250 ATH_MSG_ERROR("No alg info for '" << hltSeedingAi.m_caller << "', '" << hltSeedingAi.m_store << "'"); // Error as we know this info must be present
251 } else { // retrieve was a success
252 eventStartTime = startAcessor->second.m_algStartTime.microsecondsSinceEpoch();
253 }
254 }
255
256 // Read payloads. Write to persistent format
257 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator beginIt;
258 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator endIt;
259 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator it;
260 ATH_CHECK(m_algStartInfo.getIterators(context, msg(), beginIt, endIt));
261
262 ATH_MSG_DEBUG("Monitored event with " << std::distance(beginIt, endIt) << " AlgorithmPayload objects.");
263
 // Maps AlgorithmIdentifier::m_hash to the index of the matching object in costOutputHandle
264 std::map<size_t, size_t> aiToHandleIndex;
265 for (it = beginIt; it != endIt; ++it) {
266 const AlgorithmIdentifier& ai = it->first;
267 const AlgorithmPayload& ap = it->second;
268 uint64_t startTime = ap.m_algStartTime.microsecondsSinceEpoch();
269
270 // Can we find the end time for this alg? If not, it is probably still running. Hence we use the event stop time as the default.
271 uint64_t stopTime = eventStopTime;
272 {
273 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
274 if (m_algStopTime.retrieve(ai, stopTimeAcessor, msg()).isFailure()) {
275 ATH_MSG_DEBUG("No end time for '" << ai.m_caller << "', '" << ai.m_store << "'");
276 } else { // retrieve was a success
277 stopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
278 }
279 // stopTimeAcessor goes out of scope - lock released
280 }
281
282 // It is possible (when in the master-slot) to catch just the END of an Alg's exec from another slot, and then the START of the same
283 // alg executing in the next event in that same other-slot.
284 // This gives us an end time which is before the start time. Disregard these entries.
285 if (startTime > stopTime) {
286 ATH_MSG_VERBOSE("Disregard start-time:" << startTime << " > stop-time:" << stopTime
287 << " for " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " in slot " << ap.m_slot << ", this is slot " << context.slot());
288 continue;
289 }
290
291 // Lock the start and stop times to be no later than eventStopTime.
292 // E.g. it's possible for an alg in another slot to start or stop running after 'processAlg(context, m_costFinalizeAlgName, AuditType::After))'
293 // but before 'lockUnique( m_slotMutex[ context.slot() ] )', creating a timestamp after the nominal end point for this event.
294 // If the alg starts afterwards, we disregard it in lieu of setting to have zero walltime.
295 // If the alg stops afterwards, we truncate its stop time to be no later than eventStopTime
296 if (startTime > eventStopTime) {
297 ATH_MSG_VERBOSE("Disregard " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " as it started after endEvent() was finished being called" );
298 continue;
299 }
300 if (stopTime > eventStopTime) {
301 ATH_MSG_VERBOSE(TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " stopped after endEvent() was called, but before the cost container was locked,"
302 << " truncating its ending time stamp from " << stopTime << " to " << eventStopTime);
303 stopTime = eventStopTime;
304 }
305
306 // Do the same, locking the start and stop times to be no earlier than eventStartTime
307 // If the alg stops before eventStartTime, we disregard it in lieu of setting it to have zero walltime
308 // If the alg starts before eventStartTime, we truncate its start time to be no earlier than eventStartTime
309 if (stopTime < eventStartTime) {
310 ATH_MSG_VERBOSE("Disregard " << TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " as it stopped before startEvent() was finished being called" );
311 continue;
312 }
313 if (startTime < eventStartTime) {
314 ATH_MSG_VERBOSE(TrigConf::HLTUtils::hash2string( ai.callerHash(msg()), "ALG") << " started just after the cost container was unlocked, but before the HLTSeeding record was written."
315 << " truncating its starting time stamp from " << startTime << " to " << eventStartTime);
316 startTime = eventStartTime;
317 }
318
319 // Make a new TrigComposite to persist monitoring payload for this alg
321 costOutputHandle->push_back( tc );
322 // tc is now owned by storegate and, and has an aux store provided by the TrigCompositeCollection
323
 // 32-bit truncation of the std::hash of the thread id; used as key of m_threadToCounterMap
324 const uint32_t threadID = static_cast<uint32_t>( std::hash< std::thread::id >()(ap.m_algThreadID) );
325 uint32_t threadEnumerator = 0;
326 {
327 // We can have multiple slots get here at the same time
328 std::lock_guard<std::mutex> lock(m_globalMutex);
329 const std::unordered_map<uint32_t, uint32_t>::const_iterator mapIt = m_threadToCounterMap.find(threadID);
330 if (mapIt == m_threadToCounterMap.end()) {
 // First time this thread hash is seen: hand out the next counter value
331 threadEnumerator = m_threadCounter;
332 m_threadToCounterMap.insert( std::make_pair(threadID, m_threadCounter++) );
333 } else {
334 threadEnumerator = mapIt->second;
335 }
336 }
337
338 bool result = true;
339 result &= tc->setDetail("alg", ai.callerHash(msg()));
340 result &= tc->setDetail("store", ai.storeHash(msg()));
341 result &= tc->setDetail("view", ai.m_viewID);
342 result &= tc->setDetail("thread", threadEnumerator);
343 result &= tc->setDetail("thash", threadID);
344 result &= tc->setDetail("slot", ap.m_slot);
345 result &= tc->setDetail("roi", ap.m_algROIID);
346 result &= tc->setDetail("start", startTime);
347 result &= tc->setDetail("stop", stopTime);
348 if (!result) ATH_MSG_WARNING("Failed to append one or more details to trigger cost TC");
349
 // Remember where this algorithm's object sits so ROS data can refer to it by index
350 aiToHandleIndex[ai.m_hash] = costOutputHandle->size() - 1;
351 }
352
353 typedef tbb::concurrent_hash_map< AlgorithmIdentifier, std::vector<robmonitor::ROBDataMonitorStruct>, AlgorithmIdentifierHashCompare>::const_iterator ROBConstIt;
354 ROBConstIt beginRob;
355 ROBConstIt endRob;
356
357 ATH_CHECK(m_rosData.getIterators(context, msg(), beginRob, endRob));
358
359 for (ROBConstIt it = beginRob; it != endRob; ++it) {
360 size_t aiHash = it->first.m_hash;
361
 // NOTE(review): only a warning is issued here; the ROB data below are still written, and
 // aiToHandleIndex[aiHash] then default-inserts 0, i.e. "alg_idx" points at the first object.
362 if (aiToHandleIndex.count(aiHash) == 0) {
363 ATH_MSG_WARNING("Algorithm with hash " << aiHash << " not found!");
364 }
365
366 // Save ROB data via TrigComposite
367 for (const robmonitor::ROBDataMonitorStruct& robData : it->second) {
369 rosOutputHandle->push_back(tc);
370
371 // Retrieve ROB requests data into primitives vectors
372 std::vector<uint32_t> robs_id;
373 std::vector<uint32_t> robs_size;
374 std::vector<unsigned> robs_history;
375 std::vector<unsigned short> robs_status;
376
377 robs_id.reserve(robData.requested_ROBs.size());
378 robs_size.reserve(robData.requested_ROBs.size());
379 robs_history.reserve(robData.requested_ROBs.size());
380 robs_status.reserve(robData.requested_ROBs.size());
381
382 for (const auto& rob : robData.requested_ROBs) {
383 robs_id.push_back(rob.second.rob_id);
384 robs_size.push_back(rob.second.rob_size);
385 robs_history.push_back(rob.second.rob_history);
386 robs_status.push_back(rob.second.isStatusOk());
387 }
388
389 bool result = true;
390 result &= tc->setDetail("alg_idx", aiToHandleIndex[aiHash]);
391 result &= tc->setDetail("lvl1ID", robData.lvl1ID);
392 result &= tc->setDetail<std::vector<uint32_t>>("robs_id", robs_id);
393 result &= tc->setDetail<std::vector<uint32_t>>("robs_size", robs_size);
394 result &= tc->setDetail<std::vector<unsigned>>("robs_history", robs_history);
395 result &= tc->setDetail<std::vector<unsigned short>>("robs_status", robs_status);
396 result &= tc->setDetail("start", robData.start_time);
397 result &= tc->setDetail("stop", robData.end_time);
398
399 if (!result) ATH_MSG_WARNING("Failed to append one or more details to trigger cost ROS TC");
400 }
401 }
402
 // Explicit level test so the loop over the container is skipped entirely unless VERBOSE is on
403 if (msg().level() <= MSG::VERBOSE) {
404 ATH_MSG_VERBOSE("--- Trig Cost Event Summary ---");
405 for ( const xAOD::TrigComposite* tc : *costOutputHandle ) {
406 ATH_MSG_VERBOSE("Algorithm:'" << TrigConf::HLTUtils::hash2string( tc->getDetail<TrigConf::HLTHash>("alg"), "ALG") << "'");
407 ATH_MSG_VERBOSE(" Store:'" << TrigConf::HLTUtils::hash2string( tc->getDetail<TrigConf::HLTHash>("store"), "STORE") << "'");
408 ATH_MSG_VERBOSE(" View ID:" << tc->getDetail<int16_t>("view"));
409 ATH_MSG_VERBOSE(" Thread #:" << tc->getDetail<uint32_t>("thread") );
410 ATH_MSG_VERBOSE(" Thread ID Hash:" << tc->getDetail<uint32_t>("thash") );
411 ATH_MSG_VERBOSE(" Slot:" << tc->getDetail<uint32_t>("slot") );
412 ATH_MSG_VERBOSE(" RoI ID Hash:" << tc->getDetail<int32_t>("roi") );
413 ATH_MSG_VERBOSE(" Start Time:" << tc->getDetail<uint64_t>("start") << " mu s");
414 ATH_MSG_VERBOSE(" Stop Time:" << tc->getDetail<uint64_t>("stop") << " mu s");
415 }
416 }
417
418 return StatusCode::SUCCESS;
419}
420
421// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
422
423StatusCode TrigCostSvc::generateTimeoutReport(const EventContext& context, std::string& report) {
424
425 ATH_CHECK(checkSlot(context));
426 if (!m_eventMonitored[context.slot()]) {
427 ATH_MSG_DEBUG("Not a monitored event.");
428 report = "";
429 return StatusCode::SUCCESS;
430 }
431
432 std::unique_lock lockUnique(m_slotMutex[context.slot()]);
433
434 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator beginIt;
435 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator endIt;
436 tbb::concurrent_hash_map< AlgorithmIdentifier, AlgorithmPayload, AlgorithmIdentifierHashCompare>::const_iterator it;
437 ATH_CHECK(m_algStartInfo.getIterators(context, msg(), beginIt, endIt));
438
439 // Create map that sorts in descending order
440 std::map<uint64_t, std::string, std::greater<uint64_t>> timeToAlgMap;
441
442 for (it = beginIt; it != endIt; ++it) {
443 const AlgorithmIdentifier& ai = it->first;
444 const AlgorithmPayload& ap = it->second;
445
446 // Don't look at any records from other slots
447 if (ai.m_realSlot != context.slot()) continue;
448
449 uint64_t startTime = ap.m_algStartTime.microsecondsSinceEpoch();
450 uint64_t stopTime = 0;
451 {
452 tbb::concurrent_hash_map<AlgorithmIdentifier, TrigTimeStamp, AlgorithmIdentifierHashCompare>::const_accessor stopTimeAcessor;
453 if (m_algStopTime.retrieve(ai, stopTimeAcessor, msg()).isFailure()) {
454 ATH_MSG_DEBUG("No end time for '" << ai.m_caller << "', '" << ai.m_store << "'");
455 } else { // retrieve was a success
456 stopTime = stopTimeAcessor->second.microsecondsSinceEpoch();
457 }
458 // stopTimeAcessor goes out of scope - lock released
459 }
460
461 if (stopTime == 0) continue;
462
463 timeToAlgMap[stopTime-startTime] = ai.m_caller;
464 }
465
466 // Save top 5 times to the report
467 report = "Timeout detected with the following algorithms consuming the most time: ";
468 int algCounter = 0;
469 for(const std::pair<const uint64_t, std::string>& p : timeToAlgMap){
470 // Save time in miliseconds instead of microseconds
471 report += p.second + " (" + std::to_string(std::lround(p.first/1e3)) + " ms)";
472 ++algCounter;
473 if (algCounter >= 5){
474 break;
475 }
476 report += ", ";
477 }
478
479 return StatusCode::SUCCESS;
480}
481
482// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
483
484StatusCode TrigCostSvc::discardEvent(const EventContext& context) {
485
486 if (m_monitorAllEvents) {
487 ATH_MSG_DEBUG("All events are monitored - event will not be discarded");
488 return StatusCode::SUCCESS;
489 }
490
491 ATH_MSG_DEBUG("Cost Event will be discarded");
492 ATH_CHECK(checkSlot(context));
493 {
494 std::unique_lock lockUnique( m_slotMutex[ context.slot() ] );
495
496 // Reset eventMonitored flags
497 m_eventMonitored[ context.slot() ] = false;
498
499 // tables are cleared at the start of the event
500 }
501 return StatusCode::SUCCESS;
502}
503
504// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
505
506StatusCode TrigCostSvc::checkSlot(const EventContext& context) const {
507 if (context.slot() >= m_eventSlots) {
508 ATH_MSG_FATAL("Job is using event slot #" << context.slot() << ", but we only reserved space for: " << m_eventSlots);
509 return StatusCode::FAILURE;
510 }
511 return StatusCode::SUCCESS;
512}
513
514// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
515
// Return the RoI identifier associated with `context`.
// When the context carries an extended event context and a non-null `roi` is available,
// its roiId() is returned, cast to int32_t.
// NOTE(review): listing lines 518 (declaration of `roi`) and 521 (the fall-through return
// value) are not shown in this extract - confirm both against the real source file.
516int32_t TrigCostSvc::getROIID(const EventContext& context) {
517 if (Atlas::hasExtendedEventContext(context)) {
519 if (roi) return static_cast<int32_t>(roi->roiId());
520 }
522}
523
524// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
525
// Tell whether data are currently being collected for the event of `context`.
// Returns true when the slot of `context` is flagged as monitored. With `includeMultiSlot`
// set and multi-slot mode enabled, an additional check is made before falling back to false.
// NOTE(review): the body of that additional check (listing line 531) is not shown in this extract.
// NOTE(review): unlike the other entry points, no checkSlot() call guards the array access here.
526bool TrigCostSvc::isMonitoredEvent(const EventContext& context, const bool includeMultiSlot) const {
527 if (m_eventMonitored[ context.slot() ]) {
528 return true;
529 }
530 if (includeMultiSlot && m_enableMultiSlot) {
532 }
533 return false;
534}
535
536// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
537
538size_t TrigCostSvc::ThreadHashCompare::hash(const std::thread::id& thread) {
539 return static_cast<size_t>( std::hash< std::thread::id >()(thread) );
540}
541
542// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
543
544bool TrigCostSvc::ThreadHashCompare::equal(const std::thread::id& x, const std::thread::id& y) {
545 return (x == y);
546}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
Maintain a set of objects, one per slot.
static Double_t tc
#define y
#define x
const TrigRoiDescriptor * roiDescriptor() const
Get cached pointer to View's Region of Interest Descriptor or nullptr if not describing a View.
static const std::string hash2string(HLTHash, const std::string &category="TE")
hash function translating identifiers into names (via internal dictionary)
static void hashes2file(const std::string &fileName="hashes2string.txt")
debugging output of internal dictionary
virtual StatusCode processAlg(const EventContext &context, const std::string &caller, const AuditType type) override
Implementation of ITrigCostSvc::processAlg.
Gaudi::Property< bool > m_monitorAllEvents
Gaudi::Property< bool > m_saveHashes
Gaudi::Property< std::string > m_costFinalizeAlgName
std::mutex m_globalMutex
Used to protect all-slot modifications.
TrigCostDataStore< AlgorithmPayload > m_algStartInfo
Thread-safe store of algorithm start payload.
StatusCode checkSlot(const EventContext &context) const
Sanity check that the job is respecting the number of slots which were declared at config time.
TrigCostDataStore< TrigTimeStamp > m_algStopTime
Thread-safe store of algorithm stop times.
Gaudi::Property< bool > m_enableMultiSlot
virtual StatusCode initialize() override
Initialise, create enough storage to store m_eventSlots.
size_t m_eventSlots
Number of concurrent processing slots.
virtual ~TrigCostSvc()
Destructor.
std::unique_ptr< std::shared_mutex[] > m_slotMutex
Used to control and protect whole-table operations.
virtual StatusCode monitorROS(const EventContext &context, robmonitor::ROBDataMonitorStruct payload) override
Implementation of ITrigCostSvc::monitorROS.
virtual bool isMonitoredEvent(const EventContext &context, const bool includeMultiSlot=true) const override
std::unique_ptr< std::atomic< bool >[] > m_eventMonitored
Used to cache if the event in a given slot is being monitored.
virtual StatusCode endEvent(const EventContext &context, SG::WriteHandle< xAOD::TrigCompositeContainer > &costOutputHandle, SG::WriteHandle< xAOD::TrigCompositeContainer > &rosOutputHandle) override
Implementation of ITrigCostSvc::endEvent.
int32_t getROIID(const EventContext &context)
@brief Internal function to return a RoI from an extended event context
virtual StatusCode discardEvent(const EventContext &context) override
Discard a cost monitored event.
Gaudi::Property< std::string > m_costSupervisorAlgName
TrigCostDataStore< std::vector< robmonitor::ROBDataMonitorStruct > > m_rosData
Thread-safe store of ROS data.
TrigCostSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard ATLAS Service constructor.
Gaudi::Property< size_t > m_masterSlot
virtual StatusCode finalize() override
Finalize, act on m_saveHashes.
virtual StatusCode generateTimeoutReport(const EventContext &context, std::string &report) override
StatusCode monitor(const EventContext &context, const AlgorithmIdentifier &ai, const TrigTimeStamp &now, const AuditType type)
Internal call to save monitoring data for a given AlgorithmIdentifier.
size_t m_threadCounter
Count how many unique thread ID we have seen.
tbb::concurrent_hash_map< std::thread::id, AlgorithmIdentifier, ThreadHashCompare > m_threadToAlgMap
Keeps track of what is running right now in each thread.
std::unordered_map< uint32_t, uint32_t > m_threadToCounterMap
Map thread's hash ID to a counting numeral.
virtual StatusCode startEvent(const EventContext &context, const bool enableMonitoring=true) override
Implementation of ITrigCostSvc::startEvent.
nope - should be used for standalone also, perhaps need to protect the class def bits ifndef XAOD_ANA...
virtual unsigned int roiId() const override final
these quantities probably don't need to be used any more
utility class to measure time duration in AthenaMT The pattern when it is useful: AlgA tags the begin...
The structure which is used to monitor the ROB data request in L2 It is created for every addROBData ...
uint64_t start_time
map of ROBs requested
std::map< const uint32_t, robmonitor::ROBDataStruct > requested_ROBs
name of requesting algorithm
uint64_t end_time
start time of ROB request (microsec since epoch)
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
size_t getNSlots()
Return the number of event slots.
TrigComposite_v1 TrigComposite
Declare the latest version of the class.
Static hash and equal members as required by tbb::concurrent_hash_map.
static AlgorithmIdentifier make(const EventContext &context, const std::string &caller, MsgStream &msg, const int16_t slotOverride=-1)
Construct an AlgorithmIdentifier.
Small structure to hold an algorithm's name and store, plus some details on its EventView.
std::string m_caller
Name of the algorithm.
std::string m_store
Name of the algorithm's store.
TrigConf::HLTHash callerHash(MsgStream &msg) const
size_t m_slotToSaveInto
The slot which is used for the purposes of recording data on this algorithm's execution.
static constexpr int16_t s_noView
Constant value used to express an Algorithm which is not running in a View.
TrigConf::HLTHash storeHash(MsgStream &msg) const
size_t m_realSlot
The actual slot of the algorithm.
size_t m_hash
Hash of algorithm + store + realSlot.
StatusCode isValid() const
int16_t m_viewID
If not within an event view, then m_viewID = s_noView = -1.
Small structure wrap the various values stored for an algorithm just before it starts to execute.
static bool equal(const std::thread::id &x, const std::thread::id &y)
static size_t hash(const std::thread::id &thread)
MsgStream & msg
Definition testRead.cxx:32