ATLAS Offline Software
HltEventLoopMgr.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 // Local includes
6 #include "HltEventLoopMgr.h"
7 #include "TrigCOOLUpdateHelper.h"
8 #include "TrigRDBManager.h"
9 
10 // Trigger includes
13 
14 // Athena includes
20 #include "StoreGate/StoreGateSvc.h"
21 #include "StoreGate/SGHiveMgrSvc.h"
22 
23 // Gaudi includes
24 #include "GaudiKernel/ConcurrencyFlags.h"
25 #include "GaudiKernel/IAlgManager.h"
26 #include "GaudiKernel/IAlgorithm.h"
27 #include "GaudiKernel/IEvtSelector.h"
28 #include "GaudiKernel/IProperty.h"
29 #include "GaudiKernel/IIoComponent.h"
30 #include "GaudiKernel/ThreadLocalContext.h"
31 
32 // TDAQ includes
33 #include "eformat/StreamTag.h"
34 
35 // ROOT includes
36 #include "TROOT.h"
37 #include "TSystem.h"
38 
39 // System includes
40 #include <filesystem>
41 #include <format>
42 #include <sstream>
43 #include <string>
44 #include <time.h>
45 
46 // =============================================================================
47 // Helper macros, typedefs and constants
48 // =============================================================================
49 namespace {
50  bool isTimedOut(const std::unordered_map<std::string_view,StatusCode>& algErrors) {
51  for (const auto& [key, sc] : algErrors) {
52  if (sc == Athena::Status::TIMEOUT) return true;
53  }
54  return false;
55  }
57  template <typename T> std::string toString(const T& x) {
58  std::ostringstream ss;
59  ss << x;
60  return ss.str();
61  }
62 }
63 using namespace boost::property_tree;
64 
65 // =============================================================================
66 // Standard constructor
67 // =============================================================================
68 HltEventLoopMgr::HltEventLoopMgr(const std::string& name, ISvcLocator* svcLoc)
69 : base_class(name, svcLoc) {}
70 
71 // =============================================================================
72 // Standard destructor
73 // =============================================================================
75 {
76  // tbb:task_group destructor throws if wait() was never called
77  m_parallelIOTaskGroup.wait();
78 }
79 
80 // =============================================================================
81 // Reimplementation of AthService::initialize (IStateful interface)
82 // =============================================================================
84 {
85  // Do not auto-retrieve tools (see Gaudi!1124)
86  m_autoRetrieveTools = false;
87  m_checkToolDeps = false;
88 
89  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
90 
91  ATH_MSG_INFO(" ---> HltEventLoopMgr = " << name() << " initialize");
92 
93  //----------------------------------------------------------------------------
94  // Setup properties
95  //----------------------------------------------------------------------------
96 
97  // Set the timeout value (cast float to int)
98  m_softTimeoutValue = std::chrono::milliseconds(static_cast<int>(m_hardTimeout.value() * m_softTimeoutFraction.value()));
99 
100  // Read DataFlow configuration properties
101  updateDFProps();
102 
103  // print properties
104  ATH_MSG_INFO(" ---> ApplicationName = " << m_applicationName);
105  ATH_MSG_INFO(" ---> HardTimeout = " << m_hardTimeout.value());
106  ATH_MSG_INFO(" ---> SoftTimeoutFraction = " << m_softTimeoutFraction.value());
107  ATH_MSG_INFO(" ---> SoftTimeoutValue = " << m_softTimeoutValue.count());
108  ATH_MSG_INFO(" ---> TimeoutThreadIntervalMs = " << m_timeoutThreadIntervalMs.value());
109  ATH_MSG_INFO(" ---> TraceOnTimeout = " << m_traceOnTimeout.value());
110  ATH_MSG_INFO(" ---> MaxFrameworkErrors = " << m_maxFrameworkErrors.value());
111  ATH_MSG_INFO(" ---> FwkErrorDebugStreamName = " << m_fwkErrorDebugStreamName.value());
112  ATH_MSG_INFO(" ---> AlgErrorDebugStreamName = " << m_algErrorDebugStreamName.value());
113  ATH_MSG_INFO(" ---> TimeoutDebugStreamName = " << m_timeoutDebugStreamName.value());
114  ATH_MSG_INFO(" ---> TruncationDebugStreamName = " << m_truncationDebugStreamName.value());
115  ATH_MSG_INFO(" ---> SORPath = " << m_sorPath.value());
116  ATH_MSG_INFO(" ---> setMagFieldFromPtree = " << m_setMagFieldFromPtree.value());
117  ATH_MSG_INFO(" ---> execAtStart = " << m_execAtStart.value());
118  ATH_MSG_INFO(" ---> forceRunNumber = " << m_forceRunNumber.value());
119  ATH_MSG_INFO(" ---> forceLumiblock = " << m_forceLumiblock.value());
120  ATH_MSG_INFO(" ---> forceStartOfRunTime = " << m_forceSOR_ns.value());
121  ATH_MSG_INFO(" ---> RewriteLVL1 = " << m_rewriteLVL1.value());
122  ATH_MSG_INFO(" ---> EventContextWHKey = " << m_eventContextWHKey.key());
123  ATH_MSG_INFO(" ---> EventInfoRHKey = " << m_eventInfoRHKey.key());
124 
125  ATH_CHECK( m_jobOptionsSvc.retrieve() );
126  const std::string& slots = m_jobOptionsSvc->get("EventDataSvc.NSlots");
127  if (!slots.empty())
128  ATH_MSG_INFO(" ---> NumConcurrentEvents = " << slots);
129  else
130  ATH_MSG_WARNING("Failed to retrieve the job property EventDataSvc.NSlots");
131  const std::string& threads = m_jobOptionsSvc->get("AvalancheSchedulerSvc.ThreadPoolSize");
132  if (!threads.empty())
133  ATH_MSG_INFO(" ---> NumThreads = " << threads);
134  else
135  ATH_MSG_WARNING("Failed to retrieve the job property AvalancheSchedulerSvc.ThreadPoolSize");
136 
137  const std::string& procs = m_jobOptionsSvc->get("DataFlowConfig.DF_NumberOfWorkers");
138  if (!procs.empty()) {
139  ATH_MSG_INFO(" ---> NumProcs = " << procs);
140  try {
141  SG::HiveMgrSvc::setNumProcs(std::stoi(procs));
142  }
143  catch (const std::logic_error& ex) {
144  ATH_MSG_ERROR("Cannot convert " << procs << "to integer: " << ex.what());
145  return StatusCode::FAILURE;
146  }
147  }
148  else
149  ATH_MSG_WARNING("Failed to retrieve the job property DataFlowconfig.DF_NumberOfWorkers");
150 
151  if (m_maxParallelIOTasks.value() <= 0) {
153  }
154  ATH_MSG_INFO(" ---> MaxParallelIOTasks = " << m_maxParallelIOTasks.value());
155  ATH_MSG_INFO(" ---> MaxIOWakeUpIntervalMs = " << m_maxIOWakeUpIntervalMs.value());
156 
157  //----------------------------------------------------------------------------
158  // Setup all Hive services for multithreaded event processing with the exception of SchedulerSvc,
159  // which has to be initialised after forking because it opens new threads
160  //----------------------------------------------------------------------------
161  m_whiteboard = serviceLocator()->service(m_whiteboardName);
162  if( !m_whiteboard.isValid() ) {
163  ATH_MSG_FATAL("Error retrieving " << m_whiteboardName << " interface IHiveWhiteBoard");
164  return StatusCode::FAILURE;
165  }
166  ATH_MSG_DEBUG("Initialised " << m_whiteboardName << " interface IHiveWhiteBoard");
167 
168  m_algResourcePool = serviceLocator()->service("AlgResourcePool");
169  if( !m_algResourcePool.isValid() ) {
170  ATH_MSG_FATAL("Error retrieving AlgResourcePool");
171  return StatusCode::FAILURE;
172  }
173  ATH_MSG_DEBUG("initialised AlgResourcePool");
174 
175  m_aess = serviceLocator()->service("AlgExecStateSvc");
176  if( !m_aess.isValid() ) {
177  ATH_MSG_FATAL("Error retrieving AlgExecStateSvc");
178  return StatusCode::FAILURE;
179  }
180  ATH_MSG_DEBUG("initialised AlgExecStateSvc");
181 
182  //----------------------------------------------------------------------------
183  // Initialise services
184  //----------------------------------------------------------------------------
185  ATH_CHECK(m_incidentSvc.retrieve());
186  ATH_CHECK(m_evtStore.retrieve());
187  ATH_CHECK(m_detectorStore.retrieve());
188  ATH_CHECK(m_inputMetaDataStore.retrieve());
189  ATH_CHECK(m_evtSelector.retrieve());
190  ATH_CHECK(m_evtSelector->createContext(m_evtSelContext)); // create an EvtSelectorContext
191  ATH_CHECK(m_outputCnvSvc.retrieve());
192  ATH_CHECK(m_ioCompMgr.retrieve());
193  if (m_monitorScheduler) {
194  ATH_CHECK(m_schedulerMonSvc.retrieve());
195  }
196 
197  //----------------------------------------------------------------------------
198  // Initialise tools
199  //----------------------------------------------------------------------------
200  // COOL helper
201  ATH_CHECK(m_coolHelper.retrieve());
202  // HLT result builder
203  ATH_CHECK(m_hltResultMaker.retrieve());
204  // Monitoring tools
205  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
206  ATH_CHECK(m_errorMonTool.retrieve());
207 
208  //----------------------------------------------------------------------------
209  // Initialise data handle keys
210  //----------------------------------------------------------------------------
211  // EventContext WriteHandle
213  // EventInfo ReadHandle
215  // HLTResultMT ReadHandle (created dynamically from the result builder property)
216  m_hltResultRHKey = m_hltResultMaker->resultName();
218  // L1TriggerResult and RoIBResult ReadHandles for RewriteLVL1
221 
222  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
223  return StatusCode::SUCCESS;
224 }
225 
226 // =============================================================================
227 // Reimplementation of AthService::stop (IStateful interface)
228 // =============================================================================
230 {
231  // Need to reinitialize IO in the mother process
232  if (m_workerID==0) {
233  ATH_CHECK(m_ioCompMgr->io_reinitialize());
234  }
235 
236  return StatusCode::SUCCESS;
237 }
238 
239 // =============================================================================
240 // Reimplementation of AthService::finalize (IStateful interface)
241 // =============================================================================
243 {
244  ATH_MSG_INFO(" ---> HltEventLoopMgr/" << name() << " finalize ");
245  // Usually (but not necessarily) corresponds to the number of processed events +1
246  ATH_MSG_INFO("Total number of EventContext objects created " << m_localEventNumber);
247 
248  // Release all handles
249  auto releaseAndCheck = [&](auto& handle, std::string_view handleType) {
250  if (handle.release().isFailure())
251  ATH_MSG_WARNING("finalize(): Failed to release " << handleType << " " << handle.typeAndName());
252  };
253  auto releaseService = [&](auto&&... args) { (releaseAndCheck(args,"service"), ...); };
254  auto releaseTool = [&](auto&&... args) { (releaseAndCheck(args,"tool"), ...); };
255  auto releaseSmartIF = [](auto&&... args) { (args.reset(), ...); };
256 
257  releaseService(m_incidentSvc,
258  m_evtStore,
264 
265  releaseTool(m_coolHelper,
267  m_monTool);
268 
269  releaseSmartIF(m_whiteboard,
271  m_aess,
273 
274  return StatusCode::SUCCESS;
275 }
276 
277 // =============================================================================
278 // Implementation of ITrigEventLoopMgr::prepareForStart
279 // =============================================================================
281 {
282  try {
283  const auto& rparams = pt.get_child("RunParams");
284  m_sorHelper = std::make_unique<TrigSORFromPtreeHelper>(msgSvc(), m_detectorStore, m_sorPath, rparams);
285  }
286  catch(ptree_bad_path& e) {
287  ATH_MSG_ERROR("Bad ptree path: \"" << e.path<ptree::path_type>().dump() << "\" - " << e.what());
288  return StatusCode::FAILURE;
289  }
290 
291  // Override run/timestamp if needed
292  if (m_forceRunNumber > 0) {
293  m_sorHelper->setRunNumber(m_forceRunNumber);
294  ATH_MSG_WARNING("Run number overwrite:" << m_forceRunNumber);
295  }
296  if (m_forceSOR_ns > 0) {
297  m_sorHelper->setSORtime_ns(m_forceSOR_ns);
298  ATH_MSG_WARNING("SOR time overwrite:" << m_forceSOR_ns);
299  }
300 
301  // Set our "run context"
302  m_currentRunCtx.setEventID( m_sorHelper->eventID() );
303  m_currentRunCtx.setExtension(Atlas::ExtendedEventContext(m_evtStore->hiveProxyDict(),
304  m_currentRunCtx.eventID().run_number()));
305 
306  // Some algorithms expect a valid context during start()
307  ATH_MSG_DEBUG("Setting context for start transition: " << m_currentRunCtx.eventID());
308  Gaudi::Hive::setCurrentContext(m_currentRunCtx);
309 
310  try {
311  ATH_CHECK( clearTemporaryStores() ); // do the necessary resets
312  ATH_CHECK( m_sorHelper->fillSOR(m_currentRunCtx) ); // update SOR in det store
313 
314  const auto& soral = getSorAttrList();
315  updateMetadataStore(soral); // update metadata store
316  }
317  catch(const std::exception& e) {
318  ATH_MSG_ERROR("Exception: " << e.what());
319  }
320 
321  ATH_CHECK( updateMagField(pt) ); // update magnetic field
322 
323  return StatusCode::SUCCESS;
324 }
325 
326 
327 // =============================================================================
328 // Implementation of ITrigEventLoopMgr::prepareForRun
329 // =============================================================================
// Prepare the (mother) process for the run. Steps visible in this block, in order:
//  - reset the AlgExecStateSvc for the current run context
//  - fire the BeginRun incident
//  - let the COOL helper read its folder information
//  - finalize I/O components and report ROOT files that are still open
//  - fail if the scheduler service already exists
// The ptree argument is unused.
// Returns SUCCESS when all steps pass. Returns FAILURE when an ATH_CHECK fails, when the
// scheduler service is already present, or when a std::runtime_error is caught.
330 StatusCode HltEventLoopMgr::prepareForRun(const ptree& /*pt*/)
331 {
332  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
333 
334  try
335  {
336  // Reset the AlgExecStateSvc (important in case there was a stop/start)
337  m_aess->reset(m_currentRunCtx);
338 
339  // Fire BeginRun incident
340  m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginRun, m_currentRunCtx));
341 
342  // Initialize COOL helper (needs to be done after IOVDbSvc has loaded all folders)
343  ATH_CHECK(m_coolHelper->readFolderInfo());
344 
345  // Run optional algs/sequences (e.g. CondAlgs ATR-26138)
// NOTE(review): the listing numbering jumps from 345 to 347 here - the statement that
// belongs to the comment above is not present in this extract; restore it from the
// repository rather than guessing.
347 
348  // close any open files (e.g. THistSvc)
349  ATH_CHECK(m_ioCompMgr->io_finalize());
350 
351  // Verify that there are no other open ROOT files (e.g. from dual-use tools).
// Files still open are only reported at ERROR level; this check does not change the
// returned status.
352  if ( !gROOT->GetListOfFiles()->IsEmpty() ) {
// Count how many times each file name appears in ROOT's list of open files
353  std::unordered_map<std::string, size_t> dups;
354  for (const auto f : *gROOT->GetListOfFiles()) {
355  ++dups[f->GetName()];
356  }
357  // Exception for THistSvc files as those will remain open
// THistSvc is looked up without creating it (second argument is false)
358  auto histsvc = serviceLocator()->service("THistSvc", false).as<IIoComponent>();
359  for (const std::string& histfile : m_ioCompMgr->io_retrieve(histsvc.get())) {
360  dups.erase(histfile);
361  }
362  if (!dups.empty()) {
363  msg() << MSG::ERROR << "The following ROOT files (with #instances) have not been closed yet: ";
364  for (const auto& [n,c] : dups) msg() << n << "(x" << c << ") ";
365  msg() << endmsg;
366  }
367  }
368 
369  // close open DB connections
// NOTE(review): the listing numbering jumps from 369 to 371 here - the statement that
// closes the DB connections is not present in this extract.
371 
372  // Assert that scheduler has not been initialised before forking
373  SmartIF<IService> svc = serviceLocator()->service(m_schedulerName, /*createIf=*/ false);
374  if (svc.isValid()) {
375  ATH_MSG_FATAL("Misconfiguration - Scheduler was initialised before forking!");
376  return StatusCode::FAILURE;
377  }
378 
379  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
380  return StatusCode::SUCCESS;
381  }
// Only std::runtime_error is handled here; after logging, control falls through to the
// FAILURE return below. Other exception types propagate to the caller.
382  catch(const std::runtime_error& e)
383  {
384  ATH_MSG_ERROR("Runtime error: " << e.what());
385  }
386 
387  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
388  return StatusCode::FAILURE;
389 }
390 
391 
392 // =============================================================================
393 StatusCode HltEventLoopMgr::execAtStart(const EventContext& ctx) const
394 {
395  IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
396 
397  StatusCode sc;
398  for (const std::string& name : m_execAtStart) {
399  SmartIF<IAlgorithm>& alg = algMgr->algorithm(name, /*createIf*/false);
400  if ( alg ) {
401  ATH_MSG_INFO("Executing " << alg->name() << "...");
402  sc &= alg->sysExecute(ctx);
403  }
404  else ATH_MSG_WARNING("Cannot find algorithm or sequence " << name);
405  }
406  return sc;
407 }
408 
409 
410 // =============================================================================
411 // Implementation of ITrigEventLoopMgr::hltUpdateAfterFork
412 // =============================================================================
414 {
415  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
416 
417  updateDFProps();
418  ATH_MSG_INFO("Post-fork initialization for " << m_applicationName);
419 
420  ATH_MSG_DEBUG("Initialising the scheduler after forking");
421  m_schedulerSvc = serviceLocator()->service(m_schedulerName, /*createIf=*/ true);
422  if ( !m_schedulerSvc.isValid()){
423  ATH_MSG_FATAL("Error retrieving " << m_schedulerName << " interface ISchedulerSvc");
424  return StatusCode::FAILURE;
425  }
426  ATH_MSG_DEBUG("Initialised " << m_schedulerName << " interface ISchedulerSvc");
427 
428  ATH_MSG_DEBUG("Trying a stop-start of CoreDumpSvc");
429  SmartIF<IService> svc = serviceLocator()->service("CoreDumpSvc", /*createIf=*/ false);
430  if (svc.isValid()) {
431  StatusCode sc = svc->stop();
432  sc &= svc->start();
433  if (sc.isFailure()) {
434  ATH_MSG_WARNING("Could not perform stop/start for CoreDumpSvc");
435  }
436  else {
437  ATH_MSG_DEBUG("Done a stop-start of CoreDumpSvc");
438  }
439  }
440  else {
441  ATH_MSG_WARNING("Could not retrieve CoreDumpSvc");
442  }
443 
444  // Make sure output files, i.e. histograms are written to their own directory.
445  // Nothing happens if the online TrigMonTHistSvc is used as there are no output files.
446  SmartIF<IIoComponent> histsvc = serviceLocator()->service("THistSvc", /*createIf=*/ false).as<IIoComponent>();
447  if ( !m_ioCompMgr->io_retrieve(histsvc.get()).empty() ) {
448  std::filesystem::path worker_dir = std::filesystem::absolute("athenaHLT_workers");
449  std::ostringstream oss;
450  oss << "athenaHLT-" << std::setfill('0') << std::setw(2) << m_workerID;
451  worker_dir /= oss.str();
452  // Delete worker directory if it exists already
453  if ( std::filesystem::exists(worker_dir) ) {
454  if ( std::filesystem::remove_all(worker_dir) == 0 ) {
455  ATH_MSG_FATAL("Cannot delete previous worker directory " << worker_dir);
456  return StatusCode::FAILURE;
457  }
458  }
459  if ( !std::filesystem::create_directories(worker_dir) ) {
460  ATH_MSG_FATAL("Cannot create worker directory " << worker_dir);
461  return StatusCode::FAILURE;
462  }
463  ATH_MSG_INFO("Writing worker output files to " << worker_dir);
464  ATH_CHECK(m_ioCompMgr->io_update_all(worker_dir.string()));
465  }
466  ATH_CHECK(m_ioCompMgr->io_reinitialize());
467 
468  const size_t numSlots = m_whiteboard->getNumberOfStores();
469  m_freeSlots = numSlots;
470 
471  // Initialise vector of time points for event timeout monitoring
472  m_eventTimerStartPoint.clear();
474  m_isSlotProcessing.resize(numSlots, false);
475 
476  // Initialise vector of time points for free slots monitoring
477  m_freeSlotStartPoint.clear();
479 
480  // Initialise the queues used in parallel I/O steering
481  m_parallelIOQueue.set_capacity(static_cast<decltype(m_parallelIOQueue)::size_type>(m_maxParallelIOTasks.value()));
482  m_finishedEventsQueue.set_capacity(static_cast<decltype(m_finishedEventsQueue)::size_type>(numSlots));
483 
484  // Fire incident to update listeners after forking
486 
487  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
488  return StatusCode::SUCCESS;
489 }
490 
491 // =============================================================================
492 // Implementation of IEventProcessor::executeRun
493 // =============================================================================
495 {
496  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
497 
498  if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->startMonitoring());
499 
500  StatusCode sc = StatusCode::SUCCESS;
501  try {
502  sc = nextEvent(maxevt);
503  if (sc.isFailure()) ATH_MSG_FATAL("Event loop failed");
504  }
505  catch (const std::exception& e) {
506  ATH_MSG_FATAL("Event loop failed, std::exception caught: " << e.what());
507  sc = StatusCode::FAILURE;
508  }
509  catch (...) {
510  ATH_MSG_FATAL("Event loop failed, unknown exception caught");
511  sc = StatusCode::FAILURE;
512  }
513 
514  if (m_monitorScheduler) ATH_CHECK(m_schedulerMonSvc->stopMonitoring());
515 
516  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
517  return sc;
518 }
519 
520 // =============================================================================
521 // Implementation of IEventProcessor::nextEvent
522 // maxevt is not used - we always want to process all events delivered
523 // =============================================================================
525 {
526  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
527 
528  // Start the event timer thread
529  ATH_MSG_DEBUG("Starting the timeout thread");
530  m_timeoutThread = std::make_unique<HLT::LoopThread>([this]{return eventTimerCallback();}, m_timeoutThreadIntervalMs.value());
531  m_timeoutThread->start();
532 
533  // Start the event loop
534  ATH_MSG_INFO("Starting loop on events");
535  std::unique_lock<std::mutex> lock{m_loopStatus.loopEndedMutex};
536  m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();}, m_maxIOWakeUpIntervalMs.value());
537  m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, m_maxIOWakeUpIntervalMs.value());
538  m_outputThread->start();
539  m_inputThread->start();
540 
541  // Wait for event loop to end. The condition means the main input and output threads flagged they have
542  // nothing else to do and will exit asynchronously (later than the wait here ends)
543  ATH_MSG_DEBUG("Event loop started, the main thread is going to sleep until it finishes");
544  m_loopStatus.loopEndedCond.wait(lock, [this](){return m_loopStatus.loopEnded.load();});
545  ATH_MSG_INFO("All events processed, finalising the event loop");
546 
547  // Wait for the I/O TBB tasks and main I/O threads to finish. Note the TBB tasks need to finish first
548  // because they may notify the condition variables in the main I/O threads. The lifetime of the condition
549  // variables must span beyond any I/O TBB task.
550  ATH_MSG_DEBUG("Waiting for all I/O tasks and threads to return");
551  m_parallelIOTaskGroup.wait();
552  m_inputThread->wait();
553  m_outputThread->wait();
554 
555  // Stop the event timer thread
556  ATH_MSG_DEBUG("All I/O threads and tasks finished. Stopping the timeout thread");
557  m_timeoutThread->stop();
558  m_timeoutThread->wait();
559  ATH_MSG_DEBUG("The timeout thread finished");
560 
561  ATH_MSG_INFO("Finished loop on events");
562 
563  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
564  return StatusCode::SUCCESS;
565 }
566 
567 // =============================================================================
568 // Implementation of IEventProcessor::stopRun (obsolete for online running)
569 // =============================================================================
571  ATH_MSG_FATAL("Misconfiguration - the method HltEventLoopMgr::stopRun() cannot be used online");
572  return StatusCode::FAILURE;
573 }
574 
575 // =============================================================================
576 // Implementation of IEventProcessor::createEventContext
577 // =============================================================================
579  size_t eventNumber = ++m_localEventNumber;
580  auto slot = m_whiteboard->allocateStore(eventNumber); // returns npos on failure
581  if (slot == std::string::npos) {
582  // return an invalid EventContext
583  return EventContext();
584  }
585  return EventContext{ eventNumber, slot };
586 }
587 
588 // =============================================================================
589 // Implementation of IEventProcessor::executeEvent
590 // =============================================================================
592 {
593  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
594 
596 
597  // Monitor slot idle time (between scheduler popFinishedEvent and pushNewEvent)
598  // Note this is time of a scheduler slot being free, not equal to the time of a whiteboard slot being free
599  const auto slotIdleTime = std::chrono::steady_clock::now() - m_freeSlotStartPoint[ctx.slot()];
600  Monitored::Scalar<int64_t> monSlotIdleTime("SlotIdleTime", std::chrono::duration_cast<std::chrono::milliseconds>(slotIdleTime).count());
601  Monitored::Group(m_monTool, monSlotIdleTime);
602 
603  // Now add event to the scheduler
604  ATH_MSG_DEBUG("Adding event " << ctx.evt() << ", slot " << ctx.slot() << " to the scheduler");
605  StatusCode addEventStatus = m_schedulerSvc->pushNewEvent( new EventContext{std::move(ctx)} );
606 
607  // If this fails, we need to wait for something to complete
608  if (addEventStatus.isFailure()){
609  ATH_MSG_ERROR("Failed adding event to the scheduler");
610  return StatusCode::FAILURE;
611  }
612 
613  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
614  return StatusCode::SUCCESS;
615 }
616 
617 // =============================================================================
619 {
620  auto getDFProp = [&](const std::string& name, std::string& value, bool required = true) {
621  if (m_jobOptionsSvc->has("DataFlowConfig."+name)) {
622  value = m_jobOptionsSvc->get("DataFlowConfig."+name);
623  ATH_MSG_INFO(" ---> Read from DataFlow configuration: " << name << " = " << value);
624  } else {
625  msg() << (required ? MSG::WARNING : MSG::INFO)
626  << "Could not set Property " << name << " from DataFlow" << endmsg;
627  }
628  };
629 
630  getDFProp( "DF_ApplicationName", m_applicationName );
631  std::string wid, wpid;
632  getDFProp( "DF_WorkerId", wid, false );
633  getDFProp( "DF_Pid", wpid, false );
634  if (!wid.empty()) m_workerID = std::stoi(wid);
635  if (!wpid.empty()) m_workerPID = std::stoi(wpid);
636 }
637 
638 // =============================================================================
640 {
641  auto metadatacont = std::make_unique<ByteStreamMetadataContainer>();
642  metadatacont->push_back(std::make_unique<ByteStreamMetadata>(
643  sor_attrlist["RunNumber"].data<unsigned int>(),
644  0,
645  0,
646  sor_attrlist["RecordingEnabled"].data<bool>(),
647  0,
648  sor_attrlist["DetectorMaskSnd"].data<unsigned long long>(),
649  sor_attrlist["DetectorMaskFst"].data<unsigned long long>(),
650  0,
651  0,
652  "",
653  "",
654  "",
655  0,
656  std::vector<std::string>()
657  ));
658  // Record ByteStreamMetadataContainer in MetaData Store
659  if(m_inputMetaDataStore->record(std::move(metadatacont),"ByteStreamMetadata").isFailure()) {
660  ATH_MSG_WARNING("Unable to record MetaData in InputMetaDataStore");
661  }
662  else {
663  ATH_MSG_DEBUG("Recorded MetaData in InputMetaDataStore");
664  }
665 }
666 
667 //=========================================================================
669 {
671  try {
672  auto tor_cur = pt.get<float>("Magnets.ToroidsCurrent.value");
673  auto sol_cur = pt.get<float>("Magnets.SolenoidCurrent.value");
674 
675  // Set current on conditions alg
676  IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
677 
678  SmartIF<IAlgorithm>& fieldAlg = algMgr->algorithm("AtlasFieldMapCondAlg", /*createIf*/false);
679  if ( fieldAlg ) {
680  ATH_MSG_INFO("Setting field currents on AtlasFieldMapCondAlg");
681  ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg, "MapSoleCurrent", sol_cur) );
682  ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg, "MapToroCurrent", tor_cur) );
683  }
684  else ATH_MSG_WARNING("Cannot retrieve AtlasFieldMapCondAlg");
685 
686  ATH_MSG_INFO("*****************************************");
687  ATH_MSG_INFO(" Auto-configuration of magnetic field: ");
688  ATH_MSG_INFO(" solenoid current from IS = " << sol_cur);
689  ATH_MSG_INFO(" torroid current from IS = " << tor_cur);
690  ATH_MSG_INFO("*****************************************");
691  }
692  catch(ptree_bad_path& e) {
693  ATH_MSG_ERROR( "Cannot read magnet currents from ptree: " << e.what() );
694  return StatusCode::FAILURE;
695  }
696  }
697  return StatusCode::SUCCESS;
698 }
699 
700 
701 // =============================================================================
703 {
704  //----------------------------------------------------------------------------
705  // Clear the event store, if used in the event loop
706  //----------------------------------------------------------------------------
707  ATH_CHECK(m_evtStore->clearStore());
708  ATH_MSG_DEBUG("Cleared the EventStore");
709 
710  //----------------------------------------------------------------------------
711  // Clear the InputMetaDataStore
712  //----------------------------------------------------------------------------
713  ATH_CHECK(m_inputMetaDataStore->clearStore());
714  ATH_MSG_DEBUG("Cleared the InputMetaDataStore");
715 
716  return StatusCode::SUCCESS;
717 }
718 
719 // =============================================================================
721 {
722  auto sor = m_detectorStore->retrieve<const TrigSORFromPtreeHelper::SOR>(m_sorPath);
723  if (sor==nullptr) {
724  throw std::runtime_error("Cannot retrieve " + m_sorPath);
725  }
726  if(sor->size() != 1)
727  {
728  // This branch should never be entered (the CondAttrListCollection
729  // corresponding to the SOR should contain one single AttrList). Since
730  // that's required by code ahead but not checked at compile time, we
731  // explicitly guard against any potential future mistake with this check
732  throw std::runtime_error("SOR record should have one and one only attribute list, but it has " + std::to_string(sor->size()));
733  }
734 
735  const auto & soral = sor->begin()->second;
736  printSORAttrList(soral);
737  return soral;
738 }
739 
740 // =============================================================================
742 {
743  unsigned long long sorTime_ns(atr["SORTime"].data<unsigned long long>());
744 
745  // Human readable format of SOR time
746  time_t sorTime_sec = sorTime_ns / std::nano::den;
747  struct tm buf;
748 
749  ATH_MSG_INFO("SOR parameters:");
750  ATH_MSG_INFO(" RunNumber = " << atr["RunNumber"].data<unsigned int>());
751  ATH_MSG_INFO(" SORTime [ns] = " << sorTime_ns <<
752  " (" << std::put_time(localtime_r(&sorTime_sec, &buf), "%F %T") << ") ");
753 
754  auto dmfst = atr["DetectorMaskFst"].data<unsigned long long>();
755  auto dmsnd = atr["DetectorMaskSnd"].data<unsigned long long>();
756  ATH_MSG_INFO(" DetectorMaskFst = 0x" << std::format("{:016x}", dmfst));
757  ATH_MSG_INFO(" DetectorMaskSnd = 0x" << std::format("{:016x}", dmsnd));
758  ATH_MSG_INFO(" Complete DetectorMask = 0x" << std::format("{:016x}", dmfst)
759  << std::format("{:016x}", dmsnd));
760 
761  ATH_MSG_INFO(" RunType = " << atr["RunType"].data<std::string>());
762  ATH_MSG_INFO(" RecordingEnabled = " << (atr["RecordingEnabled"].data<bool>() ? "true" : "false"));
763 }
764 
765 // =============================================================================
// Handle a failure identified by errorCode for the event described by eventContext.
// Visible flow: log the failure; for a set of framework-level conditions stop the event loop right away;
// otherwise build (or copy) an HLT result carrying the error code and a debug stream tag, record it in the
// event store, push it through the output conversion service, reset the event timer, clear the whiteboard
// slot and hand the slot back to the input thread.
// Returns StatusCode::SUCCESS when the failure was handled and the event loop may continue. Returns
// StatusCode::FAILURE through returnFailureAndStopEventLoop() when it could not be handled cleanly.
// NOTE(review): this listing skips a number of original source lines (the embedded line numbers jump,
// e.g. 781 -> 783, 790 -> 792, 843 -> 845). The conditions opening the early-return branches and the
// case labels of the switch are among the lines that are not shown.
766 StatusCode HltEventLoopMgr::failedEvent(HLT::OnlineErrorCode errorCode, const EventContext& eventContext)
767 {
768  ATH_MSG_VERBOSE("start of " << __FUNCTION__ << " with errorCode = " << errorCode
769  << ", context = " << eventContext << " eventID = " << eventContext.eventID());
770 
771  // Used by MsgSvc (and possibly others but not relevant here)
772  Gaudi::Hive::setCurrentContext(eventContext);
773 
    // Shared exit path for every unrecoverable case below: logs, sets the loop exit code to FAILURE
    // and returns FAILURE to the caller.
774  auto returnFailureAndStopEventLoop = [this]() -> StatusCode {
775  ATH_MSG_INFO("Stopping event loop due to failure");
776  // Change the loop exit code to FAILURE
777  m_loopStatus.exitCode = StatusCode::FAILURE;
778 
779  // Flag eventsAvailable=false which will result in I/O threads to finish all the ongoing event processing
780  // and then stop. We cannot flag loopEnded=true here yet, because it would finish the I/O threads while
781  // there might be still events being processed and they would crash when finished.
    // NOTE(review): original line 782 is not present in this listing; the statement described by the
    // comment above is not visible here.
783 
784  // Inform the caller the failure could not be handled cleanly and the event loop will stop
785  return StatusCode::FAILURE;
786  };
787 
788  //----------------------------------------------------------------------------
789  // Handle framework errors by printing an informative message and breaking the loop
790  //----------------------------------------------------------------------------
    // NOTE(review): the condition lines of the following branches (original lines 791, 797, 803, 809, 815, 825)
    // are not shown; each visible branch body logs an error and stops the event loop.
792  ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
793  << " meaning there was a framework error before requesting a new event. No output will be produced for this event"
794  << " and the event loop will exit after all ongoing processing is finished.");
795  return returnFailureAndStopEventLoop();
796  }
798  ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
799  << " meaning a new event could not be correctly read. No output will be produced for this event."
800  << " The event loop will exit after all ongoing processing is finished.");
801  return returnFailureAndStopEventLoop();
802  }
804  ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
805  << " meaning there was a framework error after HLT result was already sent out."
806  << " The event loop will exit after all ongoing processing is finished.");
807  return returnFailureAndStopEventLoop();
808  }
810  ATH_MSG_ERROR("Failed to access the slot for the processed event, cannot produce output. OnlineErrorCode="
811  << errorCode << ". The event loop will exit after all ongoing processing is finished unless the failed event"
812  << " reaches a hard timeout sooner and this process is killed.");
813  return returnFailureAndStopEventLoop();
814  }
816  // Here we cannot be certain if the scheduler started processing the event or not. If yes, the output thread
817  // will finalise the event as normal. If not, the event will eventually reach a hard timeout and this process
818  // is killed, or we exit the process without ever producing output for this event (needs to be handled upstream).
819  ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
820  << ". Cannot determine if the event processing started or not and whether a decision for this event will be"
821  << " produced. The event loop will exit after all ongoing processing is finished, which may include or"
822  << " not include the problematic event.");
823  return returnFailureAndStopEventLoop();
824  }
826  ATH_MSG_ERROR("Failure occurred with OnlineErrorCode=" << errorCode
827  << " meaning the Scheduler returned FAILURE when asked to give a finished event. Will keep trying to"
828  << " pop further events if there are any still in the scheduler, but this may keep repeating until"
829  << " this process is killed by hard timeout or other means. If all ongoing processing manages to finish"
830  << " then the event loop will exit.");
831  return returnFailureAndStopEventLoop();
832  }
    // Everything below needs a valid slot, so an invalid context cannot be handled any further
