ATLAS Offline Software
Loading...
Searching...
No Matches
HltEventLoopMgr.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3*/
4
5// Local includes
6#include "HltEventLoopMgr.h"
8#include "TrigRDBManager.h"
9
10// Trigger includes
13
14// Athena includes
22
23// Gaudi includes
24#include "GaudiKernel/ConcurrencyFlags.h"
25#include "GaudiKernel/IAlgManager.h"
26#include "GaudiKernel/IAlgorithm.h"
27#include "GaudiKernel/IEvtSelector.h"
28#include "GaudiKernel/IProperty.h"
29#include "GaudiKernel/IIoComponent.h"
30#include "GaudiKernel/ThreadLocalContext.h"
31
32// TDAQ includes
33#include "eformat/StreamTag.h"
34
35// ROOT includes
36#include "TROOT.h"
37#include "TSystem.h"
38
// System includes
#include <algorithm>
#include <filesystem>
#include <format>
#include <sstream>
#include <string>
#include <time.h>
45
46// =============================================================================
47// Helper macros, typedefs and constants
48// =============================================================================
49namespace {
50 bool isTimedOut(const std::unordered_map<std::string_view,StatusCode>& algErrors) {
51 for (const auto& [key, sc] : algErrors) {
52 if (sc == Athena::Status::TIMEOUT) return true;
53 }
54 return false;
55 }
57 template <typename T> std::string toString(const T& x) {
58 std::ostringstream ss;
59 ss << x;
60 return ss.str();
61 }
62}
63using namespace boost::property_tree;
64
65// =============================================================================
66// Standard constructor
67// =============================================================================
68HltEventLoopMgr::HltEventLoopMgr(const std::string& name, ISvcLocator* svcLoc)
69: base_class(name, svcLoc) {}
70
71// =============================================================================
72// Standard destructor
73// =============================================================================
{
  // tbb:task_group destructor throws if wait() was never called, so the task
  // group must have been waited on before this object is destroyed
}
79
80// =============================================================================
81// Reimplementation of AthService::initalize (IStateful interface)
82// =============================================================================
84{
85 // Do not auto-retrieve tools (see Gaudi!1124)
86 m_autoRetrieveTools = false;
87 m_checkToolDeps = false;
88
89 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
90
91 ATH_MSG_INFO(" ---> HltEventLoopMgr = " << name() << " initialize");
92
93 //----------------------------------------------------------------------------
94 // Setup properties
95 //----------------------------------------------------------------------------
96
97 // Set the timeout value (cast float to int)
98 m_softTimeoutValue = std::chrono::milliseconds(static_cast<int>(m_hardTimeout.value() * m_softTimeoutFraction.value()));
99
100 // Read DataFlow configuration properties
102
103 // print properties
104 ATH_MSG_INFO(" ---> ApplicationName = " << m_applicationName);
105 ATH_MSG_INFO(" ---> HardTimeout = " << m_hardTimeout.value());
106 ATH_MSG_INFO(" ---> SoftTimeoutFraction = " << m_softTimeoutFraction.value());
107 ATH_MSG_INFO(" ---> SoftTimeoutValue = " << m_softTimeoutValue.count());
108 ATH_MSG_INFO(" ---> TimeoutThreadIntervalMs = " << m_timeoutThreadIntervalMs.value());
109 ATH_MSG_INFO(" ---> TraceOnTimeout = " << m_traceOnTimeout.value());
110 ATH_MSG_INFO(" ---> MaxFrameworkErrors = " << m_maxFrameworkErrors.value());
111 ATH_MSG_INFO(" ---> FwkErrorDebugStreamName = " << m_fwkErrorDebugStreamName.value());
112 ATH_MSG_INFO(" ---> AlgErrorDebugStreamName = " << m_algErrorDebugStreamName.value());
113 ATH_MSG_INFO(" ---> TimeoutDebugStreamName = " << m_timeoutDebugStreamName.value());
114 ATH_MSG_INFO(" ---> TruncationDebugStreamName = " << m_truncationDebugStreamName.value());
115 ATH_MSG_INFO(" ---> SORPath = " << m_sorPath.value());
116 ATH_MSG_INFO(" ---> setMagFieldFromPtree = " << m_setMagFieldFromPtree.value());
117 ATH_MSG_INFO(" ---> execAtStart = " << m_execAtStart.value());
118 ATH_MSG_INFO(" ---> forceRunNumber = " << m_forceRunNumber.value());
119 ATH_MSG_INFO(" ---> forceLumiblock = " << m_forceLumiblock.value());
120 ATH_MSG_INFO(" ---> forceStartOfRunTime = " << m_forceSOR_ns.value());
121 ATH_MSG_INFO(" ---> RewriteLVL1 = " << m_rewriteLVL1.value());
122 ATH_MSG_INFO(" ---> EventContextWHKey = " << m_eventContextWHKey.key());
123 ATH_MSG_INFO(" ---> EventInfoRHKey = " << m_eventInfoRHKey.key());
124
125 ATH_CHECK( m_jobOptionsSvc.retrieve() );
126 const std::string& slots = m_jobOptionsSvc->get("EventDataSvc.NSlots");
127 if (!slots.empty())
128 ATH_MSG_INFO(" ---> NumConcurrentEvents = " << slots);
129 else
130 ATH_MSG_WARNING("Failed to retrieve the job property EventDataSvc.NSlots");
131 const std::string& threads = m_jobOptionsSvc->get("AvalancheSchedulerSvc.ThreadPoolSize");
132 if (!threads.empty())
133 ATH_MSG_INFO(" ---> NumThreads = " << threads);
134 else
135 ATH_MSG_WARNING("Failed to retrieve the job property AvalancheSchedulerSvc.ThreadPoolSize");
136
137 const std::string& procs = m_jobOptionsSvc->get("DataFlowConfig.DF_NumberOfWorkers");
138 if (!procs.empty()) {
139 ATH_MSG_INFO(" ---> NumProcs = " << procs);
140 try {
141 SG::HiveMgrSvc::setNumProcs(std::stoi(procs));
142 }
143 catch (const std::logic_error& ex) {
144 ATH_MSG_ERROR("Cannot convert " << procs << "to integer: " << ex.what());
145 return StatusCode::FAILURE;
146 }
147 }
148 else
149 ATH_MSG_WARNING("Failed to retrieve the job property DataFlowconfig.DF_NumberOfWorkers");
150
151 if (m_maxParallelIOTasks.value() <= 0) {
152 ATH_CHECK(m_maxParallelIOTasks.fromString(threads));
153 }
154 ATH_MSG_INFO(" ---> MaxParallelIOTasks = " << m_maxParallelIOTasks.value());
155 ATH_MSG_INFO(" ---> MaxIOWakeUpIntervalMs = " << m_maxIOWakeUpIntervalMs.value());
156
157 //----------------------------------------------------------------------------
158 // Setup all Hive services for multithreaded event processing with the exception of SchedulerSvc,
159 // which has to be initialised after forking because it opens new threads
160 //----------------------------------------------------------------------------
161 m_whiteboard = serviceLocator()->service(m_whiteboardName);
162 if( !m_whiteboard.isValid() ) {
163 ATH_MSG_FATAL("Error retrieving " << m_whiteboardName << " interface IHiveWhiteBoard");
164 return StatusCode::FAILURE;
165 }
166 ATH_MSG_DEBUG("Initialised " << m_whiteboardName << " interface IHiveWhiteBoard");
167
168 m_algResourcePool = serviceLocator()->service("AlgResourcePool");
169 if( !m_algResourcePool.isValid() ) {
170 ATH_MSG_FATAL("Error retrieving AlgResourcePool");
171 return StatusCode::FAILURE;
172 }
173 ATH_MSG_DEBUG("initialised AlgResourcePool");
174
175 m_aess = serviceLocator()->service("AlgExecStateSvc");
176 if( !m_aess.isValid() ) {
177 ATH_MSG_FATAL("Error retrieving AlgExecStateSvc");
178 return StatusCode::FAILURE;
179 }
180 ATH_MSG_DEBUG("initialised AlgExecStateSvc");
181
182 //----------------------------------------------------------------------------
183 // Initialise services
184 //----------------------------------------------------------------------------
185 ATH_CHECK(m_incidentSvc.retrieve());
186 ATH_CHECK(m_evtStore.retrieve());
187 ATH_CHECK(m_detectorStore.retrieve());
189 ATH_CHECK(m_evtSelector.retrieve());
190 ATH_CHECK(m_evtSelector->createContext(m_evtSelContext)); // create an EvtSelectorContext
191 ATH_CHECK(m_outputCnvSvc.retrieve());
192 ATH_CHECK(m_ioCompMgr.retrieve());
193 if (m_monitorScheduler) {
194 ATH_CHECK(m_schedulerMonSvc.retrieve());
195 }
196
197 //----------------------------------------------------------------------------
198 // Initialise tools
199 //----------------------------------------------------------------------------
200 // COOL helper
201 ATH_CHECK(m_coolHelper.retrieve());
202 // HLT result builder
203 ATH_CHECK(m_hltResultMaker.retrieve());
204 // Monitoring tools
205 if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
206 ATH_CHECK(m_errorMonTool.retrieve());
207
208 //----------------------------------------------------------------------------
209 // Initialise data handle keys
210 //----------------------------------------------------------------------------
211 // EventContext WriteHandle
212 ATH_CHECK(m_eventContextWHKey.initialize());
213 // EventInfo ReadHandle
214 ATH_CHECK(m_eventInfoRHKey.initialize());
215 // HLTResultMT ReadHandle (created dynamically from the result builder property)
216 m_hltResultRHKey = m_hltResultMaker->resultName();
217 ATH_CHECK(m_hltResultRHKey.initialize());
218 // L1TriggerResult and RoIBResult ReadHandles for RewriteLVL1
221
222 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
223 return StatusCode::SUCCESS;
224}
225
226// =============================================================================
227// Reimplementation of AthService::stop (IStateful interface)
228// =============================================================================
{
  // Need to reinitialize IO in the mother process only (worker ID 0);
  // forked workers reinitialize their I/O in hltUpdateAfterFork()
  if (m_workerID==0) {
    ATH_CHECK(m_ioCompMgr->io_reinitialize());
  }

  return StatusCode::SUCCESS;
}
238
239// =============================================================================
240// Reimplementation of AthService::finalize (IStateful interface)
241// =============================================================================
{
  ATH_MSG_INFO(" ---> HltEventLoopMgr/" << name() << " finalize ");
  // Usually (but not necessarily) corresponds to the number of processed events +1
  ATH_MSG_INFO("Total number of EventContext objects created " << m_localEventNumber);

  // Release all handles
  // Warn (but do not fail) if a handle cannot be released cleanly
  auto releaseAndCheck = [&](auto& handle, std::string_view handleType) {
    if (handle.release().isFailure())
      ATH_MSG_WARNING("finalize(): Failed to release " << handleType << " " << handle.typeAndName());
  };
  // Fold expressions apply releaseAndCheck / reset to any number of handles
  auto releaseService = [&](auto&&... args) { (releaseAndCheck(args,"service"), ...); };
  auto releaseTool = [&](auto&&... args) { (releaseAndCheck(args,"tool"), ...); };
  auto releaseSmartIF = [](auto&&... args) { (args.reset(), ...); };

  releaseService(m_incidentSvc,

  releaseTool(m_coolHelper,
              m_monTool);

  releaseSmartIF(m_whiteboard,
                 m_aess,

  return StatusCode::SUCCESS;
}
276
277// =============================================================================
278// Implementation of ITrigEventLoopMgr::prepareForStart
279// =============================================================================
{
  // Build the SOR helper from the "RunParams" record of the configuration ptree
  try {
    const auto& rparams = pt.get_child("RunParams");
    m_sorHelper = std::make_unique<TrigSORFromPtreeHelper>(msgSvc(), m_detectorStore, m_sorPath, rparams);
  }
  catch(ptree_bad_path& e) {
    ATH_MSG_ERROR("Bad ptree path: \"" << e.path<ptree::path_type>().dump() << "\" - " << e.what());
    return StatusCode::FAILURE;
  }

  // Override run/timestamp if needed
  if (m_forceRunNumber > 0) {
    m_sorHelper->setRunNumber(m_forceRunNumber);
    ATH_MSG_WARNING("Run number overwrite:" << m_forceRunNumber);
  }
  if (m_forceSOR_ns > 0) {
    m_sorHelper->setSORtime_ns(m_forceSOR_ns);
    ATH_MSG_WARNING("SOR time overwrite:" << m_forceSOR_ns);
  }

  // Set our "run context"
  m_currentRunCtx.setEventID( m_sorHelper->eventID() );
  m_currentRunCtx.setExtension(Atlas::ExtendedEventContext(m_evtStore->hiveProxyDict(),
                                                           m_currentRunCtx.eventID().run_number()));

  // Some algorithms expect a valid context during start()
  ATH_MSG_DEBUG("Setting context for start transition: " << m_currentRunCtx.eventID());
  Gaudi::Hive::setCurrentContext(m_currentRunCtx);

  // Clear stores and publish the SOR record; exceptions here are logged
  // but deliberately not treated as fatal
  try {
    ATH_CHECK( clearTemporaryStores() ); // do the necessary resets
    ATH_CHECK( m_sorHelper->fillSOR(m_currentRunCtx) ); // update SOR in det store

    const auto& soral = getSorAttrList();
    updateMetadataStore(soral); // update metadata store
  }
  catch(const std::exception& e) {
    ATH_MSG_ERROR("Exception: " << e.what());
  }

  ATH_CHECK( updateMagField(pt) ); // update magnetic field

  return StatusCode::SUCCESS;
}
325
326
327// =============================================================================
328// Implementation of ITrigEventLoopMgr::prepareForRun
329// =============================================================================
StatusCode HltEventLoopMgr::prepareForRun(const ptree& /*pt*/)
{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);

  try
  {
    // Reset the AlgExecStateSvc (important in case there was a stop/start)
    m_aess->reset(m_currentRunCtx);

    // Fire BeginRun incident
    m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginRun, m_currentRunCtx));

    // Initialize COOL helper (needs to be done after IOVDbSvc has loaded all folders)
    ATH_CHECK(m_coolHelper->readFolderInfo());

    // Run optional algs/sequences (e.g. CondAlgs ATR-26138)

    // close any open files (e.g. THistSvc)
    ATH_CHECK(m_ioCompMgr->io_finalize());

    // Verify that there are no other open ROOT files (e.g. from dual-use tools).
    if ( !gROOT->GetListOfFiles()->IsEmpty() ) {
      // Count open files by name (the same file may be opened more than once)
      std::unordered_map<std::string, size_t> dups;
      for (const auto f : *gROOT->GetListOfFiles()) {
        ++dups[f->GetName()];
      }
      // Exception for THistSvc files as those will remain open
      auto histsvc = serviceLocator()->service("THistSvc", false).as<IIoComponent>();
      for (const std::string& histfile : m_ioCompMgr->io_retrieve(histsvc.get())) {
        dups.erase(histfile);
      }
      if (!dups.empty()) {
        msg() << MSG::ERROR << "The following ROOT files (with #instances) have not been closed yet: ";
        for (const auto& [n,c] : dups) msg() << n << "(x" << c << ") ";
        msg() << endmsg;
      }
    }

    // close open DB connections

    // Assert that scheduler has not been initialised before forking
    SmartIF<IService> svc = serviceLocator()->service(m_schedulerName, /*createIf=*/ false);
    if (svc.isValid()) {
      ATH_MSG_FATAL("Misconfiguration - Scheduler was initialised before forking!");
      return StatusCode::FAILURE;
    }

    ATH_MSG_VERBOSE("end of " << __FUNCTION__);
    return StatusCode::SUCCESS;
  }
  catch(const std::runtime_error& e)
  {
    ATH_MSG_ERROR("Runtime error: " << e.what());
  }

  // Reached only via the catch branch above
  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::FAILURE;
}
390
391
392// =============================================================================
393StatusCode HltEventLoopMgr::execAtStart(const EventContext& ctx) const
394{
395 IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
396
397 StatusCode sc;
398 for (const std::string& name : m_execAtStart) {
399 SmartIF<IAlgorithm>& alg = algMgr->algorithm(name, /*createIf*/false);
400 if ( alg ) {
401 ATH_MSG_INFO("Executing " << alg->name() << "...");
402 sc &= alg->sysExecute(ctx);
403 }
404 else ATH_MSG_WARNING("Cannot find algorithm or sequence " << name);
405 }
406 return sc;
407}
408
409
410// =============================================================================
411// Implementation of ITrigEventLoopMgr::hltUpdateAfterFork
412// =============================================================================
{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);

  ATH_MSG_INFO("Post-fork initialization for " << m_applicationName);

  // The scheduler opens its own threads, so it can only be created after forking
  ATH_MSG_DEBUG("Initialising the scheduler after forking");
  m_schedulerSvc = serviceLocator()->service(m_schedulerName, /*createIf=*/ true);
  if ( !m_schedulerSvc.isValid()){
    ATH_MSG_FATAL("Error retrieving " << m_schedulerName << " interface ISchedulerSvc");
    return StatusCode::FAILURE;
  }
  ATH_MSG_DEBUG("Initialised " << m_schedulerName << " interface ISchedulerSvc");

  // Restart CoreDumpSvc so it picks up the post-fork process state
  ATH_MSG_DEBUG("Trying a stop-start of CoreDumpSvc");
  SmartIF<IService> svc = serviceLocator()->service("CoreDumpSvc", /*createIf=*/ false);
  if (svc.isValid()) {
    StatusCode sc = svc->stop();
    sc &= svc->start();
    if (sc.isFailure()) {
      ATH_MSG_WARNING("Could not perform stop/start for CoreDumpSvc");
    }
    else {
      ATH_MSG_DEBUG("Done a stop-start of CoreDumpSvc");
    }
  }
  else {
    ATH_MSG_WARNING("Could not retrieve CoreDumpSvc");
  }

  // Make sure output files, i.e. histograms are written to their own directory.
  // Nothing happens if the online TrigMonTHistSvc is used as there are no output files.
  SmartIF<IIoComponent> histsvc = serviceLocator()->service("THistSvc", /*createIf=*/ false).as<IIoComponent>();
  if ( !m_ioCompMgr->io_retrieve(histsvc.get()).empty() ) {
    // Per-worker directory: athenaHLT_workers/athenaHLT-NN (NN = zero-padded worker ID)
    std::filesystem::path worker_dir = std::filesystem::absolute("athenaHLT_workers");
    std::ostringstream oss;
    oss << "athenaHLT-" << std::setfill('0') << std::setw(2) << m_workerID;
    worker_dir /= oss.str();
    // Delete worker directory if it exists already
    if ( std::filesystem::exists(worker_dir) ) {
      if ( std::filesystem::remove_all(worker_dir) == 0 ) {
        ATH_MSG_FATAL("Cannot delete previous worker directory " << worker_dir);
        return StatusCode::FAILURE;
      }
    }
    if ( !std::filesystem::create_directories(worker_dir) ) {
      ATH_MSG_FATAL("Cannot create worker directory " << worker_dir);
      return StatusCode::FAILURE;
    }
    ATH_MSG_INFO("Writing worker output files to " << worker_dir);
    ATH_CHECK(m_ioCompMgr->io_update_all(worker_dir.string()));
  }
  ATH_CHECK(m_ioCompMgr->io_reinitialize());

  // All whiteboard slots start out free
  const size_t numSlots = m_whiteboard->getNumberOfStores();
  m_freeSlots = numSlots;

  // Initialise vector of time points for event timeout monitoring
  m_eventTimerStartPoint.resize(numSlots, std::chrono::steady_clock::now());
  m_isSlotProcessing.resize(numSlots, false);

  // Initialise vector of time points for free slots monitoring
  m_freeSlotStartPoint.clear();
  m_freeSlotStartPoint.resize(numSlots, std::chrono::steady_clock::now());

  // Initialise the queues used in parallel I/O steering
  m_parallelIOQueue.set_capacity(static_cast<decltype(m_parallelIOQueue)::size_type>(m_maxParallelIOTasks.value()));
  m_finishedEventsQueue.set_capacity(static_cast<decltype(m_finishedEventsQueue)::size_type>(numSlots));

  // Fire incident to update listeners after forking

  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS;
}
490
491// =============================================================================
492// Implementation of IEventProcessor::executeRun
493// =============================================================================
494StatusCode HltEventLoopMgr::executeRun(int maxevt)
495{
496 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
497
498 if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->startMonitoring());
499
500 StatusCode sc = StatusCode::SUCCESS;
501 try {
502 sc = nextEvent(maxevt);
503 if (sc.isFailure()) ATH_MSG_FATAL("Event loop failed");
504 }
505 catch (const std::exception& e) {
506 ATH_MSG_FATAL("Event loop failed, std::exception caught: " << e.what());
507 sc = StatusCode::FAILURE;
508 }
509 catch (...) {
510 ATH_MSG_FATAL("Event loop failed, unknown exception caught");
511 sc = StatusCode::FAILURE;
512 }
513
514 if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->stopMonitoring());
515
516 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
517 return sc;
518}
519
520// =============================================================================
521// Implementation of IEventProcessor::nextEvent
522// maxevt is not used - we always want to process all events delivered
523// =============================================================================
StatusCode HltEventLoopMgr::nextEvent(int /*maxevt*/)
{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);

  // Start the event timer thread
  ATH_MSG_DEBUG("Starting the timeout thread");
  m_timeoutThread = std::make_unique<HLT::LoopThread>([this]{return eventTimerCallback();}, m_timeoutThreadIntervalMs.value());
  m_timeoutThread->start();

  // Start the event loop: input and output are driven by two dedicated threads.
  // The lock must be held before starting them so the wait below cannot miss
  // the loop-ended notification.
  ATH_MSG_INFO("Starting loop on events");
  std::unique_lock<std::mutex> lock{m_loopStatus.loopEndedMutex};
  m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();}, m_maxIOWakeUpIntervalMs.value());
  m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, m_maxIOWakeUpIntervalMs.value());
  m_outputThread->start();
  m_inputThread->start();

  // Wait for event loop to end. The condition means the main input and output threads flagged they have
  // nothing else to do and will exit asynchronously (later than the wait here ends)
  ATH_MSG_DEBUG("Event loop started, the main thread is going to sleep until it finishes");
  m_loopStatus.loopEndedCond.wait(lock, [this](){return m_loopStatus.loopEnded.load();});
  ATH_MSG_INFO("All events processed, finalising the event loop");

  // Wait for the I/O TBB tasks and main I/O threads to finish. Note the TBB tasks need to finish first
  // because they may notify the condition variables in the main I/O threads. The lifetime of the condition
  // variables must span beyond any I/O TBB task.
  ATH_MSG_DEBUG("Waiting for all I/O tasks and threads to return");
  m_inputThread->wait();
  m_outputThread->wait();

  // Stop the event timer thread
  ATH_MSG_DEBUG("All I/O threads and tasks finished. Stopping the timeout thread");
  m_timeoutThread->stop();
  m_timeoutThread->wait();
  ATH_MSG_DEBUG("The timeout thread finished");

  ATH_MSG_INFO("Finished loop on events");

  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS;
}
566
567// =============================================================================
568// Implementation of IEventProcessor::stopRun (obsolete for online runnning)
569// =============================================================================
  // Online running never invokes stopRun; reaching this method indicates a
  // misconfigured setup, so always fail loudly
  ATH_MSG_FATAL("Misconfiguration - the method HltEventLoopMgr::stopRun() cannot be used online");
  return StatusCode::FAILURE;
}
574
575// =============================================================================
576// Implementation of IEventProcessor::createEventContext
577// =============================================================================
  // Assign the next local event number and try to allocate a whiteboard slot for it
  size_t eventNumber = ++m_localEventNumber;
  auto slot = m_whiteboard->allocateStore(eventNumber); // returns npos on failure
  if (slot == std::string::npos) {
    // return an invalid EventContext
    return EventContext();
  }
  return EventContext{ eventNumber, slot };
}
587
588// =============================================================================
589// Implementation of IEventProcessor::executeEvent
590// =============================================================================
StatusCode HltEventLoopMgr::executeEvent(EventContext &&ctx)
{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);


  // Monitor slot idle time (between scheduler popFinishedEvent and pushNewEvent)
  // Note this is time of a scheduler slot being free, not equal to the time of a whiteboard slot being free
  const auto slotIdleTime = std::chrono::steady_clock::now() - m_freeSlotStartPoint[ctx.slot()];
  Monitored::Scalar<int64_t> monSlotIdleTime("SlotIdleTime", std::chrono::duration_cast<std::chrono::milliseconds>(slotIdleTime).count());
  Monitored::Group(m_monTool, monSlotIdleTime);

  // Now add event to the scheduler; ownership of the heap-allocated copy of
  // the context is handed over to the scheduler
  ATH_MSG_DEBUG("Adding event " << ctx.evt() << ", slot " << ctx.slot() << " to the scheduler");
  StatusCode addEventStatus = m_schedulerSvc->pushNewEvent( new EventContext{std::move(ctx)} );

  // If this fails, we need to wait for something to complete
  if (addEventStatus.isFailure()){
    ATH_MSG_ERROR("Failed adding event to the scheduler");
    return StatusCode::FAILURE;
  }

  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS;
}
616
617// =============================================================================
619{
620 auto getDFProp = [&](const std::string& name, std::string& value, bool required = true) {
621 if (m_jobOptionsSvc->has("DataFlowConfig."+name)) {
622 value = m_jobOptionsSvc->get("DataFlowConfig."+name);
623 ATH_MSG_INFO(" ---> Read from DataFlow configuration: " << name << " = " << value);
624 } else {
625 msg() << (required ? MSG::WARNING : MSG::INFO)
626 << "Could not set Property " << name << " from DataFlow" << endmsg;
627 }
628 };
629
630 getDFProp( "DF_ApplicationName", m_applicationName );
631 std::string wid, wpid;
632 getDFProp( "DF_WorkerId", wid, false );
633 getDFProp( "DF_Pid", wpid, false );
634 if (!wid.empty()) m_workerID = std::stoi(wid);
635 if (!wpid.empty()) m_workerPID = std::stoi(wpid);
636}
637
638// =============================================================================
639void HltEventLoopMgr::updateMetadataStore(const coral::AttributeList & sor_attrlist) const
640{
641 auto metadatacont = std::make_unique<ByteStreamMetadataContainer>();
642 metadatacont->push_back(std::make_unique<ByteStreamMetadata>(
643 sor_attrlist["RunNumber"].data<unsigned int>(),
644 0,
645 0,
646 sor_attrlist["RecordingEnabled"].data<bool>(),
647 0,
648 sor_attrlist["DetectorMaskSnd"].data<unsigned long long>(),
649 sor_attrlist["DetectorMaskFst"].data<unsigned long long>(),
650 0,
651 0,
652 "",
653 "",
654 "",
655 0,
656 std::vector<std::string>()
657 ));
658 // Record ByteStreamMetadataContainer in MetaData Store
659 if(m_inputMetaDataStore->record(std::move(metadatacont),"ByteStreamMetadata").isFailure()) {
660 ATH_MSG_WARNING("Unable to record MetaData in InputMetaDataStore");
661 }
662 else {
663 ATH_MSG_DEBUG("Recorded MetaData in InputMetaDataStore");
664 }
665}
666
667//=========================================================================
StatusCode HltEventLoopMgr::updateMagField(const ptree& pt) const
{
  // NOTE(review): an opening condition (presumably checking the
  // m_setMagFieldFromPtree property printed in initialize()) is not visible in
  // this excerpt; the closing brace before the final return matches it —
  // confirm against the full source.
    try {
      // Magnet currents as published in the run-parameters ptree
      auto tor_cur = pt.get<float>("Magnets.ToroidsCurrent.value");
      auto sol_cur = pt.get<float>("Magnets.SolenoidCurrent.value");

      // Set current on conditions alg
      IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();

      SmartIF<IAlgorithm>& fieldAlg = algMgr->algorithm("AtlasFieldMapCondAlg", /*createIf*/false);
      if ( fieldAlg ) {
        ATH_MSG_INFO("Setting field currents on AtlasFieldMapCondAlg");
        ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg, "MapSoleCurrent", sol_cur) );
        ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg, "MapToroCurrent", tor_cur) );
      }
      else ATH_MSG_WARNING("Cannot retrieve AtlasFieldMapCondAlg");

      ATH_MSG_INFO("*****************************************");
      ATH_MSG_INFO(" Auto-configuration of magnetic field: ");
      ATH_MSG_INFO(" solenoid current from IS = " << sol_cur);
      ATH_MSG_INFO(" torroid current from IS = " << tor_cur);
      ATH_MSG_INFO("*****************************************");
    }
    catch(ptree_bad_path& e) {
      ATH_MSG_ERROR( "Cannot read magnet currents from ptree: " << e.what() );
      return StatusCode::FAILURE;
    }
  }
  return StatusCode::SUCCESS;
}
699
700
701// =============================================================================
{
  // Reset transient stores between runs (invoked from prepareForStart)
  //----------------------------------------------------------------------------
  // Clear the event store, if used in the event loop
  //----------------------------------------------------------------------------
  ATH_CHECK(m_evtStore->clearStore());
  ATH_MSG_DEBUG("Cleared the EventStore");

  //----------------------------------------------------------------------------
  // Clear the InputMetaDataStore
  //----------------------------------------------------------------------------
  ATH_CHECK(m_inputMetaDataStore->clearStore());
  ATH_MSG_DEBUG("Cleared the InputMetaDataStore");

  return StatusCode::SUCCESS;
}
718
719// =============================================================================
720const coral::AttributeList& HltEventLoopMgr::getSorAttrList() const
721{
722 auto sor = m_detectorStore->retrieve<const TrigSORFromPtreeHelper::SOR>(m_sorPath);
723 if (sor==nullptr) {
724 throw std::runtime_error("Cannot retrieve " + m_sorPath);
725 }
726 if(sor->size() != 1)
727 {
728 // This branch should never be entered (the CondAttrListCollection
729 // corresponding to the SOR should contain one single AttrList). Since
730 // that's required by code ahead but not checked at compile time, we
731 // explicitly guard against any potential future mistake with this check
732 throw std::runtime_error("SOR record should have one and one only attribute list, but it has " + std::to_string(sor->size()));
733 }
734
735 const auto & soral = sor->begin()->second;
736 printSORAttrList(soral);
737 return soral;
738}
739
740// =============================================================================
void HltEventLoopMgr::printSORAttrList(const coral::AttributeList& atr) const
{
  // Log the Start-Of-Run parameters in human-readable form
  const unsigned long long sorTime_ns = atr["SORTime"].data<unsigned long long>();

  // Convert the SOR time from ns to s for the printout
  time_t sorTime_sec = sorTime_ns / std::nano::den;
  struct tm buf;

  ATH_MSG_INFO("SOR parameters:");
  ATH_MSG_INFO(" RunNumber = " << atr["RunNumber"].data<unsigned int>());
  ATH_MSG_INFO(" SORTime [ns] = " << sorTime_ns <<
               " (" << std::put_time(localtime_r(&sorTime_sec, &buf), "%F %T") << ") ");

  // Print the two 64-bit halves of the detector mask and their concatenation
  const auto maskFst = atr["DetectorMaskFst"].data<unsigned long long>();
  const auto maskSnd = atr["DetectorMaskSnd"].data<unsigned long long>();
  ATH_MSG_INFO(" DetectorMaskFst = 0x" << std::format("{:016x}", maskFst));
  ATH_MSG_INFO(" DetectorMaskSnd = 0x" << std::format("{:016x}", maskSnd));
  ATH_MSG_INFO(" Complete DetectorMask = 0x" << std::format("{:016x}", maskFst)
               << std::format("{:016x}", maskSnd));

  ATH_MSG_INFO(" RunType = " << atr["RunType"].data<std::string>());
  ATH_MSG_INFO(" RecordingEnabled = " << (atr["RecordingEnabled"].data<bool>() ? "true" : "false"));
}
764
765// =============================================================================
/// Handle a failed event.
///
/// Prints diagnostics for the given error category, tries to build and send out
/// a debug-stream HLT result for the event, clears the whiteboard slot and
/// decides whether the event loop may continue.
///
/// @param errorCode    category of the failure (framework / algorithm / timeout / truncation)
/// @param eventContext context of the failed event (may be invalid for pure framework errors)
/// @return SUCCESS if the failure was handled cleanly and the loop may continue,
///         FAILURE if the loop has to stop (exitCode set, eventsAvailable cleared)
StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const EventContext& eventContext)
{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__ << " with errorCode = " << errorCode
                  << ", context = " << eventContext << " eventID = " << eventContext.eventID());

  // Used by MsgSvc (and possibly others but not relevant here)
  Gaudi::Hive::setCurrentContext(eventContext);

  // Common exit path for failures that cannot be handled cleanly:
  // flag the loop to drain ongoing work and then stop
  auto returnFailureAndStopEventLoop = [this]() -> StatusCode {
    ATH_MSG_INFO("Stopping event loop due to failure");
    // Change the loop exit code to FAILURE
    m_loopStatus.exitCode = StatusCode::FAILURE;

    // Flag eventsAvailable=false which will result in I/O threads to finish all the ongoing event processing
    // and then stop. We cannot flag loopEnded=true here yet, because it would finish the I/O threads while
    // there might be still events being processed and they would crash when finished.
    m_loopStatus.eventsAvailable = false;

    // Inform the caller the failure could not be handled cleanly and the event loop will stop
    return StatusCode::FAILURE;
  };

  //----------------------------------------------------------------------------
  // Handle framework errors by printing an informative message and breaking the loop
  //----------------------------------------------------------------------------
  // NOTE(review): the branch conditions of this section are not visible in this
  // view (extraction artifact); each block below handles one OnlineErrorCode
  // category - confirm against the original file.
    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
    << " meaning there was a framework error before requesting a new event. No output will be produced for this event"
    << " and the event loop will exit after all ongoing processing is finished.");
    return returnFailureAndStopEventLoop();
  }
    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
    << " meaning a new event could not be correctly read. No output will be produced for this event."
    << " The event loop will exit after all ongoing processing is finished.");
    return returnFailureAndStopEventLoop();
  }
    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
    << " meaning there was a framework error after HLT result was already sent out."
    << " The event loop will exit after all ongoing processing is finished.");
    return returnFailureAndStopEventLoop();
  }
    ATH_MSG_ERROR("Failed to access the slot for the processed event, cannot produce output. OnlineErrorCode="
    << errorCode << ". The event loop will exit after all ongoing processing is finished unless the failed event"
    << " reaches a hard timeout sooner and this process is killed.");
    return returnFailureAndStopEventLoop();
  }
    // Here we cannot be certain if the scheduler started processing the event or not. If yes, the output thread
    // will finalise the event as normal. If not, the event will eventually reach a hard timeout and this process
    // is killed, or we exit the process without ever producing output for this event (needs to be handled upstream).
    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
    << ". Cannot determine if the event processing started or not and whether a decision for this event will be"
    << " produced. The event loop will exit after all ongoing processing is finished, which may include or"
    << " not include the problematic event.");
    return returnFailureAndStopEventLoop();
  }
    ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
    << " meaning the Scheduler returned FAILURE when asked to give a finished event. Will keep trying to"
    << " pop further events if there are any still in the scheduler, but this may keep repeating until"
    << " this process is killed by hard timeout or other means. If all ongoing processing manages to finish"
    << " then the event loop will exit.");
    return returnFailureAndStopEventLoop();
  }
  if (!eventContext.valid()) {
    ATH_MSG_ERROR("Failure occurred with an invalid EventContext. Likely there was a framework error before"
    << " requesting a new event or after sending the result of a finished event. OnlineErrorCode=" << errorCode
    << ". The event loop will exit after all ongoing processing is finished.");
    return returnFailureAndStopEventLoop();
  }

  //----------------------------------------------------------------------------
  // Make sure we are using the right store
  //----------------------------------------------------------------------------
  if (m_whiteboard->selectStore(eventContext.slot()).isFailure()) {
  }

  //----------------------------------------------------------------------------
  // Define a debug stream tag for the HLT result
  //----------------------------------------------------------------------------
  // NOTE(review): the case labels of this switch are not visible in this view;
  // the three non-default branches map algorithm-error / timeout / truncation
  // failures to their configured debug stream names.
  std::string debugStreamName;
  switch (errorCode) {
      debugStreamName = m_algErrorDebugStreamName.value();
      break;
      debugStreamName = m_timeoutDebugStreamName.value();
      break;
      debugStreamName = m_truncationDebugStreamName.value();
      break;
    default:
      debugStreamName = m_fwkErrorDebugStreamName.value();
      break;
  }
  eformat::helper::StreamTag debugStreamTag{debugStreamName, eformat::DEBUG_TAG, true};

  //----------------------------------------------------------------------------
  // Create an HLT result for the failed event (copy one if it exists and contains serialised data)
  //----------------------------------------------------------------------------
  std::unique_ptr<HLT::HLTResultMT> hltResultPtr;
  StatusCode buildResultCode{StatusCode::SUCCESS};
  auto hltResultRH = SG::makeHandle(m_hltResultRHKey,eventContext);
  if (hltResultRH.isValid() && !hltResultRH->getSerialisedData().empty()) {
    // There is already an existing result, create a copy with the error code and stream tag
    hltResultPtr = std::make_unique<HLT::HLTResultMT>(*hltResultRH);
    hltResultPtr->addErrorCode(errorCode);
    buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
  } else {
    // Create a result if not available, pre-fill with error code and stream tag, then try to fill event data
    hltResultPtr = std::make_unique<HLT::HLTResultMT>();
    hltResultPtr->addErrorCode(errorCode);
    buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
    // Fill the result unless we already failed doing this before
    if (errorCode != HLT::OnlineErrorCode::NO_HLT_RESULT) {
      buildResultCode &= m_hltResultMaker->fillResult(*hltResultPtr,eventContext);
    }
  }

  // Try to record the result in the event store
  SG::WriteHandleKey<HLT::HLTResultMT> hltResultWHK(m_hltResultRHKey.key()+"_FailedEvent");
  buildResultCode &= hltResultWHK.initialize();
  auto hltResultWH = SG::makeHandle(hltResultWHK,eventContext);
  if (buildResultCode.isFailure() || hltResultWH.record(std::move(hltResultPtr)).isFailure()) {
    if (errorCode == HLT::OnlineErrorCode::NO_HLT_RESULT) {
      // Avoid infinite loop
      ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. "
                    << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
                    << "The event loop will exit after all ongoing processing is finished.");
      return returnFailureAndStopEventLoop();
    }
    ATH_MSG_ERROR("Failed to build or record the HLT Result in event store while handling a failed event. "
                  << "Trying again with skipped filling of the result contents (except debug stream tag).");
  }

  //----------------------------------------------------------------------------
  // Monitor event processing time for the failed (force-accepted) event
  //----------------------------------------------------------------------------
  auto eventTime = std::chrono::steady_clock::now() - m_eventTimerStartPoint[eventContext.slot()];
  int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count();
  auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec);
  auto monTimeAcc = Monitored::Scalar<int64_t>("TotalTimeAccepted", eventTimeMillisec);
  Monitored::Group(m_monTool, monTimeAny, monTimeAcc);

  //----------------------------------------------------------------------------
  // Try to build and send the output
  //----------------------------------------------------------------------------
  if (m_outputCnvSvc->connectOutput("").isFailure()) {
    ATH_MSG_ERROR("The output conversion service failed in connectOutput() while handling a failed event. "
                  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
                  << "The event loop will exit after all ongoing processing is finished.");
    return returnFailureAndStopEventLoop();
  }

  DataObject* hltResultDO = m_evtStore->accessData(hltResultWH.clid(),hltResultWH.key());
  if (hltResultDO == nullptr) {
    if (errorCode == HLT::OnlineErrorCode::NO_HLT_RESULT) {
      // Avoid infinite loop
      ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. "
                    << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
                    << "The event loop will exit after all ongoing processing is finished.");
      return returnFailureAndStopEventLoop();
    }
    ATH_MSG_ERROR("Failed to retrieve DataObject for the HLT result object while handling a failed event. "
                  << "Trying again with skipped filling of the result contents (except debug stream tag).");
  }

  IOpaqueAddress* addr = nullptr;
  if (m_outputCnvSvc->createRep(hltResultDO,addr).isFailure() || addr == nullptr) {
    ATH_MSG_ERROR("Conversion of HLT result object to the output format failed while handling a failed event. "
                  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
                  << "The event loop will exit after all ongoing processing is finished.");
    delete addr;
    addr = nullptr;
    return returnFailureAndStopEventLoop();
  }

  if (m_outputCnvSvc->commitOutput("",true).isFailure()) {
    ATH_MSG_ERROR("The output conversion service failed in commitOutput() while handling a failed event. "
                  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
                  << "The event loop will exit after all ongoing processing is finished.");
    delete addr;
    addr = nullptr;
    return returnFailureAndStopEventLoop();
  }

  // The output has been sent out, the ByteStreamAddress can be deleted
  delete addr;
  addr = nullptr;

  //------------------------------------------------------------------------
  // Reset the timeout flag and the timer, and mark the slot as idle
  //------------------------------------------------------------------------
  resetEventTimer(eventContext, /*processing=*/ false);

  //----------------------------------------------------------------------------
  // Clear the event data slot
  //----------------------------------------------------------------------------
  // Need to copy the event context because it's managed by the event store and clearWBSlot deletes it
  const EventContext eventContextCopy = eventContext;
  if (clearWBSlot(eventContext.slot()).isFailure())

  // Only now after store clearing we can allow the slot to be filled again,
  // so we increment m_freeSlots and notify the input thread
  ++m_freeSlots;
  if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
    m_inputThread->cond().notify_all();
  }

  //----------------------------------------------------------------------------
  // Finish handling the failed event
  //----------------------------------------------------------------------------

  // Unless this is an event data or algorithm processing failure, increment the number of framework failures
  if (!HLT::isEventProcessingErrorCode(errorCode)) {
    if ( m_maxFrameworkErrors.value()>=0 && ((++m_nFrameworkErrors)>m_maxFrameworkErrors.value()) ) {
      ATH_MSG_ERROR("Failure with OnlineErrorCode=" << errorCode
                    << " was successfully handled, but the number of tolerable framework errors for this HltEventLoopMgr instance,"
                    << " which is " << m_maxFrameworkErrors.value() << ", was exceeded. Current local event number is "
                    << eventContextCopy.evt() << ", slot " << eventContextCopy.slot()
                    << ". The event loop will exit after all ongoing processing is finished.");
      return returnFailureAndStopEventLoop();
    }
  }

  // Even if handling the failed event succeeded, print an error message with failed event details
  ATH_MSG_ERROR("Failed event with OnlineErrorCode=" << errorCode
                << " Current local event number is " << eventContextCopy.evt() << ", slot " << eventContextCopy.slot());

  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS; // error handling succeeded, event loop may continue
}
1005
1006// =============================================================================
1008{
1009 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
1010 auto now=std::chrono::steady_clock::now();
1011 for (size_t i=0; i<m_eventTimerStartPoint.size(); ++i) {
1012 // iterate over all slots and check for timeout
1013 if (!m_isSlotProcessing.at(i)) continue;
1014 if (now > m_eventTimerStartPoint.at(i) + m_softTimeoutValue) {
1015 EventContext ctx(0,i); // we only need the slot number for Athena::Timeout instance
1016 // don't duplicate the actions if the timeout was already reached
1017 if (!Athena::Timeout::instance(ctx).reached()) {
1018 ATH_MSG_ERROR("Soft timeout in slot " << i << ". Processing time exceeded the limit of " << m_softTimeoutValue.count() << " ms");
1020 // Generate stack trace and scheduler dump only once, on the first timeout
1021 if (m_traceOnTimeout.value() && !m_timeoutTraceGenerated) {
1022 m_schedulerSvc->dumpState();
1023 ATH_MSG_INFO("Generating stack trace due to the soft timeout");
1025 gSystem->StackTrace();
1026 }
1027 }
1028 }
1029 }
1030 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
1031}
1032
1033// =============================================================================
/// Restart the soft-timeout timer for the slot of the given event context and
/// mark the slot as busy (processing=true) or idle (processing=false).
/// No-op for an invalid context. Thread-safe with respect to the timeout thread.
void HltEventLoopMgr::resetEventTimer(const EventContext& eventContext, bool processing) {
  if (!eventContext.valid()) {return;}
  {
    // Update the per-slot timer state under the timeout thread's mutex so the
    // timeout check never observes a half-updated slot
    std::unique_lock<std::mutex> lock(m_timeoutThread->mutex());
    m_eventTimerStartPoint[eventContext.slot()] = std::chrono::steady_clock::now();
    m_isSlotProcessing[eventContext.slot()] = processing;
  }
  // Wake up the timeout thread to re-evaluate with the new state
  m_timeoutThread->cond().notify_all();
}
1044
1045// =============================================================================
1046StatusCode HltEventLoopMgr::clearWBSlot(size_t evtSlot) const
1047{
1048 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
1049 auto monTime = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_clearStore");
1050 StatusCode sc = m_whiteboard->clearStore(evtSlot);
1051 Monitored::Group(m_monTool, monTime);
1052 if( !sc.isSuccess() ) {
1053 ATH_MSG_WARNING("Clear of event data store failed");
1054 }
1055 ATH_MSG_VERBOSE("end of " << __FUNCTION__ << ", returning m_whiteboard->freeStore(evtSlot=" << evtSlot << ")");
1056 return m_whiteboard->freeStore(evtSlot);
1057}
1058
1059// =============================================================================
  // NOTE(review): body of the input-thread callback (the function signature is
  // not visible in this view). Refills all currently free whiteboard slots with
  // new events by scheduling startNextEvent() tasks in the parallel I/O task group.
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
  if (m_loopStatus.loopEnded) {
    ATH_MSG_VERBOSE("Event loop ended, stopping the input thread and returning from " << __FUNCTION__);
    m_inputThread->stop();
    // Notify output thread which may be still waiting for events
    m_outputThread->cond().notify_all();
    return;
  }

  // Early exit conditions
  if (!m_loopStatus.eventsAvailable) {
    ATH_MSG_VERBOSE("No more events, flagging the event loop as finished, stopping the input thread"
                    << " and returning from " << __FUNCTION__);
    m_inputThread->stop();
    // Notify output thread which may be still waiting for events
    m_outputThread->cond().notify_all();
    return;
  }
  // Reserve all currently free slots for filling; they are given back either by
  // the event-reading tasks below or when slots are cleared elsewhere
  const size_t numSlotsToFill = m_freeSlots.load();
  if (numSlotsToFill==0) {
    ATH_MSG_VERBOSE("No free slots, returning from " << __FUNCTION__);
    return;
  }
  m_freeSlots -= numSlotsToFill;

  // Read in and start processing another event
  ATH_MSG_DEBUG("Free slots = " << numSlotsToFill << ". Reading new event(s) to fill the slot(s).");

  // Fill all free slots with new events
  for (size_t i=0; i<numSlotsToFill; ++i) {
    auto task = [mgr=this](){
      StatusCode sc = StatusCode::SUCCESS;
      try {
        sc = mgr->startNextEvent();
      }
      catch (const std::exception& e) {
        mgr->error() << "Exception caught in startNextEvent: " << e.what() << endmsg;
        sc = StatusCode::FAILURE;
      }
      catch (...) {
        mgr->error() << "Exception caught in startNextEvent" << endmsg;
        sc = StatusCode::FAILURE;
      }
      if (sc.isFailure()) {
        mgr->error() << "startNextEvent failed, stopping the event loop" << endmsg;
        mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
        mgr->m_loopStatus.eventsAvailable = false;
        return;
      }
      // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item
      // is popped, we only use the queue size to limit the number of tasks running in parallel
      bool popIOQueue{false};
      mgr->m_parallelIOQueue.pop(popIOQueue);
    };

    // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
    // we only use the queue size and benefit from the blocking push call here to limit the number
    // of tasks running in parallel. Once we can push to the queue, we can schedule the task.
    m_parallelIOQueue.push(true);
    m_parallelIOTaskGroup.run(std::move(task));
  }
  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
}
1124
1125// =============================================================================
  // NOTE(review): body of the output-thread callback (the function signature is
  // not visible in this view). Pops finished events from the scheduler, records
  // monitoring, and schedules processFinishedEvent() tasks in the parallel I/O
  // task group. Also the only place that can flag the end of the event loop.
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
  const size_t nslots = m_isSlotProcessing.size(); // size is fixed in hltUpdateAfterFork after configuring scheduler
  if (m_schedulerSvc->freeSlots() == nslots) {
    if (m_loopStatus.eventsAvailable) {
      ATH_MSG_DEBUG("There are currently no events being processed by the Scheduler, returning from " << __FUNCTION__);
    } else if (m_freeSlots == nslots) {
      ATH_MSG_DEBUG("No more events to process and scheduler is empty, stopping the event loop and output thread");
      if (!m_loopStatus.loopEnded && m_outputThread!=nullptr) {
        m_outputThread->stop();
      }
      // Notify input thread which may be still waiting for free slots
      if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
        m_inputThread->cond().notify_all();
      }
      // Notify the main thread that the loop ended - this is the only place able to do this!
      m_loopStatus.loopEnded = true;
      m_loopStatus.loopEndedCond.notify_all();
    }
    else{
      ATH_MSG_DEBUG("No more events, but processing is still ongoing, returning from " << __FUNCTION__);
    }
    return;
  }

  //----------------------------------------------------------------------------
  // Pop events from the Scheduler
  //----------------------------------------------------------------------------
  std::vector<EventContext*> finishedEvtContexts;
  EventContext* finishedEvtContext(nullptr);
  const auto popStartTime = std::chrono::steady_clock::now();

  // Pop one event from the scheduler (blocking call)
  ATH_MSG_DEBUG("Waiting for a finished event from the Scheduler");
  if (m_schedulerSvc->popFinishedEvent(finishedEvtContext).isFailure()) {
    // NOTE(review): the error handling statement of this branch is not visible
    // in this view - confirm against the original file.
    delete finishedEvtContext;
    finishedEvtContext = nullptr;
    return;
  }
  ATH_MSG_DEBUG("Scheduler returned a finished event: " << finishedEvtContext);
  finishedEvtContexts.push_back(finishedEvtContext);

  // See if more events are available (non-blocking call)
  while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
    ATH_MSG_DEBUG("Scheduler returned a finished event: " << *finishedEvtContext);
    finishedEvtContexts.push_back(finishedEvtContext);
  }
  // Monitor how long the pop took and how many events it returned
  const auto popSpentTime = std::chrono::steady_clock::now() - popStartTime;
  const auto popSpentTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(popSpentTime).count();
  Monitored::Scalar<int64_t> monPopSchedulerTime{"PopSchedulerTime", popSpentTimeMs};
  Monitored::Scalar<size_t> monPopSchedulerNumEvt{"PopSchedulerNumEvt", finishedEvtContexts.size()};
  Monitored::Group{m_monTool, monPopSchedulerNumEvt, monPopSchedulerTime};

  //----------------------------------------------------------------------------
  // Post-process the finished events
  //----------------------------------------------------------------------------
  const size_t nFinishedEvents = finishedEvtContexts.size();
  ATH_MSG_DEBUG("Number of finished events to post-process: " << nFinishedEvents);

  // Push all post-processing tasks to TBB
  for (EventContext* thisFinishedEvtContext : finishedEvtContexts) {
    // Reset free slot timer for monitoring
    if (thisFinishedEvtContext != nullptr) {
      m_freeSlotStartPoint[thisFinishedEvtContext->slot()] = std::chrono::steady_clock::now();
    }

    // Create and enqueue the task
    m_finishedEventsQueue.push(thisFinishedEvtContext);
    auto task = [mgr=this](){
      StatusCode sc = StatusCode::SUCCESS;
      try {
        sc = mgr->processFinishedEvent();
      }
      catch (const std::exception& e) {
        mgr->error() << "Exception caught in processFinishedEvent: " << e.what() << endmsg;
        sc = StatusCode::FAILURE;
      }
      catch (...) {
        mgr->error() << "Exception caught in processFinishedEvent" << endmsg;
        sc = StatusCode::FAILURE;
      }

      if (sc.isFailure()) {
        mgr->error() << "processFinishedEvent failed, stopping the event loop" << endmsg;
        mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
        mgr->m_loopStatus.eventsAvailable = false;
      }

      // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item
      // is popped, we only use the queue size to limit the number of tasks running in parallel
      bool popIOQueue{false};
      mgr->m_parallelIOQueue.pop(popIOQueue);

      // Wake up the output thread if it's sleeping - this prevents a deadlock after the last event
      // when input thread already finished and is no longer waking up the output thread. Spurious wake-ups
      // during the event loop from this notification should have negligible effect on CPU load.
      mgr->m_outputThread->cond().notify_one();
    };

    // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
    // we only use the queue size and benefit from the blocking push call here to limit the number
    // of tasks running in parallel. Once we can push to the queue, we can schedule the task.
    m_parallelIOQueue.push(true);
    m_parallelIOTaskGroup.run(std::move(task));
  }

  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
}
1235
1236// =============================================================================
1238{
1239 StatusCode sc = StatusCode::SUCCESS;
1240 auto check = [this, &sc](std::string&& errmsg, HLT::OnlineErrorCode errcode, const EventContext& eventContext) {
1241 if (sc.isSuccess()) {return false;}
1243 sc = failedEvent(errcode, eventContext);
1244 Gaudi::Hive::setCurrentContext(EventContext());
1245 return true;
1246 };
1247
1248 //------------------------------------------------------------------------
1249 // Allocate event slot and create new EventContext
1250 //------------------------------------------------------------------------
1251
1252 // Create an EventContext, allocating and selecting a whiteboard slot
1253 std::unique_ptr<EventContext> eventContextPtr = std::make_unique<EventContext>(createEventContext());
1254
1255 sc = eventContextPtr->valid() ? StatusCode(StatusCode::SUCCESS) : StatusCode(StatusCode::FAILURE);
1256 if (check("Failed to allocate slot for a new event", HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContextPtr)) {
1257 return sc;
1258 }
1259
1260 sc = m_whiteboard->selectStore(eventContextPtr->slot());
1261 if (check("Failed to select event store slot number " + std::to_string(eventContextPtr->slot()),
1262 HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContextPtr)) {
1263 return sc;
1264 }
1265
1266 // We can completely avoid using ThreadLocalContext if we store the EventContext in the event store. Any
1267 // service/tool method which does not allow to pass EventContext as argument, can const-retrieve it from the
1268 // event store rather than using ThreadLocalContext.
1269
1270 // We link the current store in the extension of the EventContext we just created. Only then we create
1271 // a WriteHandle for the EventContext using the EventContext itself. The handle will use the linked hiveProxyDict
1272 // to record the context in the current store.
1273 eventContextPtr->setExtension(Atlas::ExtendedEventContext(m_evtStore->hiveProxyDict(),
1274 m_currentRunCtx.eventID().run_number()));
1275 auto eventContext = SG::makeHandle(m_eventContextWHKey,*eventContextPtr);
1276 sc = eventContext.record(std::move(eventContextPtr));
1277 if (check("Failed to record new EventContext in the event store",
1279 return sc;
1280 }
1281
1282 // Reset the AlgExecStateSvc
1283 m_aess->reset(*eventContext);
1284
1285 ATH_MSG_DEBUG("Created new EventContext with number: " << eventContext->evt()
1286 << ", slot: " << eventContext->slot());
1287
1288 // This ThreadLocalContext call is a not-so-nice behind-the-scenes way to inform some services about the current
1289 // context. If possible, services should use EventContext from the event store as recorded above. We have to set
1290 // the ThreadLocalContext here because some services still use it.
1291 Gaudi::Hive::setCurrentContext(*eventContext);
1292
1293 //------------------------------------------------------------------------
1294 // Create a new address for EventInfo to facilitate automatic conversion from input data
1295 //------------------------------------------------------------------------
1296 IOpaqueAddress* addr = nullptr;
1297 sc = m_evtSelector->createAddress(*m_evtSelContext, addr);
1298 if (check("Event selector failed to create an IOpaqueAddress",
1300 return sc;
1301 }
1302
1303 //------------------------------------------------------------------------
1304 // Get the next event
1305 //------------------------------------------------------------------------
1306 try {
1307 bool noEventsTemporarily{false};
1308 do {
1309 try {
1310 noEventsTemporarily = false;
1312 } catch (const hltonl::Exception::NoEventsTemporarily& e) {
1313 ATH_MSG_DEBUG("No new input events available temporarily, requesting again");
1314 noEventsTemporarily = true;
1315 }
1316 } while (noEventsTemporarily);
1317 }
1318 catch (const hltonl::Exception::NoMoreEvents& e) {
1319 sc = StatusCode::SUCCESS;
1320 m_loopStatus.eventsAvailable = false;
1321 sc = clearWBSlot(eventContext->slot());
1322 if (sc.isFailure()) {
1323 ATH_MSG_WARNING("Failed to clear the whiteboard slot " << eventContext->slot()
1324 << " after NoMoreEvents detected");
1325 }
1326 // Increment m_freeSlots after clearing the store and notify the input thread
1327 ++m_freeSlots;
1328 if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
1329 m_inputThread->cond().notify_all();
1330 }
1331 if (!m_loopStatus.loopEnded && m_outputThread!=nullptr) {
1332 if (m_freeSlots == m_isSlotProcessing.size()) {
1333 m_outputThread->cond().notify_all();
1334 }
1335 }
1336 return StatusCode::SUCCESS;
1337 }
1338 catch (const hltonl::Exception::MissingCTPFragment& e) {
1339 sc = StatusCode::FAILURE;
1340 if (check(e.what(), HLT::OnlineErrorCode::MISSING_CTP_FRAGMENT, *eventContext)) {
1341 return sc;
1342 }
1343 }
1344 catch (const hltonl::Exception::BadCTPFragment& e) {
1345 sc = StatusCode::FAILURE;
1346 if (check(e.what(), HLT::OnlineErrorCode::BAD_CTP_FRAGMENT, *eventContext)) {
1347 return sc;
1348 }
1349 }
1350 catch (const std::exception& e) {
1351 ATH_MSG_ERROR("Failed to get next event from the event source, std::exception caught: " << e.what());
1352 sc = StatusCode::FAILURE;
1353 }
1354 catch (...) {
1355 ATH_MSG_ERROR("Failed to get next event from the event source, unknown exception caught");
1356 sc = StatusCode::FAILURE;
1357 }
1358 if (check("Failed to get the next event",
1360 return sc;
1361 }
1362
1363 //------------------------------------------------------------------------
1364 // Reset the timeout flag and the timer, and mark the slot as busy
1365 //------------------------------------------------------------------------
1366 resetEventTimer(*eventContext, /*processing=*/ true);
1367
1368 //------------------------------------------------------------------------
1369 // Load event proxies and get event info
1370 //------------------------------------------------------------------------
1371 sc = m_evtStore->loadEventProxies();
1372 if (check("Failed to load event proxies", HLT::OnlineErrorCode::NO_EVENT_INFO, *eventContext)) {
1373 return sc;
1374 }
1375
1376 auto eventInfo = SG::makeHandle(m_eventInfoRHKey,*eventContext);
1377 sc = eventInfo.isValid() ? StatusCode::SUCCESS : StatusCode::FAILURE;
1378 if (check("Failed to retrieve EventInfo", HLT::OnlineErrorCode::NO_EVENT_INFO, *eventContext)) {
1379 return sc;
1380 }
1381
1382 ATH_MSG_DEBUG("Retrieved event info for the new event " << *eventInfo);
1383
1384 // Set EventID for the EventContext
1385 EventID eid = eventIDFromxAOD(eventInfo.cptr());
1386 // Override run/LB/timestamp if needed
1387 if (m_forceRunNumber > 0) {
1388 eid.set_run_number(m_forceRunNumber);
1389 }
1390 if (m_forceLumiblock > 0) {
1391 eid.set_lumi_block(m_forceLumiblock);
1392 }
1393 if (m_forceSOR_ns > 0) {
1394 eid.set_time_stamp(m_forceSOR_ns / std::nano::den);
1395 eid.set_time_stamp_ns_offset(m_forceSOR_ns % std::nano::den);
1396 }
1397 eventContext->setEventID(eid);
1398
1399 // Update thread-local EventContext after setting EventID
1400 Gaudi::Hive::setCurrentContext(*eventContext);
1401
1402 //-----------------------------------------------------------------------
1403 // COOL updates for LB changes
1404 //-----------------------------------------------------------------------
1405
1406 // Check if this is a new LB
1407 EventIDBase::number_type oldMaxLB{0}, newMaxLB{0};
1408 bool updatedLB{false};
1409 do {
1410 oldMaxLB = m_loopStatus.maxLB.load();
1411 newMaxLB = std::max(oldMaxLB, eventContext->eventID().lumi_block());
1412 updatedLB = newMaxLB > oldMaxLB;
1413 } while (updatedLB && !m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB));
1414 m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB);
1415
1416 // Wait in case a COOL update is ongoing to avoid executeEvent
1417 // reading conditions data while they are being updated.
1418 {
1419 std::unique_lock<std::mutex> lock(m_loopStatus.coolUpdateMutex);
1420 m_loopStatus.coolUpdateCond.wait(lock, [&]{return !m_loopStatus.coolUpdateOngoing;});
1421 }
1422
1423 // Do COOL updates (if needed) and notify other threads about it
1424 if (updatedLB) {
1425 {
1426 std::lock_guard<std::mutex> lock(m_loopStatus.coolUpdateMutex);
1427 m_loopStatus.coolUpdateOngoing = true;
1428 sc = m_coolHelper->hltCoolUpdate(*eventContext);
1429 if (check("Failure during COOL update", HLT::OnlineErrorCode::COOL_UPDATE, *eventContext)) {
1430 m_loopStatus.coolUpdateOngoing = false;
1431 return sc;
1432 }
1433 m_loopStatus.coolUpdateOngoing = false;
1434 }
1435 m_loopStatus.coolUpdateCond.notify_all();
1436 }
1437
1438 //------------------------------------------------------------------------
1439 // Process the event
1440 //------------------------------------------------------------------------
1441 // We need to make a copy of eventContext, as executeEvent uses move semantics and eventContext is already owned
1442 // by the event store. The copy we create here is pushed to the scheduler and retrieved back in drainScheduler
1443 // where we have to delete it.
1444 sc = executeEvent(EventContext(*eventContext));
1445 if (check("Failed to schedule event processing",
1447 return sc;
1448 }
1449 // Notify the output thread to start waiting for a finished event
1450 m_outputThread->cond().notify_one();
1451
1452 //------------------------------------------------------------------------
1453 // Set ThreadLocalContext to an invalid context
1454 //------------------------------------------------------------------------
1455 // We have passed the event to the scheduler and we are entering back a context-less environment
1456 Gaudi::Hive::setCurrentContext( EventContext() );
1457
1458 return sc;
1459}
1460
1461// =============================================================================
// Perform all end-of-event actions for a single event handed back by the
// scheduler: pop the finished EventContext from m_finishedEventsQueue, check
// its processing status, build the HLTResultMT, convert and send the HLT
// (and, with RewriteLVL1, the L1) output, clear the whiteboard slot and
// release it for reuse, then fill the timing monitoring histograms.
//
// NOTE(review): this listing was recovered from generated documentation; the
// definition's signature line (per the class declaration this is
// StatusCode HltEventLoopMgr::processFinishedEvent()) and several
// continuation lines carrying the HLT::OnlineErrorCode arguments of the
// check(...) calls were dropped by the extraction — confirm against the
// original repository source before relying on this listing.
1463{
1464 EventContext* eventContext{nullptr};
 // Blocks until the output thread signals that a finished event is available.
1465 m_finishedEventsQueue.pop(eventContext);
1466
1467 StatusCode sc = StatusCode::SUCCESS;
 // Error-handling helper: returns false while sc is SUCCESS; otherwise routes
 // the failure to failedEvent(errcode, ...), resets the thread-local context,
 // deletes the EventContext copy (created for executeEvent) and returns true
 // so the caller can bail out with the updated status code.
1468 auto check = [this, &sc, &eventContext](std::string&& errmsg, HLT::OnlineErrorCode errcode) {
1469 if (sc.isSuccess()) {return false;}
1471 const EventContext& eventContextRef = (eventContext==nullptr) ? EventContext() : *eventContext;
1472 sc = failedEvent(errcode, eventContextRef);
1473 Gaudi::Hive::setCurrentContext(EventContext());
1474 delete eventContext;
1475 eventContext = nullptr;
1476 return true;
1477 };
1478
1479 //--------------------------------------------------------------------------
1480 // Basic checks, select slot, retrieve event info
1481 //--------------------------------------------------------------------------
1482 // Check if the EventContext object exists
1483 if (eventContext == nullptr) {
1484 sc = StatusCode::FAILURE;
1485 if (check("Detected nullptr EventContext while finalising a processed event",
1487 return sc;
1488 }
1489 }
1490
1491 // Set ThreadLocalContext to the currently processed finished context
1492 Gaudi::Hive::setCurrentContext(eventContext);
1493
1494 // Check the event processing status
 // If any algorithm failed, classify the error (timeout vs. generic) from the
 // per-algorithm error map before invoking the failure path.
1495 if (m_aess->eventStatus(*eventContext) != EventStatus::Success) {
1496 sc = StatusCode::FAILURE;
1497 auto algErrors = m_errorMonTool->algExecErrors(*eventContext);
1498 const HLT::OnlineErrorCode errCode = isTimedOut(algErrors) ?
1500 if (check("Processing event with context " + toString(*eventContext) + \
1501 " failed with status " + toString(m_aess->eventStatus(*eventContext)),
1502 errCode)) {
1503 return sc;
1504 }
1505 }
1506
1507 // Select the whiteboard slot
1508 sc = m_whiteboard->selectStore(eventContext->slot());
1509 if (check("Failed to select event store slot " + std::to_string(eventContext->slot()),
1511 return sc;
1512 }
1513
1514 // Fire EndProcessing incident - some services may depend on this
1515 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *eventContext));
1516
1517 //--------------------------------------------------------------------------
1518 // HLT output handling
1519 //--------------------------------------------------------------------------
1520 // Call the result builder to record HLTResultMT in SG
1521 sc = m_hltResultMaker->makeResult(*eventContext);
1522 if (check("Failed to create the HLT result object", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}
1523
1524 // Connect output (create the output container) - the argument is currently not used
1525 sc = m_outputCnvSvc->connectOutput("");
1526 if (check("Conversion service failed to connectOutput", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;}
1527
1528 // Retrieve the HLT result and the corresponding DataObject
1529 auto hltResult = SG::makeHandle(m_hltResultRHKey,*eventContext);
1530 if (!hltResult.isValid()) {sc = StatusCode::FAILURE;}
1531 if (check("Failed to retrieve the HLT result", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}
1532
1533 DataObject* hltResultDO = m_evtStore->accessData(hltResult.clid(),hltResult.key());
1534 if (hltResultDO == nullptr) {sc = StatusCode::FAILURE;}
1535 if (check("Failed to retrieve the HLTResult DataObject", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}
1536
1537 // Check for result truncation
 // Only severe truncation is treated as a failure here.
1538 if (!hltResult->getTruncatedModuleIds().empty() && hltResult->severeTruncation()) {sc = StatusCode::FAILURE;}
1539 if (check("HLT result truncation", HLT::OnlineErrorCode::RESULT_TRUNCATION)) {return sc;}
1540
1541 // Convert the HLT result to the output data format
1542 IOpaqueAddress* addr = nullptr;
1543 sc = m_outputCnvSvc->createRep(hltResultDO,addr);
 // On conversion failure, free the (possibly partially created) address
 // before running the error handler.
1544 if (sc.isFailure()) {delete addr; addr= nullptr;}
1545 if (check("Conversion service failed to convert HLTResult", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;}
1546
1547 // Retrieve and convert the L1 result to the output data format
1548 IOpaqueAddress* l1addr = nullptr;
1549 IOpaqueAddress* l1addrLegacy = nullptr;
1550 if (m_rewriteLVL1) {
1551 // Run-3 L1 simulation result
1552 if (not m_l1TriggerResultRHKey.empty()) {
1553 auto l1TriggerResult = SG::makeHandle(m_l1TriggerResultRHKey, *eventContext);
1554 if (!l1TriggerResult.isValid()) {sc = StatusCode::FAILURE;}
1555 if (check("Failed to retrieve the L1 Trigger Result for RewriteLVL1",
1557 return sc;
1558 }
1559
1560 DataObject* l1TriggerResultDO = m_evtStore->accessData(l1TriggerResult.clid(),l1TriggerResult.key());
1561 if (l1TriggerResultDO == nullptr) {sc = StatusCode::FAILURE;}
1562 if (check("Failed to retrieve the L1 Trigger Result DataObject for RewriteLVL1",
1564 return sc;
1565 }
1566
1567 sc = m_outputCnvSvc->createRep(l1TriggerResultDO,l1addr);
1568 if (sc.isFailure()) {delete l1addr; l1addr = nullptr;}
1569 if (check("Conversion service failed to convert L1 Trigger Result for RewriteLVL1",
1571 return sc;
1572 }
1573 }
1574 // Legacy (Run-2) L1 simulation result
1575 if (not m_roibResultRHKey.empty()) {
1576 auto roibResult = SG::makeHandle(m_roibResultRHKey, *eventContext);
1577 if (!roibResult.isValid()) {sc = StatusCode::FAILURE;}
1578 if (check("Failed to retrieve the RoIBResult for RewriteLVL1",
1580 return sc;
1581 }
1582
1583 DataObject* roibResultDO = m_evtStore->accessData(roibResult.clid(),roibResult.key());
1584 if (roibResultDO == nullptr) {sc = StatusCode::FAILURE;}
1585 if (check("Failed to retrieve the RoIBResult DataObject for RewriteLVL1",
1587 return sc;
1588 }
1589
1590 sc = m_outputCnvSvc->createRep(roibResultDO,l1addrLegacy);
1591 if (sc.isFailure()) {delete l1addrLegacy; l1addrLegacy = nullptr;}
1592 if (check("Conversion service failed to convert RoIBResult for RewriteLVL1",
1594 return sc;
1595 }
1596 }
1597 }
1598
1599 // Save event processing time before sending output
 // An event counts as "accepted" iff the HLT result carries at least one stream tag.
1600 bool eventAccepted = !hltResult->getStreamTags().empty();
1601 auto eventTime = std::chrono::steady_clock::now() - m_eventTimerStartPoint[eventContext->slot()];
1602 int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count();
1603
1604 // Commit output (write/send the output data) - the arguments are currently not used
1605 sc = m_outputCnvSvc->commitOutput("",true);
1606 if (sc.isFailure()) {delete addr; addr=nullptr;}
1607 if (check("Conversion service failed to commitOutput", HLT::OnlineErrorCode::OUTPUT_SEND_FAILURE)) {return sc;}
1608
1609 // The output has been sent out, the ByteStreamAddress can be deleted
1610 delete addr; addr = nullptr;
1611 delete l1addr; l1addr = nullptr;
1612 delete l1addrLegacy; l1addrLegacy = nullptr;
1613
1614 //------------------------------------------------------------------------
1615 // Reset the timeout flag and the timer, and mark the slot as idle
1616 //------------------------------------------------------------------------
1617 resetEventTimer(*eventContext, /*processing=*/ false);
1618
1619 //--------------------------------------------------------------------------
1620 // Clear the slot
1621 //--------------------------------------------------------------------------
1622 ATH_MSG_DEBUG("Clearing slot " << eventContext->slot()
1623 << " (event " << eventContext->evt() << ") of the whiteboard");
1624
1625 sc = clearWBSlot(eventContext->slot());
1626 if (check("Whiteboard slot " + std::to_string(eventContext->slot()) + " could not be properly cleared",
1628 return sc;
1629 }
1630
1631 ATH_MSG_DEBUG("Finished processing " << (eventAccepted ? "accepted" : "rejected")
1632 << " event with context " << *eventContext
1633 << " which took " << eventTimeMillisec << " ms");
1634
1635 // Only now after store clearing we can allow the slot to be filled again,
1636 // so we increment m_freeSlots and notify the input thread
1637 ++m_freeSlots;
1638 if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
1639 m_inputThread->cond().notify_all();
1640 }
1641
1642 // Fill the time monitoring histograms
1643 auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec);
1644 auto monTimeAcc = Monitored::Scalar<int64_t>(eventAccepted ? "TotalTimeAccepted" : "TotalTimeRejected", eventTimeMillisec);
1645 Monitored::Group(m_monTool, monTimeAny, monTimeAcc);
1646
1647 // Set ThreadLocalContext to an invalid context as we are entering a context-less environment
1648 Gaudi::Hive::setCurrentContext( EventContext() );
1649
1650 // Delete the EventContext which was created when calling executeEvent( EventContext(*eventContext) )
1651 delete eventContext;
1652 eventContext = nullptr;
1653
1654 return StatusCode::SUCCESS;
1655}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the ByteStreamMetadataContainer class.
This file contains the class definition for the ByteStreamMetadata class.
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
boost::property_tree::ptree ptree
static Double_t ss
static Double_t sc
Helper tool for COOL updates.
#define x
static thread_local std::ostringstream errmsg
Definition WaferTree.h:25
void resetTimeout(Timeout &instance)
Reset timeout.
Definition Timeout.h:83
void setTimeout(Timeout &instance)
Set timeout.
Definition Timeout.h:80
static Timeout & instance()
Get reference to Timeout singleton.
Definition Timeout.h:64
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
Definition EventID.h:35
ToolHandle< TrigCOOLUpdateHelper > m_coolHelper
StatusCode clearTemporaryStores()
Clear per-event stores.
ServiceHandle< StoreGateSvc > m_inputMetaDataStore
SG::WriteHandleKey< EventContext > m_eventContextWHKey
ServiceHandle< IIoComponentMgr > m_ioCompMgr
Gaudi::Property< float > m_softTimeoutFraction
virtual StatusCode finalize() override
virtual StatusCode hltUpdateAfterFork(const boost::property_tree::ptree &pt) override
Gaudi::Property< int > m_maxParallelIOTasks
Gaudi::Property< int > m_maxIOWakeUpIntervalMs
StatusCode failedEvent(HLT::OnlineErrorCode errorCode, const EventContext &eventContext)
Handle a failure to process an event.
void printSORAttrList(const coral::AttributeList &atr) const
Print the SOR record.
tbb::concurrent_bounded_queue< bool > m_parallelIOQueue
Queue limiting the number of parallel I/O tasks.
std::unique_ptr< HLT::LoopThread > m_timeoutThread
Timeout thread.
SG::ReadHandleKey< xAOD::EventInfo > m_eventInfoRHKey
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
StatusCode execAtStart(const EventContext &ctx) const
Execute optional algs/sequences.
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events)
std::atomic< size_t > m_localEventNumber
Event counter used for local bookkeeping; incremental per instance of HltEventLoopMgr,...
virtual StatusCode stop() override
tbb::concurrent_bounded_queue< EventContext * > m_finishedEventsQueue
Queue of events ready for output processing.
EventLoopStatus m_loopStatus
Object keeping track of the event loop status.
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events)
Gaudi::Property< std::vector< std::string > > m_execAtStart
StatusCode updateMagField(const boost::property_tree::ptree &pt) const
Set magnetic field currents from ptree.
tbb::task_group m_parallelIOTaskGroup
Task group to execute parallel I/O tasks asynchronously.
virtual StatusCode nextEvent(int maxevt=-1) override
Implementation of IEventProcessor::nextEvent which implements the event loop.
SG::ReadHandleKey< xAOD::TrigCompositeContainer > m_l1TriggerResultRHKey
void outputThreadCallback()
The method executed by the output handling thread.
StatusCode processFinishedEvent()
Perform all end-of-event actions for a single event popped out from the scheduler.
virtual StatusCode executeRun(int maxevt=-1) override
Implementation of IEventProcessor::executeRun which calls IEventProcessor::nextEvent.
Gaudi::Property< std::string > m_truncationDebugStreamName
ServiceHandle< StoreGateSvc > m_evtStore
virtual StatusCode initialize() override
Gaudi::Property< unsigned int > m_forceLumiblock
virtual EventContext createEventContext() override
create an Event Context object
std::vector< std::chrono::steady_clock::time_point > m_freeSlotStartPoint
Vector of time stamps telling when each scheduler slot was freed.
ServiceHandle< IEvtSelector > m_evtSelector
std::string m_applicationName
Application name.
Gaudi::Property< unsigned int > m_timeoutThreadIntervalMs
SmartIF< IAlgResourcePool > m_algResourcePool
int m_workerID
Worker ID.
StatusCode clearWBSlot(size_t evtSlot) const
Clear an event slot in the whiteboard.
virtual StatusCode executeEvent(EventContext &&ctx) override
Implementation of IEventProcessor::executeEvent which processes a single event.
std::vector< bool > m_isSlotProcessing
Vector of flags to tell if a slot is idle or processing.
SG::ReadHandleKey< ROIB::RoIBResult > m_roibResultRHKey
IEvtSelector::Context * m_evtSelContext
Event selector context.
ServiceHandle< IIncidentSvc > m_incidentSvc
SmartIF< IHiveWhiteBoard > m_whiteboard
bool m_timeoutTraceGenerated
Flag set when a soft timeout produces a stack trace, to avoid producing multiple traces.
Gaudi::Property< std::string > m_timeoutDebugStreamName
virtual StatusCode stopRun() override
Implementation of IEventProcessor::stopRun (obsolete for online running)
Gaudi::Property< unsigned long long > m_forceSOR_ns
SmartIF< IScheduler > m_schedulerSvc
StatusCode startNextEvent()
Gaudi::Property< std::string > m_algErrorDebugStreamName
virtual StatusCode prepareForStart(const boost::property_tree::ptree &) override
std::chrono::milliseconds m_softTimeoutValue
Soft timeout value set to HardTimeout*SoftTimeoutFraction at initialisation.
SmartIF< IAlgExecStateSvc > m_aess
void updateDFProps()
Read DataFlow configuration properties.
Gaudi::Property< std::string > m_whiteboardName
std::vector< std::chrono::steady_clock::time_point > m_eventTimerStartPoint
Vector of event start-processing time stamps in each slot.
std::atomic< size_t > m_freeSlots
Number of free slots used to synchronise input/output tasks.
Gaudi::Property< int > m_maxFrameworkErrors
ServiceHandle< IConversionSvc > m_outputCnvSvc
EventContext m_currentRunCtx
"Event" context of current run with dummy event/slot number
void eventTimerCallback()
The method executed by the event timeout monitoring thread.
Gaudi::Property< std::string > m_fwkErrorDebugStreamName
Gaudi::Property< float > m_hardTimeout
ToolHandle< GenericMonitoringTool > m_monTool
std::atomic< int > m_nFrameworkErrors
Counter of framework errors.
HltEventLoopMgr(const std::string &name, ISvcLocator *svcLoc)
Standard constructor.
ToolHandle< HLTResultMTMaker > m_hltResultMaker
void updateMetadataStore(const coral::AttributeList &sor_attrlist) const
ToolHandle< ITrigErrorMonTool > m_errorMonTool
Gaudi::Property< unsigned int > m_forceRunNumber
Gaudi::Property< std::string > m_sorPath
int m_workerPID
Worker PID.
ServiceHandle< ISchedulerMonSvc > m_schedulerMonSvc
Gaudi::Property< bool > m_monitorScheduler
Gaudi::Property< bool > m_traceOnTimeout
Gaudi::Property< std::string > m_schedulerName
const coral::AttributeList & getSorAttrList() const
Extract the single attr list off the SOR CondAttrListCollection.
std::unique_ptr< TrigSORFromPtreeHelper > m_sorHelper
void resetEventTimer(const EventContext &eventContext, bool processing)
Reset the timeout flag and the timer, and mark the slot as busy or idle according to the second argum...
Gaudi::Property< bool > m_setMagFieldFromPtree
SG::ReadHandleKey< HLT::HLTResultMT > m_hltResultRHKey
StoreGate key for reading the HLT result.
virtual ~HltEventLoopMgr() noexcept override
Standard destructor.
Gaudi::Property< bool > m_rewriteLVL1
ServiceHandle< StoreGateSvc > m_detectorStore
Group of local monitoring quantities that retains their correlation when filling histograms
Declare a monitored scalar variable.
A monitored timer.
static void setNumProcs(size_t numProcs)
Set number of concurrent processes.
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Property holding a SG store/key/clid from which a WriteHandle is made.
static StatusCode closeDBConnections(MsgStream &msg)
Close database connections.
CondAttrListCollection SOR
Thrown if the CTP ROBFragment for a new event has non-zero status word or other errors.
Thrown if the CTP ROBFragment cannot be retrieved for a new event.
Thrown if the event source cannot provide new events temporarily, e.g.
Thrown if all events are already read from the input and another one is requested.
int count(std::string s, const std::string &regx)
count how many occurances of a regx are in a string
Definition hcg.cxx:146
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
@ TIMEOUT
Timeout during event processing.
constexpr bool isEventProcessingErrorCode(const OnlineErrorCode code)
AthROOTErrorHandlerSvc * svc
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
histsvc
TrigInDetMonitoring part ################################.
MsgStream & msg
Definition testRead.cxx:32