5 #include <GaudiKernel/DataIncident.h>
8 #define ATHENASERVICES_ATHENAHIVEEVENTLOOPMGR_CPP
28 #include "GaudiKernel/IAlgorithm.h"
29 #include "GaudiKernel/SmartIF.h"
30 #include "GaudiKernel/Incident.h"
31 #include "GaudiKernel/DataObject.h"
32 #include "GaudiKernel/IIncidentSvc.h"
33 #include "GaudiKernel/IEvtSelector.h"
34 #include "GaudiKernel/IDataManagerSvc.h"
35 #include "GaudiKernel/IDataProviderSvc.h"
36 #include "GaudiKernel/IConversionSvc.h"
37 #include "GaudiKernel/GaudiException.h"
38 #include "GaudiKernel/AppReturnCode.h"
39 #include "GaudiKernel/MsgStream.h"
40 #include "Gaudi/Property.h"
41 #include "GaudiKernel/EventIDBase.h"
42 #include "GaudiKernel/ThreadLocalContext.h"
46 #include "EventInfo/EventInfo.h"
54 #include <GaudiKernel/IScheduler.h>
56 #include "tbb/tick_count.h"
65 : base_class(nam, svcLoc),
67 m_incidentSvc (
"IncidentSvc", nam ),
68 m_eventStore(
"StoreGateSvc", nam ),
69 m_evtSelector(0), m_evtContext(0),
70 m_histoDataMgrSvc(
"HistogramDataSvc", nam ),
71 m_histoPersSvc (
"HistogramPersistencySvc", nam ),
72 m_evtIdModSvc (
"", nam ),
73 m_currentRun(0), m_firstRun(true), m_tools(this), m_nevt(0), m_writeHists(false),
74 m_nev(0), m_proc(0), m_useTools(false),m_doEvtHeartbeat(false),
75 m_conditionsCleaner(
"Athena::ConditionsCleanerSvc", nam )
78 "Name of Event Selector to use. If empty string (default) "
79 "take value from ApplicationMgr");
81 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
82 "By default (empty string) get property value from "
85 "histogram write/update interval");
87 "Controls behaviour of event loop depending on return code of"
88 " Algorithms. 0: all non-SUCCESSes terminate job. "
89 "1: RECOVERABLE skips to next event, FAILURE terminates job "
90 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
92 "Print event heartbeat printouts every m_eventPrintoutInterval events");
93 declareProperty(
"PreSelectTools",
m_tools,
"AlgTools for event pre-selection")->
97 "Name of the scheduler to be used");
100 "Name of the Whiteboard to be used");
105 "ServiceHandle for EvtIdModifierSvc");
107 declareProperty(
"FakeLumiBlockInterval",
m_flmbi = 0,
108 "Event interval at which to increment lumiBlock# when "
109 "creating events without an EventSelector. Zero means "
110 "don't increment it");
112 "timestamp interval between events when creating Events "
113 "without an EventSelector");
115 "Require valid input attribute list to be present");
117 "In case of DoubleEventSelector use event number from secondary input");
120 "process all of first event before scheduling any more");
143 if ( !
sc.isSuccess() )
145 ATH_MSG_ERROR (
"Failed to initialize base class MinimalEventLoopMgr" );
156 return StatusCode::FAILURE;
161 ATH_MSG_FATAL (
"Error retrieving SchedulerSvc interface ISchedulerSvc." );
162 return StatusCode::FAILURE;
168 return StatusCode::FAILURE;
171 m_aess = serviceLocator()->service(
"AlgExecStateSvc");
174 return StatusCode::FAILURE;
178 if( !
sc.isSuccess() )
180 ATH_MSG_FATAL (
"Error retrieving pointer to StoreGateSvc" );
189 if( !
sc.isSuccess() )
198 SmartIF<IProperty> prpMgr(serviceLocator());
199 if ( !prpMgr.isValid() )
201 ATH_MSG_FATAL (
"IProperty interface not found in ApplicationMgr." );
202 return StatusCode::FAILURE;
210 if( !
sc.isSuccess() )
217 if ( histPersName.length() == 0 )
219 CHECK(setProperty(prpMgr->getProperty(
"HistogramPersistency")));
222 if ( histPersName !=
"NONE" ) {
228 if (histPersName ==
"ROOT") {
229 histSvc = serviceLocator()->service(
"RootHistSvc");
230 }
else if ( histPersName ==
"HBOOK" ) {
231 histSvc = serviceLocator()->service(
"HbookHistSvc");
235 ATH_MSG_ERROR (
"could not locate actual Histogram persistency service" );
237 const Gaudi::Details::PropertyBase &prop =
histSvc->getProperty(
"OutputFile");
240 const StringProperty &sprop =
dynamic_cast<const StringProperty&
>( prop );
243 ATH_MSG_VERBOSE (
"could not dcast OutputFile property to a StringProperty."
244 <<
" Need to fix Gaudi." );
245 val = prop.toString();
249 val !=
"UndefinedROOTOutputFileName" &&
250 val !=
"UndefinedHbookOutputFileName" ) {
263 ATH_MSG_DEBUG (
"EventID modifier Service not set. No run number, ... overrides will be applied." );
266 ATH_MSG_INFO (
"Could not find EventID modifier Service. No run number, ... overrides will be applied." );
275 sc = setProperty(prpMgr->getProperty(
"EvtSel"));
280 SmartIF<IEvtSelector> theEvtSel{serviceLocator()->service(
selName )};
287 ATH_MSG_FATAL (
"Can not create the event selector Context." );
288 return StatusCode::FAILURE;
290 if (msgLevel(MSG::INFO)) {
291 SmartIF<INamedInterface> named(theEvtSel);
293 ATH_MSG_INFO (
"Setup EventSelector service " << named->name( ) );
296 }
else if (
sc.isFailure()) {
298 return StatusCode::FAILURE;
342 unsigned int toolCtr = 0;
343 for ( ; firstTool != lastTool; ++firstTool )
397 unsigned int toolCtr = 0;
398 ATH_MSG_INFO (
"Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" );
399 ATH_MSG_INFO (
"-----------------------------------------------------" );
401 for ( ; firstTool != lastTool; ++firstTool ) {
402 ATH_MSG_INFO ( std::setw(2) << std::setiosflags(std::ios_base::right)
403 << toolCtr+1 <<
".) " << std::resetiosflags(std::ios_base::right)
404 << std::setw(48) << std::setfill(
'.')
405 << std::setiosflags(std::ios_base::left)
406 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
409 << std::setw(6) << std::setiosflags(std::ios_base::right)
420 return (
sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
433 std::vector<DataObject*>
objects;
435 DataObject*
obj = reg->object();
436 if ( !
obj ||
obj->clID() == CLID_StatisticsFile )
return false;
441 if ( !
sc.isSuccess() ) {
442 ATH_MSG_ERROR (
"Error while traversing Histogram data store" );
450 (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
454 IOpaqueAddress* pAddr =
nullptr;
456 if ( iret.isFailure() )
return iret;
457 i->registry()->setAddress( pAddr );
461 IRegistry* reg =
i->registry();
463 return iret.isFailure() ? iret : isc;
465 if ( !
sc.isSuccess() ) {
470 if (
force || (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
486 return StatusCode::SUCCESS;
494 return StatusCode::SUCCESS;
506 ATH_MSG_ALWAYS (
"A stopRun was requested by an incidentListener. "
507 <<
"Do not process this event." );
509 return (StatusCode::SUCCESS);
515 Gaudi::Hive::setCurrentContext ( ctx );
518 if (declEvtRootSc == 0 ) {
520 return StatusCode::SUCCESS;
521 }
else if ( declEvtRootSc == -1) {
522 ATH_MSG_ERROR (
"declareEventRootAddress for context " << ctx <<
" failed" );
523 return StatusCode::FAILURE;
527 unsigned int conditionsRun = ctx.eventID().run_number();
530 if (
eventStore()->contains<AthenaAttributeList> (
"Input") &&
532 if (attr->exists (
"ConditionsRun")) {
533 conditionsRun = (*attr)[
"ConditionsRun"].data<
unsigned int>();
538 Gaudi::Hive::setCurrentContext ( ctx );
541 if (
eventStore()->record(std::make_unique<EventContext> (ctx),
542 "EventContext").isFailure())
545 return (StatusCode::FAILURE);
565 bool toolsPassed=
true;
570 unsigned int toolCtr=0;
574 while(toolsPassed && theTool!=lastTool )
576 toolsPassed = (*theTool)->passEvent(ctx.eventID());
589 <<
" on slot " << ctx.slot() <<
", " <<
m_proc
590 <<
" events processed so far <<<===" );
594 <<
" on slot " << ctx.slot() <<
", "
596 <<
" events processed so far <<<===" );
611 <<
", slot " << ctx.slot()
612 <<
" to the scheduler" );
619 if (!addEventStatus.isSuccess()){
620 ATH_MSG_FATAL (
"An event processing slot should be now free in the scheduler, but it appears not to be the case." );
630 Gaudi::Hive::setCurrentContext( EventContext() );
632 return StatusCode::SUCCESS;
643 bool eventfailed =
false;
651 return StatusCode::FAILURE;
654 return StatusCode::SUCCESS;
661 SmartIF<IProperty> appmgr(serviceLocator());
662 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop).isFailure()) {
663 ATH_MSG_ERROR (
"Could not set return code of the application ("
664 << Gaudi::ReturnCode::ScheduledStop <<
")" );
667 return StatusCode::SUCCESS;
688 for (
size_t islot = 0; islot < nslot; islot++) {
692 Gaudi::Hive::setCurrentContext( EventContext() );
703 if (0 == maxevt)
return StatusCode::SUCCESS;
706 Gaudi::setAppReturnCode(
m_appMgrProperty, Gaudi::ReturnCode::Success,
true).ignore();
717 bool loop_ended=
false;
724 auto secsFromStart = [&start_time]()->
double{
728 while ( !loop_ended and ( (maxevt < 0) or (finishedEvts < maxevt) ) ){
733 (newEvtAllowed || createdEvts == 0) &&
734 ( (createdEvts < maxevt) or (maxevt<0) ) &&
741 if ( !ctx.valid() ) {
742 sc = StatusCode::FAILURE;
747 if (
sc.isFailure()) {
748 ATH_MSG_ERROR (
"Terminating event processing loop due to errors" );
767 sc = StatusCode::FAILURE;
768 }
else if (
ir == 0) {
771 sc = StatusCode::SUCCESS;
775 newEvtAllowed =
true;
780 ATH_MSG_INFO (
"---> Loop Finished (seconds): " << secsFromStart() );
797 ATH_MSG_ERROR (
"Seek failed; unsupported by event selector" );
798 return StatusCode::FAILURE;
803 ATH_MSG_FATAL (
"Can not create the event selector Context." );
804 return StatusCode::FAILURE;
810 if (
sc.isSuccess()) {
811 m_incidentSvc->fireIncident(ContextIncident<std::tuple<int, int>>(
837 ATH_MSG_ERROR (
"Collection size unsupported by event selector" );
843 ATH_MSG_FATAL (
"Can not create the event selector Context." );
858 if(inc.type() ==
"EndAlgorithms") {
863 if( !
sc.isSuccess() ) {
869 if(inc.type()!=
"BeforeFork")
873 ATH_MSG_WARNING (
"Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" );
885 if(!
sc.isSuccess()) {
889 IOpaqueAddress* addr{
nullptr};
891 if (
sc.isFailure()) {
898 if(!
sc.isSuccess()) {
904 if(
eventStore()->loadEventProxies().isFailure()) {
912 if(!
sc.isSuccess()) {
922 if(!
sc.isSuccess()) {
933 if ( !
sc.isSuccess() ) {
938 if( !
sc.isSuccess() ) {
940 if (
sc.isSuccess() ) {
942 if ( !
sc.isSuccess() ) {
964 std::unique_ptr<const EventInfo> pEvent{};
971 IOpaqueAddress* addr{};
975 if ( !
sc.isSuccess() ) {
991 if( !
sc.isSuccess() ) {
996 }
if ((
sc=
eventStore()->loadEventProxies()).isFailure()) {
1001 bool consume_modifier_stream =
false;
1005 if ( pAttrList !=
nullptr && pAttrList->size() > 6 ) {
1007 unsigned int runNumber = (*pAttrList)[
"RunNumber"].data<
unsigned int>();
1008 unsigned long long eventNumber = (*pAttrList)[
"EventNumber"].data<
unsigned long long>();
1009 unsigned int eventTime = (*pAttrList)[
"EventTime"].data<
unsigned int>();
1010 unsigned int eventTimeNS = (*pAttrList)[
"EventTimeNanoSec"].data<
unsigned int>();
1011 unsigned int lumiBlock = (*pAttrList)[
"LumiBlockN"].data<
unsigned int>();
1012 unsigned int bunchId = (*pAttrList)[
"BunchId"].data<
unsigned int>();
1015 consume_modifier_stream =
true;
1018 unsigned long long eventNumberSecondary{};
1019 if ( !(pAttrList->exists(
"hasSecondaryInput") && (*pAttrList)[
"hasSecondaryInput"].data<
bool>()) ) {
1020 ATH_MSG_FATAL (
"Secondary EventNumber requested, but secondary input does not exist!" );
1023 if ( pAttrList->exists(
"EventNumber_secondary") ) {
1024 eventNumberSecondary = (*pAttrList)[
"EventNumber_secondary"].data<
unsigned long long>();
1030 if (pEventSecondary) {
1031 eventNumberSecondary = pEventSecondary->
event_ID()->event_number();
1034 ATH_MSG_FATAL (
"Secondary EventNumber requested, but it does not exist!" );
1038 if (eventNumberSecondary != 0) {
1042 ATH_MSG_INFO (
" ===>>> using secondary event #" << eventNumberSecondary <<
" instead of #" <<
eventNumber <<
" <<<===" );
1048 pEvent = std::make_unique<EventInfo>(
1055 ATH_MSG_FATAL (
"Valid input attribute list required but not present!" );
1059 const EventInfo* pEventObserver{pEvent.get()};
1060 if (!pEventObserver) {
1065 if (pEventObserver) {
1066 consume_modifier_stream =
false;
1073 if( !
sc.isSuccess() ) {
1077 consume_modifier_stream =
true;
1078 ATH_MSG_DEBUG (
"use xAOD::EventInfo with runNumber=" << pXEvent->runNumber() );
1080 pEvent = std::make_unique<EventInfo>(
1083 pEventObserver = pEvent.get();
1085 if( !
sc.isSuccess() ) {
1093 consume_modifier_stream);
1100 unsigned int runNmb{1}, evtNmb{
m_nevt + 1};
1107 auto eid = std::make_unique<EventID> (runNmb,evtNmb,
m_timeStamp);
1109 eid->set_lumi_block( runNmb );
1113 pEvent = std::make_unique<EventInfo>(std::move(eid),
1114 std::make_unique<EventType>());
1116 bool consume_modifier_stream =
true;
1126 ATH_MSG_DEBUG (
"recording EventInfo " << *pEvent->event_ID() <<
" in "
1130 if( !
sc.isSuccess() ) {
1147 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1149 unsigned int oldrunnr=eID.run_number();
1150 unsigned int oldLB=eID.lumi_block();
1151 unsigned int oldTS=eID.time_stamp();
1152 unsigned int oldTSno=eID.time_stamp_ns_offset();
1153 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc runnr=" << oldrunnr <<
" -> " << new_eID.run_number() );
1154 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc LB=" << oldLB <<
" -> " << new_eID.lumi_block() );
1155 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS <<
" -> " << new_eID.time_stamp() );
1156 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc TimeStamp ns Offset=" << oldTSno <<
" -> " << new_eID.time_stamp_ns_offset() );
1158 ctx.setEventID( new_eID );
1163 ctx.setEventID( eID );
1172 if (
sc.isFailure()) {
1174 <<
" could not be selected for the WhiteBoard" );
1175 return EventContext{};
1180 ATH_MSG_DEBUG (
"created EventContext, num: " << ctx.evt() <<
" in slot: "
1195 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1197 EventContext* finishedEvtContext(
nullptr);
1200 ATH_MSG_DEBUG (
"drainScheduler: [" << finishedEvts <<
"] Waiting for a context" );
1204 if (
sc.isSuccess()){
1205 ATH_MSG_DEBUG (
"drainScheduler: scheduler not empty: Context "
1206 << finishedEvtContext );
1207 finishedEvtContexts.emplace_back(finishedEvtContext);
1215 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1216 finishedEvtContexts.emplace_back(finishedEvtContext);
1221 for (
auto& thisFinishedEvtContext : finishedEvtContexts){
1222 if (!thisFinishedEvtContext) {
1223 ATH_MSG_FATAL (
"Detected nullptr ctxt while clearing WB!");
1228 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1229 ATH_MSG_FATAL (
"Failed event detected on " << thisFinishedEvtContext
1230 <<
" w/ fail mode: "
1231 <<
m_aess->eventStatus(*thisFinishedEvtContext) );
1239 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1240 n_run = thisFinishedEvtContext->eventID().run_number();
1241 n_evt = thisFinishedEvtContext->eventID().event_number();
1244 << thisFinishedEvtContext->slot() );
1253 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1254 m_incidentSvc->fireIncident(Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1256 ATH_MSG_DEBUG (
"Clearing slot " << thisFinishedEvtContext->slot()
1257 <<
" (event " << thisFinishedEvtContext->evt()
1258 <<
") of the whiteboard" );
1261 if (!
sc.isSuccess()) {
1262 ATH_MSG_ERROR (
"Whiteboard slot " << thisFinishedEvtContext->slot()
1263 <<
" could not be properly cleared" );
1275 ATH_MSG_INFO (
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1276 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1277 <<
m_proc <<
" events processed so far <<<===" );
1279 ATH_MSG_INFO (
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1280 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1282 <<
" events processed so far <<<===" );
1283 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
1285 ATH_MSG_ERROR (
" unable to open: eventLoopHeartBeat.txt" );
1289 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
1290 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
1295 ATH_MSG_DEBUG (
"drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext );
1298 return (
fail ? -1 : 1 );