833  if (!eventContext.valid()) {
834  ATH_MSG_ERROR("Failure occurred with an invalid EventContext. Likely there was a framework error before"
835  << " requesting a new event or after sending the result of a finished event. OnlineErrorCode=" << errorCode
836  << ". The event loop will exit after all ongoing processing is finished.");
837  return returnFailureAndStopEventLoop();
838  }
839 
840  //----------------------------------------------------------------------------
841  // Make sure we are using the right store
842  //----------------------------------------------------------------------------
843  if (m_whiteboard->selectStore(eventContext.slot()).isFailure()) {
    // NOTE(review): the body of this branch (original line 844) is not shown in this listing
845  }
846 
847  //----------------------------------------------------------------------------
848  // Define a debug stream tag for the HLT result
849  //----------------------------------------------------------------------------
    // The stream name is taken from one of four configurable properties; the default branch uses the
    // framework-error stream. NOTE(review): the case labels (original lines 852, 855, 858) are not shown.
850  std::string debugStreamName;
851  switch (errorCode) {
853  debugStreamName = m_algErrorDebugStreamName.value();
854  break;
856  debugStreamName = m_timeoutDebugStreamName.value();
857  break;
859  debugStreamName = m_truncationDebugStreamName.value();
860  break;
861  default:
862  debugStreamName = m_fwkErrorDebugStreamName.value();
863  break;
864  }
865  eformat::helper::StreamTag debugStreamTag{debugStreamName, eformat::DEBUG_TAG, true};
866 
867  //----------------------------------------------------------------------------
868  // Create an HLT result for the failed event (copy one if it exists and contains serialised data)
869  //----------------------------------------------------------------------------
870  std::unique_ptr<HLT::HLTResultMT> hltResultPtr;
    // Accumulates the status of every step of building and recording the result (combined with &=)
