24#include "GaudiKernel/ConcurrencyFlags.h"
25#include "GaudiKernel/IAlgManager.h"
26#include "GaudiKernel/IAlgorithm.h"
27#include "GaudiKernel/IEvtSelector.h"
28#include "GaudiKernel/IProperty.h"
29#include "GaudiKernel/IIoComponent.h"
30#include "GaudiKernel/ThreadLocalContext.h"
33#include "eformat/StreamTag.h"
50 bool isTimedOut(
const std::unordered_map<std::string_view,StatusCode>& algErrors) {
51 for (
const auto& [key,
sc] : algErrors) {
57 template <
typename T> std::string
toString(
const T&
x) {
58 std::ostringstream
ss;
63using namespace boost::property_tree;
69: base_class(name, svcLoc) {}
86 m_autoRetrieveTools =
false;
87 m_checkToolDeps =
false;
91 ATH_MSG_INFO(
" ---> HltEventLoopMgr = " << name() <<
" initialize");
126 const std::string& slots =
m_jobOptionsSvc->get(
"EventDataSvc.NSlots");
130 ATH_MSG_WARNING(
"Failed to retrieve the job property EventDataSvc.NSlots");
131 const std::string& threads =
m_jobOptionsSvc->get(
"AvalancheSchedulerSvc.ThreadPoolSize");
132 if (!threads.empty())
135 ATH_MSG_WARNING(
"Failed to retrieve the job property AvalancheSchedulerSvc.ThreadPoolSize");
137 const std::string& procs =
m_jobOptionsSvc->get(
"DataFlowConfig.DF_NumberOfWorkers");
138 if (!procs.empty()) {
143 catch (
const std::logic_error& ex) {
144 ATH_MSG_ERROR(
"Cannot convert " << procs <<
"to integer: " << ex.what());
145 return StatusCode::FAILURE;
149 ATH_MSG_WARNING(
"Failed to retrieve the job property DataFlowconfig.DF_NumberOfWorkers");
164 return StatusCode::FAILURE;
171 return StatusCode::FAILURE;
175 m_aess = serviceLocator()->service(
"AlgExecStateSvc");
178 return StatusCode::FAILURE;
223 return StatusCode::SUCCESS;
236 return StatusCode::SUCCESS;
244 ATH_MSG_INFO(
" ---> HltEventLoopMgr/" << name() <<
" finalize ");
249 auto releaseAndCheck = [&](
auto& handle, std::string_view handleType) {
250 if (handle.release().isFailure())
251 ATH_MSG_WARNING(
"finalize(): Failed to release " << handleType <<
" " << handle.typeAndName());
253 auto releaseService = [&](
auto&&... args) { (releaseAndCheck(args,
"service"), ...); };
254 auto releaseTool = [&](
auto&&... args) { (releaseAndCheck(args,
"tool"), ...); };
255 auto releaseSmartIF = [](
auto&&... args) { (args.reset(), ...); };
274 return StatusCode::SUCCESS;
283 const auto& rparams = pt.get_child(
"RunParams");
286 catch(ptree_bad_path& e) {
287 ATH_MSG_ERROR(
"Bad ptree path: \"" << e.path<ptree::path_type>().dump() <<
"\" - " << e.what());
288 return StatusCode::FAILURE;
317 catch(
const std::exception& e) {
323 return StatusCode::SUCCESS;
330StatusCode HltEventLoopMgr::prepareForRun(
const ptree& )
352 if ( !gROOT->GetListOfFiles()->IsEmpty() ) {
353 std::unordered_map<std::string, size_t> dups;
354 for (
const auto f : *gROOT->GetListOfFiles()) {
355 ++dups[f->GetName()];
358 auto histsvc = serviceLocator()->service(
"THistSvc",
false).as<IIoComponent>();
360 dups.erase(histfile);
363 msg() << MSG::ERROR <<
"The following ROOT files (with #instances) have not been closed yet: ";
364 for (
const auto& [n,c] : dups)
msg() <<
n <<
"(x" <<
c <<
") ";
375 ATH_MSG_FATAL(
"Misconfiguration - Scheduler was initialised before forking!");
376 return StatusCode::FAILURE;
380 return StatusCode::SUCCESS;
382 catch(
const std::runtime_error& e)
388 return StatusCode::FAILURE;
395 IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
399 SmartIF<IAlgorithm>& alg = algMgr->algorithm(name,
false);
402 sc &= alg->sysExecute(ctx);
424 return StatusCode::FAILURE;
429 SmartIF<IService> svc = serviceLocator()->service(
"CoreDumpSvc",
false);
431 StatusCode
sc = svc->stop();
433 if (
sc.isFailure()) {
446 SmartIF<IIoComponent> histsvc = serviceLocator()->service(
"THistSvc",
false).as<IIoComponent>();
447 if ( !
m_ioCompMgr->io_retrieve(histsvc.get()).empty() ) {
448 std::filesystem::path worker_dir = std::filesystem::absolute(
"athenaHLT_workers");
449 std::ostringstream oss;
450 oss <<
"athenaHLT-" << std::setfill(
'0') << std::setw(2) <<
m_workerID;
451 worker_dir /= oss.str();
453 if ( std::filesystem::exists(worker_dir) ) {
454 if ( std::filesystem::remove_all(worker_dir) == 0 ) {
455 ATH_MSG_FATAL(
"Cannot delete previous worker directory " << worker_dir);
456 return StatusCode::FAILURE;
459 if ( !std::filesystem::create_directories(worker_dir) ) {
460 ATH_MSG_FATAL(
"Cannot create worker directory " << worker_dir);
461 return StatusCode::FAILURE;
463 ATH_MSG_INFO(
"Writing worker output files to " << worker_dir);
468 const size_t numSlots =
m_whiteboard->getNumberOfStores();
488 return StatusCode::SUCCESS;
500 StatusCode
sc = StatusCode::SUCCESS;
505 catch (
const std::exception& e) {
506 ATH_MSG_FATAL(
"Event loop failed, std::exception caught: " << e.what());
507 sc = StatusCode::FAILURE;
510 ATH_MSG_FATAL(
"Event loop failed, unknown exception caught");
511 sc = StatusCode::FAILURE;
535 std::unique_lock<std::mutex> lock{
m_loopStatus.loopEndedMutex};
543 ATH_MSG_DEBUG(
"Event loop started, the main thread is going to sleep until it finishes");
545 ATH_MSG_INFO(
"All events processed, finalising the event loop");
550 ATH_MSG_DEBUG(
"Waiting for all I/O tasks and threads to return");
556 ATH_MSG_DEBUG(
"All I/O threads and tasks finished. Stopping the timeout thread");
564 return StatusCode::SUCCESS;
571 ATH_MSG_FATAL(
"Misconfiguration - the method HltEventLoopMgr::stopRun() cannot be used online");
572 return StatusCode::FAILURE;
581 if (slot == std::string::npos) {
583 return EventContext();
585 return EventContext{ eventNumber, slot };
604 ATH_MSG_DEBUG(
"Adding event " << ctx.evt() <<
", slot " << ctx.slot() <<
" to the scheduler");
605 StatusCode addEventStatus =
m_schedulerSvc->pushNewEvent(
new EventContext{std::move(ctx)} );
608 if (addEventStatus.isFailure()){
610 return StatusCode::FAILURE;
614 return StatusCode::SUCCESS;
620 auto getDFProp = [&](
const std::string& name, std::string& value,
bool required =
true) {
623 ATH_MSG_INFO(
" ---> Read from DataFlow configuration: " << name <<
" = " << value);
625 msg() << (required ? MSG::WARNING : MSG::INFO)
626 <<
"Could not set Property " << name <<
" from DataFlow" <<
endmsg;
631 std::string wid, wpid;
632 getDFProp(
"DF_WorkerId", wid,
false );
633 getDFProp(
"DF_Pid", wpid,
false );
634 if (!wid.empty())
m_workerID = std::stoi(wid);
641 auto metadatacont = std::make_unique<ByteStreamMetadataContainer>();
642 metadatacont->push_back(std::make_unique<ByteStreamMetadata>(
646 sor_attrlist[
"RecordingEnabled"].
data<bool>(),
656 std::vector<std::string>()
672 auto tor_cur = pt.get<
float>(
"Magnets.ToroidsCurrent.value");
673 auto sol_cur = pt.get<
float>(
"Magnets.SolenoidCurrent.value");
676 IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
678 SmartIF<IAlgorithm>& fieldAlg = algMgr->algorithm(
"AtlasFieldMapCondAlg",
false);
680 ATH_MSG_INFO(
"Setting field currents on AtlasFieldMapCondAlg");
681 ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg,
"MapSoleCurrent", sol_cur) );
682 ATH_CHECK( Gaudi::Utils::setProperty(fieldAlg,
"MapToroCurrent", tor_cur) );
686 ATH_MSG_INFO(
"*****************************************");
687 ATH_MSG_INFO(
" Auto-configuration of magnetic field: ");
688 ATH_MSG_INFO(
" solenoid current from IS = " << sol_cur);
690 ATH_MSG_INFO(
"*****************************************");
692 catch(ptree_bad_path& e) {
693 ATH_MSG_ERROR(
"Cannot read magnet currents from ptree: " << e.what() );
694 return StatusCode::FAILURE;
697 return StatusCode::SUCCESS;
716 return StatusCode::SUCCESS;
724 throw std::runtime_error(
"Cannot retrieve " +
m_sorPath);
732 throw std::runtime_error(
"SOR record should have one and one only attribute list, but it has " + std::to_string(sor->size()));
735 const auto & soral = sor->begin()->second;
746 time_t sorTime_sec = sorTime_ns / std::nano::den;
752 " (" << std::put_time(localtime_r(&sorTime_sec, &buf),
"%F %T") <<
") ");
754 auto dmfst = atr[
"DetectorMaskFst"].data<
unsigned long long>();
755 auto dmsnd = atr[
"DetectorMaskSnd"].data<
unsigned long long>();
756 ATH_MSG_INFO(
" DetectorMaskFst = 0x" << std::format(
"{:016x}", dmfst));
757 ATH_MSG_INFO(
" DetectorMaskSnd = 0x" << std::format(
"{:016x}", dmsnd));
758 ATH_MSG_INFO(
" Complete DetectorMask = 0x" << std::format(
"{:016x}", dmfst)
759 << std::format(
"{:016x}", dmsnd));
768 ATH_MSG_VERBOSE(
"start of " << __FUNCTION__ <<
" with errorCode = " << errorCode
769 <<
", context = " << eventContext <<
" eventID = " << eventContext.eventID());
772 Gaudi::Hive::setCurrentContext(eventContext);
774 auto returnFailureAndStopEventLoop = [
this]() -> StatusCode {
785 return StatusCode::FAILURE;
792 ATH_MSG_ERROR(
"Failure occurred with OnlineErrorCode=" << errorCode
793 <<
" meaning there was a framework error before requesting a new event. No output will be produced for this event"
794 <<
" and the event loop will exit after all ongoing processing is finished.");
795 return returnFailureAndStopEventLoop();
798 ATH_MSG_ERROR(
"Failure occurred with OnlineErrorCode=" << errorCode
799 <<
" meaning a new event could not be correctly read. No output will be produced for this event."
800 <<
" The event loop will exit after all ongoing processing is finished.");
801 return returnFailureAndStopEventLoop();
804 ATH_MSG_ERROR(
"Failure occurred with OnlineErrorCode=" << errorCode
805 <<
" meaning there was a framework error after HLT result was already sent out."
806 <<
" The event loop will exit after all ongoing processing is finished.");
807 return returnFailureAndStopEventLoop();
810 ATH_MSG_ERROR(
"Failed to access the slot for the processed event, cannot produce output. OnlineErrorCode="
811 << errorCode <<
". The event loop will exit after all ongoing processing is finished unless the failed event"
812 <<
" reaches a hard timeout sooner and this process is killed.");
813 return returnFailureAndStopEventLoop();
819 ATH_MSG_ERROR(
"Failure occurred with OnlineErrorCode=" << errorCode
820 <<
". Cannot determine if the event processing started or not and whether a decision for this event will be"
821 <<
" produced. The event loop will exit after all ongoing processing is finished, which may include or"
822 <<
" not include the problematic event.");
823 return returnFailureAndStopEventLoop();
826 ATH_MSG_ERROR(
"Failure occurred with OnlineErrorCode=" << errorCode
827 <<
" meaning the Scheduler returned FAILURE when asked to give a finished event. Will keep trying to"
828 <<
" pop further events if there are any still in the scheduler, but this may keep repeating until"
829 <<
" this process is killed by hard timeout or other means. If all ongoing processing manages to finish"
830 <<
" then the event loop will exit.");
831 return returnFailureAndStopEventLoop();
833 if (!eventContext.valid()) {
834 ATH_MSG_ERROR(
"Failure occurred with an invalid EventContext. Likely there was a framework error before"
835 <<
" requesting a new event or after sending the result of a finished event. OnlineErrorCode=" << errorCode
836 <<
". The event loop will exit after all ongoing processing is finished.");
837 return returnFailureAndStopEventLoop();
843 if (
m_whiteboard->selectStore(eventContext.slot()).isFailure()) {
850 std::string debugStreamName;
865 eformat::helper::StreamTag debugStreamTag{debugStreamName, eformat::DEBUG_TAG,
true};
870 std::unique_ptr<HLT::HLTResultMT> hltResultPtr;
871 StatusCode buildResultCode{StatusCode::SUCCESS};
873 if (hltResultRH.isValid() && !hltResultRH->getSerialisedData().empty()) {
875 hltResultPtr = std::make_unique<HLT::HLTResultMT>(*hltResultRH);
876 hltResultPtr->addErrorCode(errorCode);
877 buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
880 hltResultPtr = std::make_unique<HLT::HLTResultMT>();
881 hltResultPtr->addErrorCode(errorCode);
882 buildResultCode &= hltResultPtr->addStreamTag(debugStreamTag);
885 buildResultCode &=
m_hltResultMaker->fillResult(*hltResultPtr,eventContext);
893 if (buildResultCode.isFailure() || hltResultWH.record(std::move(hltResultPtr)).isFailure()) {
896 ATH_MSG_ERROR(
"Second failure to build or record the HLT Result in event store while handling a failed event. "
897 <<
"Cannot force-accept this event from HLT side, will rely on data collector to do this. "
898 <<
"The event loop will exit after all ongoing processing is finished.");
899 return returnFailureAndStopEventLoop();
901 ATH_MSG_ERROR(
"Failed to build or record the HLT Result in event store while handling a failed event. "
902 <<
"Trying again with skipped filling of the result contents (except debug stream tag).");
910 int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count();
919 ATH_MSG_ERROR(
"The output conversion service failed in connectOutput() while handling a failed event. "
920 <<
"Cannot force-accept this event from HLT side, will rely on data collector to do this. "
921 <<
"The event loop will exit after all ongoing processing is finished.");
922 return returnFailureAndStopEventLoop();
925 DataObject* hltResultDO =
m_evtStore->accessData(hltResultWH.clid(),hltResultWH.key());
926 if (hltResultDO ==
nullptr) {
929 ATH_MSG_ERROR(
"Second failure to build or record the HLT Result in event store while handling a failed event. "
930 <<
"Cannot force-accept this event from HLT side, will rely on data collector to do this. "
931 <<
"The event loop will exit after all ongoing processing is finished.");
932 return returnFailureAndStopEventLoop();
934 ATH_MSG_ERROR(
"Failed to retrieve DataObject for the HLT result object while handling a failed event. "
935 <<
"Trying again with skipped filling of the result contents (except debug stream tag).");
939 IOpaqueAddress* addr =
nullptr;
940 if (
m_outputCnvSvc->createRep(hltResultDO,addr).isFailure() || addr ==
nullptr) {
941 ATH_MSG_ERROR(
"Conversion of HLT result object to the output format failed while handling a failed event. "
942 <<
"Cannot force-accept this event from HLT side, will rely on data collector to do this. "
943 <<
"The event loop will exit after all ongoing processing is finished.");
946 return returnFailureAndStopEventLoop();
950 ATH_MSG_ERROR(
"The output conversion service failed in commitOutput() while handling a failed event. "
951 <<
"Cannot force-accept this event from HLT side, will rely on data collector to do this. "
952 <<
"The event loop will exit after all ongoing processing is finished.");
955 return returnFailureAndStopEventLoop();
971 const EventContext eventContextCopy = eventContext;
990 <<
" was successfully handled, but the number of tolerable framework errors for this HltEventLoopMgr instance,"
991 <<
" which is " <<
m_maxFrameworkErrors.value() <<
", was exceeded. Current local event number is "
992 << eventContextCopy.evt() <<
", slot " << eventContextCopy.slot()
993 <<
". The event loop will exit after all ongoing processing is finished.");
994 return returnFailureAndStopEventLoop();
999 ATH_MSG_ERROR(
"Failed event with OnlineErrorCode=" << errorCode
1000 <<
" Current local event number is " << eventContextCopy.evt() <<
", slot " << eventContextCopy.slot());
1003 return StatusCode::SUCCESS;
1010 auto now=std::chrono::steady_clock::now();
1015 EventContext ctx(0,i);
1023 ATH_MSG_INFO(
"Generating stack trace due to the soft timeout");
1025 gSystem->StackTrace();
1035 if (!eventContext.valid()) {
return;}
1052 if( !
sc.isSuccess() ) {
1055 ATH_MSG_VERBOSE(
"end of " << __FUNCTION__ <<
", returning m_whiteboard->freeStore(evtSlot=" << evtSlot <<
")");
1063 ATH_MSG_VERBOSE(
"Event loop ended, stopping the input thread and returning from " << __FUNCTION__);
1072 ATH_MSG_VERBOSE(
"No more events, flagging the event loop as finished, stopping the input thread"
1073 <<
" and returning from " << __FUNCTION__);
1080 if (numSlotsToFill==0) {
1087 ATH_MSG_DEBUG(
"Free slots = " << numSlotsToFill <<
". Reading new event(s) to fill the slot(s).");
1090 for (
size_t i=0; i<numSlotsToFill; ++i) {
1091 auto task = [mgr=
this](){
1092 StatusCode
sc = StatusCode::SUCCESS;
1094 sc = mgr->startNextEvent();
1096 catch (
const std::exception& e) {
1097 mgr->error() <<
"Exception caught in startNextEvent: " << e.what() <<
endmsg;
1098 sc = StatusCode::FAILURE;
1101 mgr->error() <<
"Exception caught in startNextEvent" <<
endmsg;
1102 sc = StatusCode::FAILURE;
1104 if (
sc.isFailure()) {
1105 mgr->error() <<
"startNextEvent failed, stopping the event loop" <<
endmsg;
1106 mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
1107 mgr->m_loopStatus.eventsAvailable =
false;
1112 bool popIOQueue{
false};
1113 mgr->m_parallelIOQueue.pop(popIOQueue);
1131 ATH_MSG_DEBUG(
"There are currently no events being processed by the Scheduler, returning from " << __FUNCTION__);
1133 ATH_MSG_DEBUG(
"No more events to process and scheduler is empty, stopping the event loop and output thread");
1146 ATH_MSG_DEBUG(
"No more events, but processing is still ongoing, returning from " << __FUNCTION__);
1154 std::vector<EventContext*> finishedEvtContexts;
1155 EventContext* finishedEvtContext(
nullptr);
1156 const auto popStartTime = std::chrono::steady_clock::now();
1159 ATH_MSG_DEBUG(
"Waiting for a finished event from the Scheduler");
1160 if (
m_schedulerSvc->popFinishedEvent(finishedEvtContext).isFailure()) {
1162 delete finishedEvtContext;
1163 finishedEvtContext =
nullptr;
1166 ATH_MSG_DEBUG(
"Scheduler returned a finished event: " << finishedEvtContext);
1167 finishedEvtContexts.push_back(finishedEvtContext);
1170 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1171 ATH_MSG_DEBUG(
"Scheduler returned a finished event: " << *finishedEvtContext);
1172 finishedEvtContexts.push_back(finishedEvtContext);
1174 const auto popSpentTime = std::chrono::steady_clock::now() - popStartTime;
1175 const auto popSpentTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(popSpentTime).count();
1183 const size_t nFinishedEvents = finishedEvtContexts.size();
1184 ATH_MSG_DEBUG(
"Number of finished events to post-process: " << nFinishedEvents);
1187 for (EventContext* thisFinishedEvtContext : finishedEvtContexts) {
1189 if (thisFinishedEvtContext !=
nullptr) {
1195 auto task = [mgr=
this](){
1196 StatusCode
sc = StatusCode::SUCCESS;
1198 sc = mgr->processFinishedEvent();
1200 catch (
const std::exception& e) {
1201 mgr->error() <<
"Exception caught in processFinishedEvent: " << e.what() <<
endmsg;
1202 sc = StatusCode::FAILURE;
1205 mgr->error() <<
"Exception caught in processFinishedEvent" <<
endmsg;
1206 sc = StatusCode::FAILURE;
1209 if (
sc.isFailure()) {
1210 mgr->error() <<
"processFinishedEvent failed, stopping the event loop" <<
endmsg;
1211 mgr->m_loopStatus.exitCode = StatusCode::FAILURE;
1212 mgr->m_loopStatus.eventsAvailable =
false;
1217 bool popIOQueue{
false};
1218 mgr->m_parallelIOQueue.pop(popIOQueue);
1223 mgr->m_outputThread->cond().notify_one();
1239 StatusCode
sc = StatusCode::SUCCESS;
1241 if (
sc.isSuccess()) {
return false;}
1244 Gaudi::Hive::setCurrentContext(EventContext());
1253 std::unique_ptr<EventContext> eventContextPtr = std::make_unique<EventContext>(
createEventContext());
1255 sc = eventContextPtr->valid() ? StatusCode(StatusCode::SUCCESS) : StatusCode(StatusCode::FAILURE);
1261 if (check(
"Failed to select event store slot number " + std::to_string(eventContextPtr->slot()),
1276 sc = eventContext.record(std::move(eventContextPtr));
1277 if (check(
"Failed to record new EventContext in the event store",
1283 m_aess->reset(*eventContext);
1285 ATH_MSG_DEBUG(
"Created new EventContext with number: " << eventContext->evt()
1286 <<
", slot: " << eventContext->slot());
1291 Gaudi::Hive::setCurrentContext(*eventContext);
1296 IOpaqueAddress* addr =
nullptr;
1298 if (check(
"Event selector failed to create an IOpaqueAddress",
1307 bool noEventsTemporarily{
false};
1310 noEventsTemporarily =
false;
1313 ATH_MSG_DEBUG(
"No new input events available temporarily, requesting again");
1314 noEventsTemporarily =
true;
1316 }
while (noEventsTemporarily);
1319 sc = StatusCode::SUCCESS;
1322 if (
sc.isFailure()) {
1323 ATH_MSG_WARNING(
"Failed to clear the whiteboard slot " << eventContext->slot()
1324 <<
" after NoMoreEvents detected");
1336 return StatusCode::SUCCESS;
1339 sc = StatusCode::FAILURE;
1345 sc = StatusCode::FAILURE;
1350 catch (
const std::exception& e) {
1351 ATH_MSG_ERROR(
"Failed to get next event from the event source, std::exception caught: " << e.what());
1352 sc = StatusCode::FAILURE;
1355 ATH_MSG_ERROR(
"Failed to get next event from the event source, unknown exception caught");
1356 sc = StatusCode::FAILURE;
1358 if (check(
"Failed to get the next event",
1377 sc = eventInfo.isValid() ? StatusCode::SUCCESS : StatusCode::FAILURE;
1382 ATH_MSG_DEBUG(
"Retrieved event info for the new event " << *eventInfo);
1395 eid.set_time_stamp_ns_offset(
m_forceSOR_ns % std::nano::den);
1397 eventContext->setEventID(eid);
1400 Gaudi::Hive::setCurrentContext(*eventContext);
1407 EventIDBase::number_type oldMaxLB{0}, newMaxLB{0};
1408 bool updatedLB{
false};
1411 newMaxLB = std::max(oldMaxLB, eventContext->eventID().lumi_block());
1412 updatedLB = newMaxLB > oldMaxLB;
1413 }
while (updatedLB && !
m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB));
1414 m_loopStatus.maxLB.compare_exchange_strong(oldMaxLB, newMaxLB);
1419 std::unique_lock<std::mutex> lock(
m_loopStatus.coolUpdateMutex);
1426 std::lock_guard<std::mutex> lock(
m_loopStatus.coolUpdateMutex);
1445 if (check(
"Failed to schedule event processing",
1456 Gaudi::Hive::setCurrentContext( EventContext() );
1464 EventContext* eventContext{
nullptr};
1467 StatusCode
sc = StatusCode::SUCCESS;
1469 if (
sc.isSuccess()) {
return false;}
1471 const EventContext& eventContextRef = (eventContext==
nullptr) ? EventContext() : *eventContext;
1473 Gaudi::Hive::setCurrentContext(EventContext());
1474 delete eventContext;
1475 eventContext =
nullptr;
1483 if (eventContext ==
nullptr) {
1484 sc = StatusCode::FAILURE;
1485 if (check(
"Detected nullptr EventContext while finalising a processed event",
1492 Gaudi::Hive::setCurrentContext(eventContext);
1495 if (
m_aess->eventStatus(*eventContext) != EventStatus::Success) {
1496 sc = StatusCode::FAILURE;
1500 if (check(
"Processing event with context " + toString(*eventContext) + \
1501 " failed with status " + toString(
m_aess->eventStatus(*eventContext)),
1509 if (check(
"Failed to select event store slot " + std::to_string(eventContext->slot()),
1515 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *eventContext));
1530 if (!hltResult.isValid()) {
sc = StatusCode::FAILURE;}
1533 DataObject* hltResultDO =
m_evtStore->accessData(hltResult.clid(),hltResult.key());
1534 if (hltResultDO ==
nullptr) {
sc = StatusCode::FAILURE;}
1538 if (!hltResult->getTruncatedModuleIds().empty() && hltResult->severeTruncation()) {
sc = StatusCode::FAILURE;}
1542 IOpaqueAddress* addr =
nullptr;
1544 if (
sc.isFailure()) {
delete addr; addr=
nullptr;}
1548 IOpaqueAddress* l1addr =
nullptr;
1549 IOpaqueAddress* l1addrLegacy =
nullptr;
1554 if (!l1TriggerResult.isValid()) {
sc = StatusCode::FAILURE;}
1555 if (check(
"Failed to retrieve the L1 Trigger Result for RewriteLVL1",
1560 DataObject* l1TriggerResultDO =
m_evtStore->accessData(l1TriggerResult.clid(),l1TriggerResult.key());
1561 if (l1TriggerResultDO ==
nullptr) {
sc = StatusCode::FAILURE;}
1562 if (check(
"Failed to retrieve the L1 Trigger Result DataObject for RewriteLVL1",
1568 if (
sc.isFailure()) {
delete l1addr; l1addr =
nullptr;}
1569 if (check(
"Conversion service failed to convert L1 Trigger Result for RewriteLVL1",
1577 if (!roibResult.isValid()) {
sc = StatusCode::FAILURE;}
1578 if (check(
"Failed to retrieve the RoIBResult for RewriteLVL1",
1583 DataObject* roibResultDO =
m_evtStore->accessData(roibResult.clid(),roibResult.key());
1584 if (roibResultDO ==
nullptr) {
sc = StatusCode::FAILURE;}
1585 if (check(
"Failed to retrieve the RoIBResult DataObject for RewriteLVL1",
1591 if (
sc.isFailure()) {
delete l1addrLegacy; l1addrLegacy =
nullptr;}
1592 if (check(
"Conversion service failed to convert RoIBResult for RewriteLVL1",
1600 bool eventAccepted = !hltResult->getStreamTags().empty();
1602 int64_t eventTimeMillisec = std::chrono::duration_cast<std::chrono::milliseconds>(eventTime).count();
1606 if (
sc.isFailure()) {
delete addr; addr=
nullptr;}
1610 delete addr; addr =
nullptr;
1611 delete l1addr; l1addr =
nullptr;
1612 delete l1addrLegacy; l1addrLegacy =
nullptr;
1623 <<
" (event " << eventContext->evt() <<
") of the whiteboard");
1626 if (check(
"Whiteboard slot " + std::to_string(eventContext->slot()) +
" could not be properly cleared",
1631 ATH_MSG_DEBUG(
"Finished processing " << (eventAccepted ?
"accepted" :
"rejected")
1632 <<
" event with context " << *eventContext
1633 <<
" which took " << eventTimeMillisec <<
" ms");
1648 Gaudi::Hive::setCurrentContext( EventContext() );
1651 delete eventContext;
1652 eventContext =
nullptr;
1654 return StatusCode::SUCCESS;
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
char data[hepevt_bytes_allocation_ATLAS]
boost::property_tree::ptree ptree
Helper tool for COOL updates.
static thread_local std::ostringstream errmsg
void resetTimeout(Timeout &instance)
Reset timeout.
void setTimeout(Timeout &instance)
Set timeout.
static Timeout & instance()
Get reference to Timeout singleton.
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
ToolHandle< TrigCOOLUpdateHelper > m_coolHelper
StatusCode clearTemporaryStores()
Clear per-event stores.
ServiceHandle< StoreGateSvc > m_inputMetaDataStore
SG::WriteHandleKey< EventContext > m_eventContextWHKey
ServiceHandle< IIoComponentMgr > m_ioCompMgr
Gaudi::Property< float > m_softTimeoutFraction
virtual StatusCode finalize() override
virtual StatusCode hltUpdateAfterFork(const boost::property_tree::ptree &pt) override
Gaudi::Property< int > m_maxParallelIOTasks
Gaudi::Property< int > m_maxIOWakeUpIntervalMs
StatusCode failedEvent(HLT::OnlineErrorCode errorCode, const EventContext &eventContext)
Handle a failure to process an event.
void printSORAttrList(const coral::AttributeList &atr) const
Print the SOR record.
tbb::concurrent_bounded_queue< bool > m_parallelIOQueue
Queue limiting the number of parallel I/O tasks.
std::unique_ptr< HLT::LoopThread > m_timeoutThread
Timeout thread.
SG::ReadHandleKey< xAOD::EventInfo > m_eventInfoRHKey
ServiceHandle< Gaudi::Interfaces::IOptionsSvc > m_jobOptionsSvc
StatusCode execAtStart(const EventContext &ctx) const
Execute optional algs/sequences.
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events)
std::atomic< size_t > m_localEventNumber
Event counter used for local bookkeeping; incremental per instance of HltEventLoopMgr,...
virtual StatusCode stop() override
tbb::concurrent_bounded_queue< EventContext * > m_finishedEventsQueue
Queue of events ready for output processing.
EventLoopStatus m_loopStatus
Object keeping track of the event loop status.
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events)
Gaudi::Property< std::vector< std::string > > m_execAtStart
StatusCode updateMagField(const boost::property_tree::ptree &pt) const
Set magnetic field currents from ptree.
tbb::task_group m_parallelIOTaskGroup
Task group to execute parallel I/O tasks asynchronously.
virtual StatusCode nextEvent(int maxevt=-1) override
Implementation of IEventProcessor::nextEvent which implements the event loop.
SG::ReadHandleKey< xAOD::TrigCompositeContainer > m_l1TriggerResultRHKey
void outputThreadCallback()
The method executed by the output handling thread.
StatusCode processFinishedEvent()
Perform all end-of-event actions for a single event popped out from the scheduler.
virtual StatusCode executeRun(int maxevt=-1) override
Implementation of IEventProcessor::executeRun which calls IEventProcessor::nextEvent.
Gaudi::Property< std::string > m_truncationDebugStreamName
ServiceHandle< StoreGateSvc > m_evtStore
virtual StatusCode initialize() override
Gaudi::Property< unsigned int > m_forceLumiblock
virtual EventContext createEventContext() override
create an Event Context object
std::vector< std::chrono::steady_clock::time_point > m_freeSlotStartPoint
Vector of time stamps telling when each scheduler slot was freed.
void inputThreadCallback()
ServiceHandle< IEvtSelector > m_evtSelector
std::string m_applicationName
Application name.
Gaudi::Property< unsigned int > m_timeoutThreadIntervalMs
SmartIF< IAlgResourcePool > m_algResourcePool
StatusCode clearWBSlot(size_t evtSlot) const
Clear an event slot in the whiteboard.
virtual StatusCode executeEvent(EventContext &&ctx) override
Implementation of IEventProcessor::executeEvent which processes a single event.
std::vector< bool > m_isSlotProcessing
Vector of flags to tell if a slot is idle or processing.
SG::ReadHandleKey< ROIB::RoIBResult > m_roibResultRHKey
IEvtSelector::Context * m_evtSelContext
Event selector context.
ServiceHandle< IIncidentSvc > m_incidentSvc
SmartIF< IHiveWhiteBoard > m_whiteboard
bool m_timeoutTraceGenerated
Flag set when a soft timeout produces a stack trace, to avoid producing multiple traces.
Gaudi::Property< std::string > m_timeoutDebugStreamName
virtual StatusCode stopRun() override
Implementation of IEventProcessor::stopRun (obsolete for online running)
Gaudi::Property< unsigned long long > m_forceSOR_ns
SmartIF< IScheduler > m_schedulerSvc
StatusCode startNextEvent()
Gaudi::Property< std::string > m_algErrorDebugStreamName
virtual StatusCode prepareForStart(const boost::property_tree::ptree &) override
std::chrono::milliseconds m_softTimeoutValue
Soft timeout value set to HardTimeout*SoftTimeoutFraction at initialisation.
SmartIF< IAlgExecStateSvc > m_aess
void updateDFProps()
Read DataFlow configuration properties.
Gaudi::Property< std::string > m_whiteboardName
std::vector< std::chrono::steady_clock::time_point > m_eventTimerStartPoint
Vector of event start-processing time stamps in each slot.
std::atomic< size_t > m_freeSlots
Number of free slots used to synchronise input/output tasks.
Gaudi::Property< int > m_maxFrameworkErrors
ServiceHandle< IConversionSvc > m_outputCnvSvc
EventContext m_currentRunCtx
"Event" context of current run with dummy event/slot number
void eventTimerCallback()
The method executed by the event timeout monitoring thread.
Gaudi::Property< std::string > m_fwkErrorDebugStreamName
Gaudi::Property< float > m_hardTimeout
ToolHandle< GenericMonitoringTool > m_monTool
std::atomic< int > m_nFrameworkErrors
Counter of framework errors.
HltEventLoopMgr(const std::string &name, ISvcLocator *svcLoc)
Standard constructor.
ToolHandle< HLTResultMTMaker > m_hltResultMaker
void updateMetadataStore(const coral::AttributeList &sor_attrlist) const
ToolHandle< ITrigErrorMonTool > m_errorMonTool
Gaudi::Property< unsigned int > m_forceRunNumber
Gaudi::Property< std::string > m_sorPath
int m_workerPID
Worker PID.
ServiceHandle< ISchedulerMonSvc > m_schedulerMonSvc
Gaudi::Property< bool > m_monitorScheduler
Gaudi::Property< bool > m_traceOnTimeout
Gaudi::Property< std::string > m_schedulerName
const coral::AttributeList & getSorAttrList() const
Extract the single attr list off the SOR CondAttrListCollection.
std::unique_ptr< TrigSORFromPtreeHelper > m_sorHelper
void resetEventTimer(const EventContext &eventContext, bool processing)
Reset the timeout flag and the timer, and mark the slot as busy or idle according to the second argum...
Gaudi::Property< bool > m_setMagFieldFromPtree
SG::ReadHandleKey< HLT::HLTResultMT > m_hltResultRHKey
StoreGate key for reading the HLT result.
virtual ~HltEventLoopMgr() noexcept override
Standard destructor.
Gaudi::Property< bool > m_rewriteLVL1
ServiceHandle< StoreGateSvc > m_detectorStore
Group of local monitoring quantities and retain correlation when filling histograms
Declare a monitored scalar variable.
static void setNumProcs(size_t numProcs)
Set number of concurrent processes.
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Property holding a SG store/key/clid from which a WriteHandle is made.
static StatusCode closeDBConnections(MsgStream &msg)
Close database connections.
CondAttrListCollection SOR
Thrown if the CTP ROBFragment for a new event has non-zero status word or other errors.
Thrown if the CTP ROBFragment cannot be retrieved for a new event.
Thrown if the event source cannot provide new events temporarily, e.g.
Thrown if all events are already read from the input and another one is requested.
int count(std::string s, const std::string &regx)
count how many occurrences of a regx are in a string
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
@ TIMEOUT
Timeout during event processing.
constexpr bool isEventProcessingErrorCode(const OnlineErrorCode code)
AthROOTErrorHandlerSvc * svc
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
histsvc
TrigInDetMonitoring part ################################.