871  StatusCode buildResultCode{StatusCode::SUCCESS};
872  auto hltResultRH = SG::makeHandle(m_hltResultRHKey,eventContext);
873  if (hltResultRH.isValid() && !hltResultRH->getSerialisedData().empty()) {
874  // There is already an existing result, create a copy with the error code and stream tag
875  hltResultPtr = std::make_unique<HLT::HLTResultMT>(*hltResultRH);
876  hltResultPtr->addErrorCode(errorCode);
877  buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
878  } else {
879  // Create a result if not available, pre-fill with error code and stream tag, then try to fill event data
880  hltResultPtr = std::make_unique<HLT::HLTResultMT>();
881  hltResultPtr->addErrorCode(errorCode);
882  buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
883  // Fill the result unless we already failed doing this before
884  if (errorCode != HLT::OnlineErrorCode::NO_HLT_RESULT) {
885  buildResultCode &= m_hltResultMaker->fillResult(*hltResultPtr,eventContext);
886  }
887  }
888 
889  // Try to record the result in the event store
    // The result is recorded under a separate key: the read key with the suffix "_FailedEvent"
890  SG::WriteHandleKey<HLT::HLTResultMT> hltResultWHK(m_hltResultRHKey.key()+"_FailedEvent");
891  buildResultCode &= hltResultWHK.initialize();
892  auto hltResultWH = SG::makeHandle(hltResultWHK,eventContext);
893  if (buildResultCode.isFailure() || hltResultWH.record(std::move(hltResultPtr)).isFailure()) {
894  if (errorCode == HLT::OnlineErrorCode::NO_HLT_RESULT) {
895  // Avoid infinite loop
896  ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. "
897  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
898  << "The event loop will exit after all ongoing processing is finished.");
899  return returnFailureAndStopEventLoop();
900  }
901  ATH_MSG_ERROR("Failed to build or record the HLT Result in event store while handling a failed event. "
902  << "Trying again with skipped filling of the result contents (except debug stream tag).");
    // NOTE(review): original line 903 is not shown; the retry announced by the message above is not visible here
904  }
905 
906  //----------------------------------------------------------------------------
907  // Monitor event processing time for the failed (force-accepted) event
908  //----------------------------------------------------------------------------
    // Elapsed time is measured from the start point stored for this slot in m_eventTimerStartPoint
909  auto eventTime = std::chrono::steady_clock::now() - m_eventTimerStartPoint[eventContext.slot()];
910  int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count();
911  auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec);
912  auto monTimeAcc = Monitored::Scalar<int64_t>("TotalTimeAccepted", eventTimeMillisec);
913  Monitored::Group(m_monTool, monTimeAny, monTimeAcc);
914 
915  //----------------------------------------------------------------------------
916  // Try to build and send the output
917  //----------------------------------------------------------------------------
918  if (m_outputCnvSvc->connectOutput("").isFailure()) {
919  ATH_MSG_ERROR("The output conversion service failed in connectOutput() while handling a failed event. "
920  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
921  << "The event loop will exit after all ongoing processing is finished.");
922  return returnFailureAndStopEventLoop();
923  }
924 
    // Fetch the just-recorded result back from the event store as a generic DataObject for conversion
925  DataObject* hltResultDO = m_evtStore->accessData(hltResultWH.clid(),hltResultWH.key());
926  if (hltResultDO == nullptr) {
927  if (errorCode == HLT::OnlineErrorCode::NO_HLT_RESULT) {
928  // Avoid infinite loop
929  ATH_MSG_ERROR("Second failure to build or record the HLT Result in event store while handling a failed event. "
930  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
931  << "The event loop will exit after all ongoing processing is finished.");
932  return returnFailureAndStopEventLoop();
933  }
934  ATH_MSG_ERROR("Failed to retrieve DataObject for the HLT result object while handling a failed event. "
935  << "Trying again with skipped filling of the result contents (except debug stream tag).");
    // NOTE(review): original line 936 is not shown; the retry announced by the message above is not visible here
937  }
938 
    // addr is deleted by this function on every visible path after createRep (both failure branches and success)
939  IOpaqueAddress* addr = nullptr;
940  if (m_outputCnvSvc->createRep(hltResultDO,addr).isFailure() || addr == nullptr) {
941  ATH_MSG_ERROR("Conversion of HLT result object to the output format failed while handling a failed event. "
942  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
943  << "The event loop will exit after all ongoing processing is finished.");
944  delete addr;
945  return returnFailureAndStopEventLoop();
946  }
947 
948  if (m_outputCnvSvc->commitOutput("",true).isFailure()) {
949  ATH_MSG_ERROR("The output conversion service failed in commitOutput() while handling a failed event. "
950  << "Cannot force-accept this event from HLT side, will rely on data collector to do this. "
951  << "The event loop will exit after all ongoing processing is finished.");
952  delete addr;
953  return returnFailureAndStopEventLoop();
954  }
955 
956  // The output has been sent out, the ByteStreamAddress can be deleted
957  delete addr;
958 
959  //------------------------------------------------------------------------
960  // Reset the timeout flag and the timer, and mark the slot as idle
961  //------------------------------------------------------------------------
962  resetEventTimer(eventContext, /*processing=*/ false);
963 
964  //----------------------------------------------------------------------------
965  // Clear the event data slot
966  //----------------------------------------------------------------------------
967  // Need to copy the event context because it's managed by the event store and clearWBSlot deletes it
968  const EventContext eventContextCopy = eventContext;
    // If clearing fails, re-enter this function with AFTER_RESULT_SENT and the copied context
969  if (clearWBSlot(eventContext.slot()).isFailure())
970  return failedEvent(HLT::OnlineErrorCode::AFTER_RESULT_SENT,eventContextCopy);
971 
972  // Only now after store clearing we can allow the slot to be filled again,
973  // so we increment m_freeSlots and notify the input thread
974  ++m_freeSlots;
975  if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
976  m_inputThread->cond().notify_all();
977  }
978 
979  //----------------------------------------------------------------------------
980  // Finish handling the failed event
981  //----------------------------------------------------------------------------
982 
983  // Unless this is an event data or algorithm processing failure, increment the number of framework failures
    // A negative m_maxFrameworkErrors short-circuits the check, so the counter is then neither incremented nor tested
984  if (!HLT::isEventProcessingErrorCode(errorCode)) {
985  if ( m_maxFrameworkErrors.value()>=0 && ((++m_nFrameworkErrors)>m_maxFrameworkErrors.value()) ) {
986  ATH_MSG_ERROR("Failure with OnlineErrorCode=" << errorCode
987  << " was successfully handled, but the number of tolerable framework errors for this HltEventLoopMgr instance,"
988  << " which is " << m_maxFrameworkErrors.value() << ", was exceeded. Current local event number is "
989  << eventContextCopy.evt() << ", slot " << eventContextCopy.slot()
990  << ". The event loop will exit after all ongoing processing is finished.");
991  return returnFailureAndStopEventLoop();
992  }
993  }
994 
995  // Even if handling the failed event succeeded, print an error message with failed event details
996  ATH_MSG_ERROR("Failed event with OnlineErrorCode=" << errorCode
997  << " Current local event number is " << eventContextCopy.evt() << ", slot " << eventContextCopy.slot());
998 
999  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
1000  return StatusCode::SUCCESS; // error handling succeeded, event loop may continue
1001 }
1002 
1003 // =============================================================================
// Soft-timeout check over all event slots.
// NOTE(review): the signature (original line 1004) is not shown in this listing, nor are original lines
// 1007, 1011 and 1016. The condition deciding that a slot has exceeded the soft timeout, and whatever
// follows the error message, are therefore not visible here.
// Visible behaviour: slots not marked as processing are skipped; for a slot whose Athena::Timeout instance
// has not yet been reached an error is logged, and (only if m_traceOnTimeout is set and no trace was
// generated before) the scheduler state is dumped and a stack trace is printed.
1005 {
1006  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
1008  for (size_t i=0; i<m_eventTimerStartPoint.size(); ++i) {
1009  // iterate over all slots and check for timeout
1010  if (!m_isSlotProcessing.at(i)) continue;
1012  EventContext ctx(0,i); // we only need the slot number for Athena::Timeout instance
1013  // don't duplicate the actions if the timeout was already reached
1014  if (!Athena::Timeout::instance(ctx).reached()) {
1015  ATH_MSG_ERROR("Soft timeout in slot " << i << ". Processing time exceeded the limit of " << m_softTimeoutValue.count() << " ms");
1017  // Generate stack trace and scheduler dump only once, on the first timeout
1018  if (m_traceOnTimeout.value() && !m_timeoutTraceGenerated) {
1019  m_schedulerSvc->dumpState();
1020  ATH_MSG_INFO("Generating stack trace due to the soft timeout");
    // Set the guard before printing the trace so the dump is not repeated on later timeouts
1021  m_timeoutTraceGenerated = true;
1022  gSystem->StackTrace();
1023  }
1024  }
1025  }
1026  }
1027  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
1028 }
1029 
1030 // =============================================================================
// Restart the event timer for the slot of eventContext and record whether the slot is being processed.
// eventContext: context whose slot() selects the timer entry; an invalid context makes this a no-op.
// processing:   value stored in m_isSlotProcessing for that slot (true = busy, false = idle).
1031 void HltEventLoopMgr::resetEventTimer(const EventContext& eventContext, bool processing) {
1032  if (!eventContext.valid()) {return;}
    // The inner scope limits the lifetime of the lock: all three updates happen while holding the
    // timeout thread's mutex, and the mutex is released before the notification below
1033  {
1034  std::unique_lock<std::mutex> lock(m_timeoutThread->mutex());
1035  m_eventTimerStartPoint[eventContext.slot()] = std::chrono::steady_clock::now();
1036  m_isSlotProcessing[eventContext.slot()] = processing;
1037  resetTimeout(Athena::Timeout::instance(eventContext));
1038  }
    // Wake up anything waiting on the timeout thread's condition variable
1039  m_timeoutThread->cond().notify_all();
1040 }
1041 
1042 // =============================================================================
// Clear and free the whiteboard slot evtSlot.
// NOTE(review): the signature (original line 1043) is not shown in this listing.
// The clearStore call is timed with the "TIME_clearStore" monitored timer. A failed clearStore only produces
// a warning; the value returned to the caller is solely the result of m_whiteboard->freeStore(evtSlot).
1044 {
1045  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
1046  auto monTime = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_clearStore");
1047  StatusCode sc = m_whiteboard->clearStore(evtSlot);
1048  Monitored::Group(m_monTool, monTime);
1049  if( !sc.isSuccess() ) {
1050  ATH_MSG_WARNING("Clear of event data store failed");
1051  }
1052  ATH_MSG_VERBOSE("end of " << __FUNCTION__ << ", returning m_whiteboard->freeStore(evtSlot=" << evtSlot << ")");
1053  return m_whiteboard->freeStore(evtSlot);
1054 }
1055 
1056 // =============================================================================
// Input-side callback: claims all currently free slots and schedules one startNextEvent() task per slot.
// NOTE(review): the signature with the opening brace (original line 1057) and the condition of the
// "Early exit" branch (original line 1068) are not shown in this listing.
1058  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
1059  if (m_loopStatus.loopEnded) {
1060  ATH_MSG_VERBOSE("Event loop ended, stopping the input thread and returning from " << __FUNCTION__);
1061  m_inputThread->stop();
1062  // Notify output thread which may be still waiting for events
1063  m_outputThread->cond().notify_all();
1064  return;
1065  }
1066 
1067  // Early exit conditions
1069  ATH_MSG_VERBOSE("No more events, flagging the event loop as finished, stopping the input thread"
1070  << " and returning from " << __FUNCTION__);
1071  m_inputThread->stop();
1072  // Notify output thread which may be still waiting for events
1073  m_outputThread->cond().notify_all();
1074  return;
1075  }
    // Take a snapshot of the free-slot counter and subtract exactly that amount, so slots freed
    // after the load() stay counted in m_freeSlots
1076  const size_t numSlotsToFill = m_freeSlots.load();
1077  if (numSlotsToFill==0) {
1078  ATH_MSG_VERBOSE("No free slots, returning from " << __FUNCTION__);
1079  return;
1080  }
1081  m_freeSlots -= numSlotsToFill;
1082 
1083  // Read in and start processing another event
1084  ATH_MSG_DEBUG("Free slots = " << numSlotsToFill << ". Reading new event(s) to fill the slot(s).");
1085 
1086  // Fill all free slots with new events
1087  for (size_t i=0; i<numSlotsToFill; ++i) {
    // The task converts any exception thrown by startNextEvent() into a FAILURE status
1088  auto task = [mgr=this](){
1089  StatusCode sc = StatusCode::SUCCESS;
1090  try {
1091  sc = mgr->startNextEvent();
1092  }
1093  catch (const std::exception& e) {
1094  mgr->error() << "Exception caught in startNextEvent: " << e.what() << endmsg;
1095  sc = StatusCode::FAILURE;
1096  }
1097  catch (...) {
1098  mgr->error() << "Exception caught in startNextEvent" << endmsg;
1099  sc = StatusCode::FAILURE;
1100  }
1101  if (sc.isFailure()) {
1102  mgr->error() << "startNextEvent failed, stopping the event loop" << endmsg;
1103  mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
1104  mgr->m_loopStatus.eventsAvailable = false;
    // NOTE(review): this early return skips the m_parallelIOQueue.pop() below, so the item pushed
    // for this task stays in the queue on the failure path - confirm this is intended
1105  return;
1106  }
1107  // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item
1108  // is popped, we only use the queue size to limit the number of tasks running in parallel
1109  bool popIOQueue{false};
1110  mgr->m_parallelIOQueue.pop(popIOQueue);
1111  };
1112 
1113  // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
1114  // we only use the queue size and benefit from the blocking push call here to limit the number
1115  // of tasks running in parallel. Once we can push to the queue, we can schedule the task.
1116  m_parallelIOQueue.push(true);
1117  m_parallelIOTaskGroup.run(std::move(task));
1118  }
1119  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
1120 }
1121 
1122 // =============================================================================
1124  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
1125  const size_t nslots = m_isSlotProcessing.size(); // size is fixed in hltUpdateAfterFork after configuring scheduler
1126  if (m_schedulerSvc->freeSlots() == nslots) {
1128  ATH_MSG_DEBUG("There are currently no events being processed by the Scheduler, returning from " << __FUNCTION__);
1129  } else if (m_freeSlots == nslots) {
1130  ATH_MSG_DEBUG("No more events to process and scheduler is empty, stopping the event loop and output thread");
1131  if (!m_loopStatus.loopEnded && m_outputThread!=nullptr) {
1132  m_outputThread->stop();
1133  }
1134  // Notify input thread which may be still waiting for free slots
1135  if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
1136  m_inputThread->cond().notify_all();
1137  }
1138  // Notify the main thread that the loop ended - this is the only place able to do this!
1139  m_loopStatus.loopEnded = true;
1140  m_loopStatus.loopEndedCond.notify_all();
1141  }
1142  else{
1143  ATH_MSG_DEBUG("No more events, but processing is still ongoing, returning from " << __FUNCTION__);
1144  }
1145  return;
1146  }
1147 
1148  //----------------------------------------------------------------------------
1149  // Pop events from the Scheduler
1150  //----------------------------------------------------------------------------
1151  std::vector<EventContext*> finishedEvtContexts;
1152  EventContext* finishedEvtContext(nullptr);
1153  const auto popStartTime = std::chrono::steady_clock::now();
1154 
1155  // Pop one event from the scheduler (blocking call)
1156  ATH_MSG_DEBUG("Waiting for a finished event from the Scheduler");
1157  if (m_schedulerSvc->popFinishedEvent(finishedEvtContext).isFailure()) {
1158  failedEvent(HLT::OnlineErrorCode::SCHEDULER_POP_FAILURE, EventContext()).ignore();
1159  delete finishedEvtContext;
1160  return;
1161  }
1162  ATH_MSG_DEBUG("Scheduler returned a finished event: " << finishedEvtContext);
1163  finishedEvtContexts.push_back(finishedEvtContext);
1164 
1165  // See if more events are available (non-blocking call)
1166  while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1167  ATH_MSG_DEBUG("Scheduler returned a finished event: " << *finishedEvtContext);
1168  finishedEvtContexts.push_back(finishedEvtContext);
1169  }
1170  const auto popSpentTime = std::chrono::steady_clock::now() - popStartTime;
1171  const auto popSpentTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(popSpentTime).count();
1172  Monitored::Scalar<int64_t> monPopSchedulerTime{"PopSchedulerTime", popSpentTimeMs};
1173  Monitored::Scalar<size_t> monPopSchedulerNumEvt{"PopSchedulerNumEvt", finishedEvtContexts.size()};
1174  Monitored::Group{m_monTool, monPopSchedulerNumEvt, monPopSchedulerTime};
1175 
1176  //----------------------------------------------------------------------------
1177  // Post-process the finished events
1178  //----------------------------------------------------------------------------
1179  const size_t nFinishedEvents = finishedEvtContexts.size();
1180  ATH_MSG_DEBUG("Number of finished events to post-process: " << nFinishedEvents);
1181 
1182  // Push all post-processing tasks to TBB
1183  for (EventContext* thisFinishedEvtContext : finishedEvtContexts) {
1184  // Reset free slot timer for monitoring
1185  if (thisFinishedEvtContext != nullptr) {
1186  m_freeSlotStartPoint[thisFinishedEvtContext->slot()] = std::chrono::steady_clock::now();
1187  }
1188 
1189  // Create and enqueue the task
1190  m_finishedEventsQueue.push(thisFinishedEvtContext);
1191  auto task = [mgr=this](){
1192  StatusCode sc = StatusCode::SUCCESS;
1193  try {
1194  sc = mgr->processFinishedEvent();
1195  }
1196  catch (const std::exception& e) {
1197  mgr->error() << "Exception caught in processFinishedEvent: " << e.what() << endmsg;
1198  sc = StatusCode::FAILURE;
1199  }
1200  catch (...) {
1201  mgr->error() << "Exception caught in processFinishedEvent" << endmsg;
1202  sc = StatusCode::FAILURE;
1203  }
1204 
1205  if (sc.isFailure()) {
1206  mgr->error() << "processFinishedEvent failed, stopping the event loop" << endmsg;
1207  mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
1208  mgr->m_loopStatus.eventsAvailable = false;
1209  }
1210 
1211  // Pop one item from parallel I/O queue to decrement its size - it doesn't matter which item
1212  // is popped, we only use the queue size to limit the number of tasks running in parallel
1213  bool popIOQueue{false};
1214  mgr->m_parallelIOQueue.pop(popIOQueue);
1215 
1216  // Wake up the output thread if it's sleeping - this prevents a deadlock after the last event
1217  // when input thread already finished and is no longer waking up the output thread. Spurious wake-ups
1218  // during the event loop from this notification should have negligible effect on CPU load.
1219  mgr->m_outputThread->cond().notify_one();
1220  };
1221 
1222  // Push one item to the parallel I/O queue to increment its size - the value doesn't matter,
1223  // we only use the queue size and benefit from the blocking push call here to limit the number
1224  // of tasks running in parallel. Once we can push to the queue, we can schedule the task.
1225  m_parallelIOQueue.push(true);
1226  m_parallelIOTaskGroup.run(std::move(task));
1227  }
1228 
1229  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
1230 }
1231 
1232 // =============================================================================
// Read the next event into a newly allocated slot and hand it to the scheduler.
// Visible flow: create an EventContext and select its whiteboard slot, record the context in the event
// store, ask the event selector for the next event, start the event timer, load the event proxies and
// EventInfo, set the EventID (with optional forced run/LB/timestamp), run the COOL update if this event
// raises the maximum lumi block seen so far, then call executeEvent with a copy of the context.
// Every failure goes through the local `check` helper, so the value returned on those paths is whatever
// failedEvent() returned.
// NOTE(review): the signature (original line 1233) and original lines 1350, 1418, 1421 and 1424 are not
// shown in this listing.
1234 {
1235  StatusCode sc = StatusCode::SUCCESS;
    // Helper used after every step: returns false when sc is a success; otherwise logs errmsg, replaces sc
    // with the result of failedEvent(), resets the thread-local context to a default-constructed
    // EventContext and returns true
1236  auto check = [this, &sc](std::string&& errmsg, HLT::OnlineErrorCode errcode, const EventContext& eventContext) {
1237  if (sc.isSuccess()) {return false;}
1238  ATH_MSG_ERROR(errmsg);
1239  sc = failedEvent(errcode, eventContext);
1240  Gaudi::Hive::setCurrentContext(EventContext());
1241  return true;
1242  };
1243 
1244  //------------------------------------------------------------------------
1245  // Allocate event slot and create new EventContext
1246  //------------------------------------------------------------------------
1247 
1248  // Create an EventContext, allocating and selecting a whiteboard slot
1249  std::unique_ptr<EventContext> eventContextPtr = std::make_unique<EventContext>(createEventContext());
1250 
1251  sc = eventContextPtr->valid() ? StatusCode(StatusCode::SUCCESS) : StatusCode(StatusCode::FAILURE);
1252  if (check("Failed to allocate slot for a new event", HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContextPtr)) {
1253  return sc;
1254  }
1255 
1256  sc = m_whiteboard->selectStore(eventContextPtr->slot());
1257  if (check("Failed to select event store slot number " + std::to_string(eventContextPtr->slot()),
1258  HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContextPtr)) {
1259  return sc;
1260  }
1261 
1262  // We can completely avoid using ThreadLocalContext if we store the EventContext in the event store. Any
1263  // service/tool method which does not allow to pass EventContext as argument, can const-retrieve it from the
1264  // event store rather than using ThreadLocalContext.
1265 
1266  // We link the current store in the extension of the EventContext we just created. Only then we create
1267  // a WriteHandle for the EventContext using the EventContext itself. The handle will use the linked hiveProxyDict
1268  // to record the context in the current store.
1269  eventContextPtr->setExtension(Atlas::ExtendedEventContext(m_evtStore->hiveProxyDict(),
1270  m_currentRunCtx.eventID().run_number()));
    // From here on `eventContext` is the write handle; eventContextPtr is moved-from after record()
1271  auto eventContext = SG::makeHandle(m_eventContextWHKey,*eventContextPtr);
1272  sc = eventContext.record(std::move(eventContextPtr));
1273  if (check("Failed to record new EventContext in the event store",
1274  HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContext)) {
1275  return sc;
1276  }
1277 
1278  // Reset the AlgExecStateSvc
1279  m_aess->reset(*eventContext);
1280 
1281  ATH_MSG_DEBUG("Created new EventContext with number: " << eventContext->evt()
1282  << ", slot: " << eventContext->slot());
1283 
1284  // This ThreadLocalContext call is a not-so-nice behind-the-scenes way to inform some services about the current
1285  // context. If possible, services should use EventContext from the event store as recorded above. We have to set
1286  // the ThreadLocalContext here because some services still use it.
1287  Gaudi::Hive::setCurrentContext(*eventContext);
1288 
1289  //------------------------------------------------------------------------
1290  // Create a new address for EventInfo to facilitate automatic conversion from input data
1291  //------------------------------------------------------------------------
    // NOTE(review): addr is not referenced again in the visible code; presumably its ownership is handled
    // by the event selector / event store - confirm
1292  IOpaqueAddress* addr = nullptr;
1293  sc = m_evtSelector->createAddress(*m_evtSelContext, addr);
1294  if (check("Event selector failed to create an IOpaqueAddress",
1295  HLT::OnlineErrorCode::BEFORE_NEXT_EVENT, *eventContext)) {
1296  return sc;
1297  }
1298 
1299  //------------------------------------------------------------------------
1300  // Get the next event
1301  //------------------------------------------------------------------------
    // The inner try/catch retries next() for as long as NoEventsTemporarily is thrown; every other
    // exception propagates to the outer handlers below
1302  try {
1303  bool noEventsTemporarily{false};
1304  do {
1305  try {
1306  noEventsTemporarily = false;
1307  sc = m_evtSelector->next(*m_evtSelContext);
1308  } catch (const hltonl::Exception::NoEventsTemporarily& e) {
1309  ATH_MSG_DEBUG("No new input events available temporarily, requesting again");
1310  noEventsTemporarily = true;
1311  }
1312  } while (noEventsTemporarily);
1313  }
    // NoMoreEvents is not an error: the slot is given back and SUCCESS is returned regardless of
    // whether clearing the slot succeeded (a failure there is only logged as a warning)
1314  catch (const hltonl::Exception::NoMoreEvents& e) {
1315  sc = StatusCode::SUCCESS;
1316  m_loopStatus.eventsAvailable = false;
1317  sc = clearWBSlot(eventContext->slot());
1318  if (sc.isFailure()) {
1319  ATH_MSG_WARNING("Failed to clear the whiteboard slot " << eventContext->slot()
1320  << " after NoMoreEvents detected");
1321  }
1322  // Increment m_freeSlots after clearing the store and notify the input thread
1323  ++m_freeSlots;
1324  if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
1325  m_inputThread->cond().notify_all();
1326  }
1327  return StatusCode::SUCCESS;
1328  }
1329  catch (const hltonl::Exception::MissingCTPFragment& e) {
1330  sc = StatusCode::FAILURE;
1331  if (check(e.what(), HLT::OnlineErrorCode::MISSING_CTP_FRAGMENT, *eventContext)) {
1332  return sc;
1333  }
1334  }
1335  catch (const hltonl::Exception::BadCTPFragment& e) {
1336  sc = StatusCode::FAILURE;
1337  if (check(e.what(), HLT::OnlineErrorCode::BAD_CTP_FRAGMENT, *eventContext)) {
1338  return sc;
1339  }
1340  }
1341  catch (const std::exception& e) {
1342  ATH_MSG_ERROR("Failed to get next event from the event source, std::exception caught: " << e.what());
1343  sc = StatusCode::FAILURE;
1344  }
1345  catch (...) {
1346  ATH_MSG_ERROR("Failed to get next event from the event source, unknown exception caught");
1347  sc = StatusCode::FAILURE;
1348  }
    // NOTE(review): the remaining arguments of this check() call (original line 1350) are not shown
1349  if (check("Failed to get the next event",
1351  return sc;
1352  }
1353 
1354  //------------------------------------------------------------------------
1355  // Reset the timeout flag and the timer, and mark the slot as busy
1356  //------------------------------------------------------------------------
1357  resetEventTimer(*eventContext, /*processing=*/ true);
1358 
1359  //------------------------------------------------------------------------
1360  // Load event proxies and get event info
1361  //------------------------------------------------------------------------
1362  sc = m_evtStore->loadEventProxies();
1363  if (check("Failed to load event proxies", HLT::OnlineErrorCode::NO_EVENT_INFO, *eventContext)) {
1364  return sc;
1365  }
1366 
1367  auto eventInfo = SG::makeHandle(m_eventInfoRHKey,*eventContext);
1368  sc = eventInfo.isValid() ? StatusCode::SUCCESS : StatusCode::FAILURE;
1369  if (check("Failed to retrieve EventInfo", HLT::OnlineErrorCode::NO_EVENT_INFO, *eventContext)) {
1370  return sc;
1371  }
1372 
1373  ATH_MSG_DEBUG("Retrieved event info for the new event " << *eventInfo);
1374 
1375  // Set EventID for the EventContext
1376  EventID eid = eventIDFromxAOD(eventInfo.cptr());
1377  // Override run/LB/timestamp if needed
1378  if (m_forceRunNumber > 0) {
1379  eid.set_run_number(m_forceRunNumber);
1380  }
1381  if (m_forceLumiblock > 0) {
1382  eid.set_lumi_block(m_forceLumiblock);
1383  }
    // m_forceSOR_ns is split into whole seconds and the remaining nanoseconds (std::nano::den == 1e9)
1384  if (m_forceSOR_ns > 0) {
1385  eid.set_time_stamp(m_forceSOR_ns / std::nano::den);
1386  eid.set_time_stamp_ns_offset(m_forceSOR_ns % std::nano::den);
1387  }
1388  eventContext->setEventID(eid);
1389 
1390  // Update thread-local EventContext after setting EventID
1391  Gaudi::Hive::setCurrentContext(*eventContext);
1392 
1393  //-----------------------------------------------------------------------
1394  // COOL updates for LB changes
1395  //-----------------------------------------------------------------------
1396 
1397  // Check if this is a new LB
    // Compare-and-swap loop raising m_loopStatus.maxLB to this event's lumi block; updatedLB ends up true
    // only when this call was the one that raised the stored maximum
1398  EventIDBase::number_type oldMaxLB{0}, newMaxLB{0};
1399  bool updatedLB{false};
1400  do {
1401  oldMaxLB = m_loopStatus.maxLB.load();
1402  newMaxLB = std::max(oldMaxLB, eventContext->eventID().lumi_block());
1403  updatedLB = newMaxLB > oldMaxLB;
1404  } while (updatedLB && !m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB));
    // NOTE(review): this extra compare_exchange_strong repeats the exchange already attempted in the loop
    // condition and looks redundant - confirm before removing
1405  m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB);
1406 
1407  // Wait in case a COOL update is ongoing to avoid executeEvent
1408  // reading conditions data while they are being updated.
1409  {
1410  std::unique_lock<std::mutex> lock(m_loopStatus.coolUpdateMutex);
1411  m_loopStatus.coolUpdateCond.wait(lock, [&]{return !m_loopStatus.coolUpdateOngoing;});
1412  }
1413 
1414  // Do COOL updates (if needed) and notify other threads about it
    // NOTE(review): original lines 1418, 1421 and 1424 are not shown; presumably they set and clear
    // m_loopStatus.coolUpdateOngoing around the update - confirm
1415  if (updatedLB) {
1416  {
1417  std::lock_guard<std::mutex> lock(m_loopStatus.coolUpdateMutex);
1419  sc = m_coolHelper->hltCoolUpdate(*eventContext);
1420  if (check("Failure during COOL update", HLT::OnlineErrorCode::COOL_UPDATE, *eventContext)) {
1422  return sc;
1423  }
1425  }
1426  m_loopStatus.coolUpdateCond.notify_all();
1427  }
1428 
1429  //------------------------------------------------------------------------
1430  // Process the event
1431  //------------------------------------------------------------------------
1432  // We need to make a copy of eventContext, as executeEvent uses move semantics and eventContext is already owned
1433  // by the event store. The copy we create here is pushed to the scheduler and retrieved back in drainScheduler
1434  // where we have to delete it.
1435  sc = executeEvent(EventContext(*eventContext));
1436  if (check("Failed to schedule event processing",
1437  HLT::OnlineErrorCode::SCHEDULING_FAILURE, *eventContext)) {
1438  return sc;
1439  }
1440  // Notify the output thread to start waiting for a finished event
1441  m_outputThread->cond().notify_one();
1442 
1443  //------------------------------------------------------------------------
1444  // Set ThreadLocalContext to an invalid context
1445  //------------------------------------------------------------------------
1446  // We have passed the event to the scheduler and we are entering back a context-less environment
1447  Gaudi::Hive::setCurrentContext( EventContext() );
1448 
1449  return sc;
1450 }
1451 
1452 // =============================================================================
// -----------------------------------------------------------------------------
// HltEventLoopMgr::processFinishedEvent
//
// Performs the end-of-event actions for one finished event:
//   1. pops an EventContext pointer from m_finishedEventsQueue,
//   2. verifies the pointer and the event processing status (m_aess),
//   3. selects the whiteboard slot and fires the EndProcessing incident,
//   4. builds the HLT result and converts it (plus, if m_rewriteLVL1 is set, the
//      L1 results) with m_outputCnvSvc, then commits the output,
//   5. resets the event timer, clears the whiteboard slot, increments
//      m_freeSlots and notifies the input thread,
//   6. fills the timing monitoring, invalidates the thread-local context and
//      deletes the popped EventContext.
//
// Returns StatusCode::SUCCESS when every step succeeds. On any failure the
// function returns early with the StatusCode produced by failedEvent().
//
// NOTE(review): this listing is an extract with gaps in its line numbering.
// The signature line (StatusCode HltEventLoopMgr::processFinishedEvent()) and
// the lines carrying the error-code argument of several check(...) calls are
// absent here; restore them from the original file before compiling.
// -----------------------------------------------------------------------------
1454 {
1455  EventContext* eventContext{nullptr};
      // m_finishedEventsQueue is a tbb::concurrent_bounded_queue<EventContext*>;
      // pop() presumably blocks until an element is available - confirm against TBB docs
1456  m_finishedEventsQueue.pop(eventContext);
1457 
1458  StatusCode sc = StatusCode::SUCCESS;
      // Failure handler shared by all steps below. It inspects the captured 'sc':
      //  - if sc is a success, it does nothing and returns false;
      //  - otherwise it logs errmsg, calls failedEvent(errcode, ...), stores the result
      //    back into sc, invalidates the thread-local context, deletes eventContext
      //    and returns true.
      // After check() returned true, eventContext is a dangling pointer, so the caller
      // must return immediately without touching it.
1459  auto check = [this, &sc, &eventContext](std::string&& errmsg, HLT::OnlineErrorCode errcode) {
1460  if (sc.isSuccess()) {return false;}
1461  ATH_MSG_ERROR(errmsg);
      // The conditional yields a temporary (default-constructed context or a copy of
      // *eventContext); binding it to a const reference keeps it alive for this scope
1462  const EventContext& eventContextRef = (eventContext==nullptr) ? EventContext() : *eventContext;
1463  sc = failedEvent(errcode, eventContextRef);
1464  Gaudi::Hive::setCurrentContext(EventContext());
1465  delete eventContext;
1466  return true;
1467  };
1468 
1469  //--------------------------------------------------------------------------
1470  // Basic checks, select slot, retrieve event info
1471  //--------------------------------------------------------------------------
1472  // Check if the EventContext object exists
1473  if (eventContext == nullptr) {
1474  sc = StatusCode::FAILURE;
1475  if (check("Detected nullptr EventContext while finalising a processed event",
1477  return sc;
1478  }
1479  }
1480 
1481  // Set ThreadLocalContext to the currently processed finished context
1482  Gaudi::Hive::setCurrentContext(eventContext);
1483 
1484  // Check the event processing status
1485  if (m_aess->eventStatus(*eventContext) != EventStatus::Success) {
1486  sc = StatusCode::FAILURE;
      // The per-algorithm error map is used to tell a timeout (isTimedOut) apart
      // from other processing failures when choosing the error code
1487  auto algErrors = m_errorMonTool->algExecErrors(*eventContext);
1488  const HLT::OnlineErrorCode errCode = isTimedOut(algErrors) ?
1490  if (check("Processing event with context " + toString(*eventContext) + \
1491  " failed with status " + toString(m_aess->eventStatus(*eventContext)),
1492  errCode)) {
1493  return sc;
1494  }
1495  }
1496 
1497  // Select the whiteboard slot
1498  sc = m_whiteboard->selectStore(eventContext->slot());
1499  if (check("Failed to select event store slot " + std::to_string(eventContext->slot()),
1501  return sc;
1502  }
1503 
1504  // Fire EndProcessing incident - some services may depend on this
1505  m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *eventContext));
1506 
1507  //--------------------------------------------------------------------------
1508  // HLT output handling
1509  //--------------------------------------------------------------------------
1510  // Call the result builder to record HLTResultMT in SG
1511  sc = m_hltResultMaker->makeResult(*eventContext);
1512  if (check("Failed to create the HLT result object", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}
1513 
1514  // Connect output (create the output container) - the argument is currently not used
1515  sc = m_outputCnvSvc->connectOutput("");
1516  if (check("Conversion service failed to connectOutput", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;}
1517 
1518  // Retrieve the HLT result and the corresponding DataObject
1519  auto hltResult = SG::makeHandle(m_hltResultRHKey,*eventContext);
1520  if (!hltResult.isValid()) {sc = StatusCode::FAILURE;}
1521  if (check("Failed to retrieve the HLT result", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}
1522 
1523  DataObject* hltResultDO = m_evtStore->accessData(hltResult.clid(),hltResult.key());
1524  if (hltResultDO == nullptr) {sc = StatusCode::FAILURE;}
1525  if (check("Failed to retrieve the HLTResult DataObject", HLT::OnlineErrorCode::NO_HLT_RESULT)) {return sc;}
1526 
1527  // Check for result truncation
      // Only truncation flagged as severe by the result is treated as a failure
1528  if (!hltResult->getTruncatedModuleIds().empty() && hltResult->severeTruncation()) {sc = StatusCode::FAILURE;}
1529  if (check("HLT result truncation", HLT::OnlineErrorCode::RESULT_TRUNCATION)) {return sc;}
1530 
1531  // Convert the HLT result to the output data format
      // The address objects filled by createRep are deleted manually by this function
      // (see the deletes after commitOutput).
      // NOTE(review): on the early returns below, 'addr' is not deleted when a later
      // L1 conversion step fails, and 'l1addr'/'l1addrLegacy' are not deleted when
      // commitOutput fails - looks like a leak on these failure paths unless
      // ownership is taken elsewhere; confirm.
1532  IOpaqueAddress* addr = nullptr;
1533  sc = m_outputCnvSvc->createRep(hltResultDO,addr);
1534  if (sc.isFailure()) {delete addr;}
1535  if (check("Conversion service failed to convert HLTResult", HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE)) {return sc;}
1536 
1537  // Retrieve and convert the L1 result to the output data format
1538  IOpaqueAddress* l1addr = nullptr;
1539  IOpaqueAddress* l1addrLegacy = nullptr;
1540  if (m_rewriteLVL1) {
1541  // Run-3 L1 simulation result
1542  if (not m_l1TriggerResultRHKey.empty()) {
1543  auto l1TriggerResult = SG::makeHandle(m_l1TriggerResultRHKey, *eventContext);
1544  if (!l1TriggerResult.isValid()) {sc = StatusCode::FAILURE;}
1545  if (check("Failed to retrieve the L1 Trigger Result for RewriteLVL1",
1547  return sc;
1548  }
1549 
1550  DataObject* l1TriggerResultDO = m_evtStore->accessData(l1TriggerResult.clid(),l1TriggerResult.key());
1551  if (l1TriggerResultDO == nullptr) {sc = StatusCode::FAILURE;}
1552  if (check("Failed to retrieve the L1 Trigger Result DataObject for RewriteLVL1",
1554  return sc;
1555  }
1556 
1557  sc = m_outputCnvSvc->createRep(l1TriggerResultDO,l1addr);
1558  if (sc.isFailure()) {delete l1addr;}
1559  if (check("Conversion service failed to convert L1 Trigger Result for RewriteLVL1",
1561  return sc;
1562  }
1563  }
1564  // Legacy (Run-2) L1 simulation result
1565  if (not m_roibResultRHKey.empty()) {
1566  auto roibResult = SG::makeHandle(m_roibResultRHKey, *eventContext);
1567  if (!roibResult.isValid()) {sc = StatusCode::FAILURE;}
1568  if (check("Failed to retrieve the RoIBResult for RewriteLVL1",
1570  return sc;
1571  }
1572 
1573  DataObject* roibResultDO = m_evtStore->accessData(roibResult.clid(),roibResult.key());
1574  if (roibResultDO == nullptr) {sc = StatusCode::FAILURE;}
1575  if (check("Failed to retrieve the RoIBResult DataObject for RewriteLVL1",
1577  return sc;
1578  }
1579 
1580  sc = m_outputCnvSvc->createRep(roibResultDO,l1addrLegacy);
1581  if (sc.isFailure()) {delete l1addrLegacy;}
1582  if (check("Conversion service failed to convert RoIBResult for RewriteLVL1",
1584  return sc;
1585  }
1586  }
1587  }
1588 
1589  // Save event processing time before sending output
      // The event counts as accepted when the HLT result carries at least one stream tag.
      // The elapsed time is measured from the start timestamp stored for this slot.
1590  bool eventAccepted = !hltResult->getStreamTags().empty();
1591  auto eventTime = std::chrono::steady_clock::now() - m_eventTimerStartPoint[eventContext->slot()];
1592  int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count();
1593 
1594  // Commit output (write/send the output data) - the arguments are currently not used
1595  sc = m_outputCnvSvc->commitOutput("",true);
1596  if (sc.isFailure()) {delete addr;}
1597  if (check("Conversion service failed to commitOutput", HLT::OnlineErrorCode::OUTPUT_SEND_FAILURE)) {return sc;}
1598 
1599  // The output has been sent out, the ByteStreamAddress can be deleted
      // (deleting a nullptr is a no-op, so unset L1 addresses are fine here)
1600  delete addr;
1601  delete l1addr;
1602  delete l1addrLegacy;
1603 
1604  //------------------------------------------------------------------------
1605  // Reset the timeout flag and the timer, and mark the slot as idle
1606  //------------------------------------------------------------------------
1607  resetEventTimer(*eventContext, /*processing=*/ false);
1608 
1609  //--------------------------------------------------------------------------
1610  // Clear the slot
1611  //--------------------------------------------------------------------------
1612  ATH_MSG_DEBUG("Clearing slot " << eventContext->slot()
1613  << " (event " << eventContext->evt() << ") of the whiteboard");
1614 
1615  sc = clearWBSlot(eventContext->slot());
1616  if (check("Whiteboard slot " + std::to_string(eventContext->slot()) + " could not be properly cleared",
1618  return sc;
1619  }
1620 
1621  ATH_MSG_DEBUG("Finished processing " << (eventAccepted ? "accepted" : "rejected")
1622  << " event with context " << *eventContext
1623  << " which took " << eventTimeMillisec << " ms");
1624 
1625  // Only now after store clearing we can allow the slot to be filled again,
1626  // so we increment m_freeSlots and notify the input thread
1627  ++m_freeSlots;
1628  if (!m_loopStatus.loopEnded && m_inputThread!=nullptr) {
1629  m_inputThread->cond().notify_all();
1630  }
1631 
1632  // Fill the time monitoring histograms
      // The same time value is recorded under "TotalTime" and under either
      // "TotalTimeAccepted" or "TotalTimeRejected", depending on eventAccepted
1633  auto monTimeAny = Monitored::Scalar<int64_t>("TotalTime", eventTimeMillisec);
1634  auto monTimeAcc = Monitored::Scalar<int64_t>(eventAccepted ? "TotalTimeAccepted" : "TotalTimeRejected", eventTimeMillisec);
1635  Monitored::Group(m_monTool, monTimeAny, monTimeAcc);
1636 
1637  // Set ThreadLocalContext to an invalid context as we are entering a context-less environment
1638  Gaudi::Hive::setCurrentContext( EventContext() );
1639 
1640  // Delete the EventContext which was created when calling executeEvent( EventContext(*eventContext) )
1641  delete eventContext;
1642 
1643  return StatusCode::SUCCESS;
1644 }
ByteStreamMetadata.h
This file contains the class definition for the ByteStreamMetadata class.
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
HltEventLoopMgr::clearWBSlot
StatusCode clearWBSlot(size_t evtSlot) const
Clear an event slot in the whiteboard.
Definition: HltEventLoopMgr.cxx:1043
HltEventLoopMgr::m_inputMetaDataStore
ServiceHandle< StoreGateSvc > m_inputMetaDataStore
Definition: HltEventLoopMgr.h:215
HltEventLoopMgr::m_schedulerName
Gaudi::Property< std::string > m_schedulerName
Definition: HltEventLoopMgr.h:233
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
HltEventLoopMgr::m_sorHelper
std::unique_ptr< TrigSORFromPtreeHelper > m_sorHelper
Definition: HltEventLoopMgr.h:230
HltEventLoopMgr::m_eventInfoRHKey
SG::ReadHandleKey< xAOD::EventInfo > m_eventInfoRHKey
Definition: HltEventLoopMgr.h:311
TrigDefs::Group
Group
Properties of a chain group.
Definition: GroupProperties.h:13
HLT::OnlineErrorCode::TIMEOUT
@ TIMEOUT
HltEventLoopMgr::m_jobOptionsSvc
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
Definition: HltEventLoopMgr.h:212
SGout2dot.alg
alg
Definition: SGout2dot.py:243
athena.path
path
python interpreter configuration
Definition: athena.py:128
HltEventLoopMgr::failedEvent
StatusCode failedEvent(HLT::OnlineErrorCode errorCode, const EventContext &eventContext)
Handle a failure to process an event.
Definition: HltEventLoopMgr.cxx:766
HltEventLoopMgr::m_whiteboard
SmartIF< IHiveWhiteBoard > m_whiteboard
Definition: HltEventLoopMgr.h:225
HltEventLoopMgr::m_l1TriggerResultRHKey
SG::ReadHandleKey< xAOD::TrigCompositeContainer > m_l1TriggerResultRHKey
Definition: HltEventLoopMgr.h:314
PowhegControl_ttHplus_NLO.ss
ss
Definition: PowhegControl_ttHplus_NLO.py:83
HltEventLoopMgr::m_traceOnTimeout
Gaudi::Property< bool > m_traceOnTimeout
Definition: HltEventLoopMgr.h:248
hltonl::Exception::MissingCTPFragment
Thrown if the CTP ROBFragment cannot be retrieved for a new event.
Definition: HltExceptions.h:41
vtune_athena.format
format
Definition: vtune_athena.py:14
HltEventLoopMgr::EventLoopStatus::exitCode
StatusCode exitCode
Event exit status code.
Definition: HltEventLoopMgr.h:151
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
HltExceptions.h
HltEventLoopMgr::stop
virtual StatusCode stop() override
Definition: HltEventLoopMgr.cxx:229
HltEventLoopMgr::m_hardTimeout
Gaudi::Property< float > m_hardTimeout
Definition: HltEventLoopMgr.h:239
HltEventLoopMgr::updateMagField
StatusCode updateMagField(const boost::property_tree::ptree &pt) const
Set magnetic field currents from ptree.
Definition: HltEventLoopMgr.cxx:668
checkPlugins.dups
dups
Definition: checkPlugins.py:156
HltEventLoopMgr::execAtStart
StatusCode execAtStart(const EventContext &ctx) const
Execute optional algs/sequences.
Definition: HltEventLoopMgr.cxx:393
Run3DQTestingDriver.threads
threads
Definition: Run3DQTestingDriver.py:34
HltEventLoopMgr::eventTimerCallback
void eventTimerCallback()
The method executed by the event timeout monitoring thread.
Definition: HltEventLoopMgr.cxx:1004
HLT::OnlineErrorCode::AFTER_RESULT_SENT
@ AFTER_RESULT_SENT
HltEventLoopMgr::resetEventTimer
void resetEventTimer(const EventContext &eventContext, bool processing)
Reset the timeout flag and the timer, and mark the slot as busy or idle according to the second argum...
Definition: HltEventLoopMgr.cxx:1031
HLT::OnlineErrorCode::MISSING_CTP_FRAGMENT
@ MISSING_CTP_FRAGMENT
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
HltEventLoopMgr::m_setMagFieldFromPtree
Gaudi::Property< bool > m_setMagFieldFromPtree
Definition: HltEventLoopMgr.h:289
AthenaInterprocess::UpdateAfterFork
Definition: Incidents.h:22
HltEventLoopMgr::EventLoopStatus::coolUpdateOngoing
bool coolUpdateOngoing
COOL update ongoing.
Definition: HltEventLoopMgr.h:149
HltEventLoopMgr::m_softTimeoutFraction
Gaudi::Property< float > m_softTimeoutFraction
Definition: HltEventLoopMgr.h:242
HltEventLoopMgr::m_outputThread
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events)
Definition: HltEventLoopMgr.h:341
HltEventLoopMgr::m_timeoutThread
std::unique_ptr< HLT::LoopThread > m_timeoutThread
Timeout thread.
Definition: HltEventLoopMgr.h:343
HltEventLoopMgr::m_applicationName
std::string m_applicationName
Application name.
Definition: HltEventLoopMgr.h:359
plotting.yearwise_luminosity.absolute
absolute
Definition: yearwise_luminosity.py:29
test_pyathena.pt
pt
Definition: test_pyathena.py:11
Monitored::Group
Group of local monitoring quantities and retain correlation when filling histograms
Definition: MonitoredGroup.h:54
ByteStreamMetadataContainer.h
This file contains the class definition for the ByteStreamMetadataContainer class.
HLT::OnlineErrorCode::OUTPUT_BUILD_FAILURE
@ OUTPUT_BUILD_FAILURE
HltEventLoopMgr::m_parallelIOTaskGroup
tbb::task_group m_parallelIOTaskGroup
Task group to execute parallel I/O tasks asynchronously.
Definition: HltEventLoopMgr.h:347
HltEventLoopMgr::m_schedulerSvc
SmartIF< IScheduler > m_schedulerSvc
Definition: HltEventLoopMgr.h:228
HLT::OnlineErrorCode::SCHEDULING_FAILURE
@ SCHEDULING_FAILURE
AtlasMcWeight::number_type
unsigned int number_type
Definition: AtlasMcWeight.h:20
HltEventLoopMgr::inputThreadCallback
void inputThreadCallback()
Definition: HltEventLoopMgr.cxx:1057
HltEventLoopMgr::EventLoopStatus::coolUpdateCond
std::condition_variable coolUpdateCond
Condition variable to synchronize COOL updates.
Definition: HltEventLoopMgr.h:145
HltEventLoopMgr::nextEvent
virtual StatusCode nextEvent(int maxevt=-1) override
Implementation of IEventProcessor::nextEvent which implements the event loop.
Definition: HltEventLoopMgr.cxx:524
athena.value
value
Definition: athena.py:124
HltEventLoopMgr::m_eventTimerStartPoint
std::vector< std::chrono::steady_clock::time_point > m_eventTimerStartPoint
Vector of event start-processing time stamps in each slot.
Definition: HltEventLoopMgr.h:331
HLT::OnlineErrorCode::CANNOT_ACCESS_SLOT
@ CANNOT_ACCESS_SLOT
HltEventLoopMgr::HltEventLoopMgr
HltEventLoopMgr(const std::string &name, ISvcLocator *svcLoc)
Standard constructor.
Definition: HltEventLoopMgr.cxx:68
python.PyKernel.AttributeList
AttributeList
Definition: PyKernel.py:36
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
SG::VarHandleKey::key
const std::string & key() const
Return the StoreGate ID for the referenced object.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:141
SG::VarHandleKey::empty
bool empty() const
Test if the key is blank.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:150
HltEventLoopMgr::createEventContext
virtual EventContext createEventContext() override
create an Event Context object
Definition: HltEventLoopMgr.cxx:578
HltEventLoopMgr::m_monitorScheduler
Gaudi::Property< bool > m_monitorScheduler
Definition: HltEventLoopMgr.h:305
CondAttrListCollection
This class is a collection of AttributeLists where each one is associated with a channel number....
Definition: CondAttrListCollection.h:52
HltEventLoopMgr::m_errorMonTool
ToolHandle< ITrigErrorMonTool > m_errorMonTool
Definition: HltEventLoopMgr.h:223
x
#define x
HltEventLoopMgr::m_sorPath
Gaudi::Property< std::string > m_sorPath
Definition: HltEventLoopMgr.h:283
XMLtoHeader.count
count
Definition: XMLtoHeader.py:85
HltEventLoopMgr::m_execAtStart
Gaudi::Property< std::vector< std::string > > m_execAtStart
Definition: HltEventLoopMgr.h:286
HLT::OnlineErrorCode::NO_EVENT_INFO
@ NO_EVENT_INFO
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
HLT::OnlineErrorCode::BEFORE_NEXT_EVENT
@ BEFORE_NEXT_EVENT
HltEventLoopMgr::m_maxParallelIOTasks
Gaudi::Property< int > m_maxParallelIOTasks
Definition: HltEventLoopMgr.h:252
HltEventLoopMgr::m_monTool
ToolHandle< GenericMonitoringTool > m_monTool
Definition: HltEventLoopMgr.h:222
HltEventLoopMgr::hltUpdateAfterFork
virtual StatusCode hltUpdateAfterFork(const boost::property_tree::ptree &pt) override
Definition: HltEventLoopMgr.cxx:413
HltEventLoopMgr::initialize
virtual StatusCode initialize() override
Definition: HltEventLoopMgr.cxx:83
SGHiveMgrSvc.h
HltEventLoopMgr::m_detectorStore
ServiceHandle< StoreGateSvc > m_detectorStore
Definition: HltEventLoopMgr.h:214
HLT::OnlineErrorCode::BAD_CTP_FRAGMENT
@ BAD_CTP_FRAGMENT
HltEventLoopMgr::m_forceLumiblock
Gaudi::Property< unsigned int > m_forceLumiblock
Definition: HltEventLoopMgr.h:295
HltEventLoopMgr::EventLoopStatus::eventsAvailable
std::atomic< bool > eventsAvailable
Event source has more events.
Definition: HltEventLoopMgr.h:135
HltEventLoopMgr::m_schedulerMonSvc
ServiceHandle< ISchedulerMonSvc > m_schedulerMonSvc
Definition: HltEventLoopMgr.h:219
SG::makeHandle
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
Definition: ReadCondHandle.h:270
BchCleanup.mgr
mgr
Definition: BchCleanup.py:294
HltEventLoopMgr::m_maxFrameworkErrors
Gaudi::Property< int > m_maxFrameworkErrors
Definition: HltEventLoopMgr.h:263
EventInfoFromxAOD.h
HltEventLoopMgr::m_softTimeoutValue
std::chrono::milliseconds m_softTimeoutValue
Soft timeout value set to HardTimeout*SoftTimeoutFraction at initialisation.
Definition: HltEventLoopMgr.h:345
HltEventLoopMgr::m_fwkErrorDebugStreamName
Gaudi::Property< std::string > m_fwkErrorDebugStreamName
Definition: HltEventLoopMgr.h:267
HltEventLoopMgr::finalize
virtual StatusCode finalize() override
Definition: HltEventLoopMgr.cxx:242
HLT::HLTResultMT::addStreamTag
StatusCode addStreamTag(const eformat::helper::StreamTag &streamTag)
Append one stream tag to the stored list.
Definition: HLTResultMT.cxx:62
HltEventLoopMgr::m_ioCompMgr
ServiceHandle< IIoComponentMgr > m_ioCompMgr
Definition: HltEventLoopMgr.h:216
hltonl::Exception::BadCTPFragment
Thrown if the CTP ROBFragment for a new event has non-zero status word or other errors.
Definition: HltExceptions.h:49
python.handimod.now
now
Definition: handimod.py:675
Amg::toString
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitivesToStringConverter.
Definition: GeoPrimitivesToStringConverter.h:40
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
SG::WriteHandleKey< HLT::HLTResultMT >
StdJOSetup.msgSvc
msgSvc
Provide convenience handles for various services.
Definition: StdJOSetup.py:36
lumiFormat.i
int i
Definition: lumiFormat.py:85
HLT::OnlineErrorCode::OUTPUT_SEND_FAILURE
@ OUTPUT_SEND_FAILURE
HltEventLoopMgr::m_forceRunNumber
Gaudi::Property< unsigned int > m_forceRunNumber
Definition: HltEventLoopMgr.h:292
beamspotman.n
n
Definition: beamspotman.py:731
Atlas::ExtendedEventContext
Definition: ExtendedEventContext.h:23
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
python.TrigInDetValidation_AODtoTrkNtuple_CA.histsvc
histsvc
TrigInDetMonitoring part ################################.
Definition: TrigInDetValidation_AODtoTrkNtuple_CA.py:55
Incidents.h
HLT::OnlineErrorCode::NO_HLT_RESULT
@ NO_HLT_RESULT
HLT::isEventProcessingErrorCode
constexpr bool isEventProcessingErrorCode(const OnlineErrorCode code)
Definition: OnlineErrorCode.h:70
HltEventLoopMgr::m_parallelIOQueue
tbb::concurrent_bounded_queue< bool > m_parallelIOQueue
Queue limiting the number of parallel I/O tasks.
Definition: HltEventLoopMgr.h:349
calibdata.exception
exception
Definition: calibdata.py:496
HltEventLoopMgr::m_rewriteLVL1
Gaudi::Property< bool > m_rewriteLVL1
Definition: HltEventLoopMgr.h:301
HltEventLoopMgr::m_roibResultRHKey
SG::ReadHandleKey< ROIB::RoIBResult > m_roibResultRHKey
Definition: HltEventLoopMgr.h:317
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
HltEventLoopMgr::m_evtStore
ServiceHandle< StoreGateSvc > m_evtStore
Definition: HltEventLoopMgr.h:213
hist_file_dump.f
f
Definition: hist_file_dump.py:135
xAOD::eventNumber
eventNumber
Definition: EventInfo_v1.cxx:124
SG::VarHandleKey::initialize
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:103
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
Athena::Timeout::instance
static Timeout & instance()
Get reference to Timeout singleton.
Definition: Timeout.h:64
HltEventLoopMgr::m_maxIOWakeUpIntervalMs
Gaudi::Property< int > m_maxIOWakeUpIntervalMs
Definition: HltEventLoopMgr.h:257
hltonl::Exception::NoEventsTemporarily
Thrown if the event source cannot provide new events temporarily, e.g. when trigger is on hold.
Definition: HltExceptions.h:25
HltEventLoopMgr::executeEvent
virtual StatusCode executeEvent(EventContext &&ctx) override
Implementation of IEventProcessor::executeEvent which processes a single event.
Definition: HltEventLoopMgr.cxx:591
HltEventLoopMgr::m_timeoutDebugStreamName
Gaudi::Property< std::string > m_timeoutDebugStreamName
Definition: HltEventLoopMgr.h:275
perfmonmt-printer.required
required
Definition: perfmonmt-printer.py:184
HltEventLoopMgr::m_freeSlots
std::atomic< size_t > m_freeSlots
Number of free slots used to synchronise input/output tasks.
Definition: HltEventLoopMgr.h:337
HltEventLoopMgr::m_forceSOR_ns
Gaudi::Property< unsigned long long > m_forceSOR_ns
Definition: HltEventLoopMgr.h:298
HltEventLoopMgr::EventLoopStatus::coolUpdateMutex
std::mutex coolUpdateMutex
Mutex to synchronize COOL updates.
Definition: HltEventLoopMgr.h:147
HltEventLoopMgr::m_hltResultMaker
ToolHandle< HLTResultMTMaker > m_hltResultMaker
Definition: HltEventLoopMgr.h:221
HltEventLoopMgr::m_nFrameworkErrors
std::atomic< int > m_nFrameworkErrors
Counter of framework errors.
Definition: HltEventLoopMgr.h:357
HltEventLoopMgr::m_evtSelector
ServiceHandle< IEvtSelector > m_evtSelector
Definition: HltEventLoopMgr.h:217
HltEventLoopMgr::EventLoopStatus::loopEndedCond
std::condition_variable loopEndedCond
Condition variable to notify the main thread of the end of the event loop.
Definition: HltEventLoopMgr.h:139
HltEventLoopMgr::m_currentRunCtx
EventContext m_currentRunCtx
"Event" context of current run with dummy event/slot number
Definition: HltEventLoopMgr.h:325
HltEventLoopMgr::clearTemporaryStores
StatusCode clearTemporaryStores()
Clear per-event stores.
Definition: HltEventLoopMgr.cxx:702
ptree
boost::property_tree::ptree ptree
Definition: JsonFileLoader.cxx:16
TrigCOOLUpdateHelper.h
Helper tool for COOL updates.
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
HLT::OnlineErrorCode::PROCESSING_FAILURE
@ PROCESSING_FAILURE
HltEventLoopMgr::outputThreadCallback
void outputThreadCallback()
The method executed by the output handling thread.
Definition: HltEventLoopMgr.cxx:1123
HltEventLoopMgr::m_workerPID
int m_workerPID
Worker PID.
Definition: HltEventLoopMgr.h:363
HltEventLoopMgr::processFinishedEvent
StatusCode processFinishedEvent()
Perform all end-of-event actions for a single event popped out from the scheduler.
Definition: HltEventLoopMgr.cxx:1453
LArNewCalib_Delay_OFC_Cali.check
check
Definition: LArNewCalib_Delay_OFC_Cali.py:267
HltEventLoopMgr::updateMetadataStore
void updateMetadataStore(const coral::AttributeList &sor_attrlist) const
Definition: HltEventLoopMgr.cxx:639
Athena::Status::TIMEOUT
@ TIMEOUT
Timeout during event processing.
SG::HiveMgrSvc::setNumProcs
static void setNumProcs(size_t numProcs)
Set number of concurrent processes.
Definition: SGHiveMgrSvc.cxx:35
HltEventLoopMgr::updateDFProps
void updateDFProps()
Read DataFlow configuration properties.
Definition: HltEventLoopMgr.cxx:618
HltEventLoopMgr::startNextEvent
StatusCode startNextEvent()
Definition: HltEventLoopMgr.cxx:1233
TrigRDBManager.h
HltEventLoopMgr::EventLoopStatus::loopEnded
std::atomic< bool > loopEnded
No more events available and all ongoing processing has finished.
Definition: HltEventLoopMgr.h:137
HltEventLoopMgr::m_whiteboardName
Gaudi::Property< std::string > m_whiteboardName
Definition: HltEventLoopMgr.h:236
HLT::OnlineErrorCode
OnlineErrorCode
Definition: OnlineErrorCode.h:15
HltEventLoopMgr::m_algErrorDebugStreamName
Gaudi::Property< std::string > m_algErrorDebugStreamName
Definition: HltEventLoopMgr.h:271
HltEventLoopMgr::m_outputCnvSvc
ServiceHandle< IConversionSvc > m_outputCnvSvc
Definition: HltEventLoopMgr.h:218
HltEventLoopMgr::m_workerID
int m_workerID
Worker ID.
Definition: HltEventLoopMgr.h:361
HLTResultMT.h
HltEventLoopMgr::m_aess
SmartIF< IAlgExecStateSvc > m_aess
Definition: HltEventLoopMgr.h:227
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
HltEventLoopMgr::stopRun
virtual StatusCode stopRun() override
Implementation of IEventProcessor::stopRun (obsolete for online running)
Definition: HltEventLoopMgr.cxx:570
HltEventLoopMgr.h
HltEventLoopMgr::m_algResourcePool
SmartIF< IAlgResourcePool > m_algResourcePool
Definition: HltEventLoopMgr.h:226
HltEventLoopMgr::printSORAttrList
void printSORAttrList(const coral::AttributeList &atr) const
Print the SOR record.
Definition: HltEventLoopMgr.cxx:741
HLT::HLTResultMT::addErrorCode
void addErrorCode(const HLT::OnlineErrorCode &errorCode, const eformat::helper::Status &firstStatusWord={ eformat::GenericStatus::DATA_CORRUPTION, eformat::FullEventStatus::PSC_PROBLEM })
Append an error code.
Definition: HLTResultMT.cxx:218
eventIDFromxAOD
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
Definition: EventInfoFromxAOD.cxx:16
HltEventLoopMgr::m_truncationDebugStreamName
Gaudi::Property< std::string > m_truncationDebugStreamName
Definition: HltEventLoopMgr.h:279
HltEventLoopMgr::m_freeSlotStartPoint
std::vector< std::chrono::steady_clock::time_point > m_freeSlotStartPoint
Vector of time stamps telling when each scheduler slot was freed.
Definition: HltEventLoopMgr.h:333
EventID
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
Definition: EventID.h:35
HltEventLoopMgr::m_inputThread
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events)
Definition: HltEventLoopMgr.h:339
HltEventLoopMgr::m_localEventNumber
std::atomic< size_t > m_localEventNumber
Event counter used for local bookkeeping; incremental per instance of HltEventLoopMgr,...
Definition: HltEventLoopMgr.h:327
HltEventLoopMgr::m_timeoutThreadIntervalMs
Gaudi::Property< unsigned int > m_timeoutThreadIntervalMs
Definition: HltEventLoopMgr.h:245
HLT::OnlineErrorCode::SCHEDULER_POP_FAILURE
@ SCHEDULER_POP_FAILURE
HltEventLoopMgr::m_hltResultRHKey
SG::ReadHandleKey< HLT::HLTResultMT > m_hltResultRHKey
StoreGate key for reading the HLT result.
Definition: HltEventLoopMgr.h:320
HltEventLoopMgr::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: HltEventLoopMgr.h:211
TrigRDBManager::closeDBConnections
static StatusCode closeDBConnections(MsgStream &msg)
Close database connections.
Definition: TrigRDBManager.h:28
HltEventLoopMgr::EventLoopStatus::maxLB
std::atomic< EventIDBase::number_type > maxLB
Max lumiblock number seen in the loop.
Definition: HltEventLoopMgr.h:143
HLT::OnlineErrorCode::COOL_UPDATE
@ COOL_UPDATE
HLT::OnlineErrorCode::RESULT_TRUNCATION
@ RESULT_TRUNCATION
HltEventLoopMgr::m_evtSelContext
IEvtSelector::Context * m_evtSelContext
Event selector context.
Definition: HltEventLoopMgr.h:329
Athena::TimeoutMaster::resetTimeout
void resetTimeout(Timeout &instance)
Reset timeout.
Definition: Timeout.h:83
python.dummyaccess.exists
def exists(filename)
Definition: dummyaccess.py:9
Monitored::Scalar
Declare a monitored scalar variable.
Definition: MonitoredScalar.h:34
HltEventLoopMgr::m_finishedEventsQueue
tbb::concurrent_bounded_queue< EventContext * > m_finishedEventsQueue
Queue of events ready for output processing.
Definition: HltEventLoopMgr.h:351
HltEventLoopMgr::prepareForStart
virtual StatusCode prepareForStart(const boost::property_tree::ptree &) override
Definition: HltEventLoopMgr.cxx:280
Athena::TimeoutMaster::setTimeout
void setTimeout(Timeout &instance)
Set timeout.
Definition: Timeout.h:80
HltEventLoopMgr::m_timeoutTraceGenerated
bool m_timeoutTraceGenerated
Flag set when a soft timeout produces a stack trace, to avoid producing multiple traces.
Definition: HltEventLoopMgr.h:355
HltEventLoopMgr::m_loopStatus
EventLoopStatus m_loopStatus
Object keeping track of the event loop status.
Definition: HltEventLoopMgr.h:353
SG::AllowEmpty
@ AllowEmpty
Definition: StoreGate/StoreGate/VarHandleKey.h:30
module_driven_slicing.histfile
histfile
Definition: module_driven_slicing.py:571
HltEventLoopMgr::EventLoopStatus::loopEndedMutex
std::mutex loopEndedMutex
Mutex to notify the main thread of the end of the event loop.
Definition: HltEventLoopMgr.h:141
HltEventLoopMgr::~HltEventLoopMgr
virtual ~HltEventLoopMgr() noexcept override
Standard destructor.
Definition: HltEventLoopMgr.cxx:74
StoreGateSvc.h
python.compressB64.c
def c
Definition: compressB64.py:93
HltEventLoopMgr::m_eventContextWHKey
SG::WriteHandleKey< EventContext > m_eventContextWHKey
Definition: HltEventLoopMgr.h:308
AthStatusCode.h
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
HLT::OnlineErrorCode::CANNOT_RETRIEVE_EVENT
@ CANNOT_RETRIEVE_EVENT
hltonl::Exception::NoMoreEvents
Thrown if all events are already read from the input and another one is requested.
Definition: HltExceptions.h:17
Monitored::Timer
A monitored timer.
Definition: MonitoredTimer.h:32
HltEventLoopMgr::executeRun
virtual StatusCode executeRun(int maxevt=-1) override
Implementation of IEventProcessor::executeRun which calls IEventProcessor::nextEvent.
Definition: HltEventLoopMgr.cxx:494
python.CaloScaleNoiseConfig.args
args
Definition: CaloScaleNoiseConfig.py:80
TSU::T
unsigned long long T
Definition: L1TopoDataTypes.h:35
HltEventLoopMgr::m_coolHelper
ToolHandle< TrigCOOLUpdateHelper > m_coolHelper
Definition: HltEventLoopMgr.h:220
HltEventLoopMgr::getSorAttrList
const coral::AttributeList & getSorAttrList() const
Extract the single attr list off the SOR CondAttrListCollection.
Definition: HltEventLoopMgr.cxx:720
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
HltEventLoopMgr::m_isSlotProcessing
std::vector< bool > m_isSlotProcessing
Vector of flags to tell if a slot is idle or processing.
Definition: HltEventLoopMgr.h:335