5 #include <GaudiKernel/DataIncident.h>
6 #define ATHENASERVICES_ATHENAHIVEEVENTLOOPMGR_CPP
25 #include "GaudiKernel/IAlgorithm.h"
26 #include "GaudiKernel/SmartIF.h"
27 #include "GaudiKernel/Incident.h"
28 #include "GaudiKernel/DataObject.h"
29 #include "GaudiKernel/IIncidentSvc.h"
30 #include "GaudiKernel/IEvtSelector.h"
31 #include "GaudiKernel/IDataManagerSvc.h"
32 #include "GaudiKernel/IDataProviderSvc.h"
33 #include "GaudiKernel/IConversionSvc.h"
34 #include "GaudiKernel/GaudiException.h"
35 #include "GaudiKernel/AppReturnCode.h"
36 #include "GaudiKernel/MsgStream.h"
37 #include "Gaudi/Property.h"
38 #include "GaudiKernel/EventIDBase.h"
39 #include "GaudiKernel/ThreadLocalContext.h"
43 #include "EventInfo/EventInfo.h"
51 #include <GaudiKernel/IScheduler.h>
53 #include "tbb/tick_count.h"
62 : MinimalEventLoopMgr(nam, svcLoc),
64 m_incidentSvc (
"IncidentSvc", nam ),
65 m_eventStore(
"StoreGateSvc", nam ),
66 m_evtSelector(0), m_evtContext(0),
67 m_histoDataMgrSvc(
"HistogramDataSvc", nam ),
68 m_histoPersSvc (
"HistogramPersistencySvc", nam ),
69 m_evtIdModSvc (
"", nam ),
70 m_currentRun(0), m_firstRun(true), m_tools(this), m_nevt(0), m_writeHists(false),
71 m_nev(0), m_proc(0), m_useTools(false),m_doEvtHeartbeat(false),
72 m_conditionsCleaner(
"Athena::ConditionsCleanerSvc", nam )
75 "Name of Event Selector to use. If empty string (default) "
76 "take value from ApplicationMgr");
78 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
79 "By default (empty string) get property value from "
82 "histogram write/update interval");
84 "Controls behaviour of event loop depending on return code of"
85 " Algorithms. 0: all non-SUCCESSes terminate job. "
86 "1: RECOVERABLE skips to next event, FAILURE terminates job "
87 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
89 "Print event heartbeat printouts every m_eventPrintoutInterval events");
94 "Name of the scheduler to be used");
97 "Name of the Whiteboard to be used");
102 "ServiceHandle for EvtIdModifierSvc");
105 "Event interval at which to increment lumiBlock# when "
106 "creating events without an EventSelector. Zero means "
107 "don't increment it");
109 "timestamp interval between events when creating Events "
110 "without an EventSelector");
112 "Require valid input attribute list to be present");
114 "In case of DoubleEventSelector use event number from secondary input");
117 "process all of first event before scheduling any more");
140 if ( !
sc.isSuccess() )
142 ATH_MSG_ERROR (
"Failed to initialize base class MinimalEventLoopMgr" );
153 return StatusCode::FAILURE;
158 ATH_MSG_FATAL (
"Error retrieving SchedulerSvc interface ISchedulerSvc." );
159 return StatusCode::FAILURE;
165 return StatusCode::FAILURE;
168 m_aess = serviceLocator()->service(
"AlgExecStateSvc");
171 return StatusCode::FAILURE;
175 if( !
sc.isSuccess() )
177 ATH_MSG_FATAL (
"Error retrieving pointer to StoreGateSvc" );
186 if( !
sc.isSuccess() )
195 SmartIF<IProperty> prpMgr(serviceLocator());
196 if ( !prpMgr.isValid() )
198 ATH_MSG_FATAL (
"IProperty interface not found in ApplicationMgr." );
199 return StatusCode::FAILURE;
207 if( !
sc.isSuccess() )
214 if ( histPersName.length() == 0 )
216 CHECK(setProperty(prpMgr->getProperty(
"HistogramPersistency")));
219 if ( histPersName !=
"NONE" ) {
225 if (histPersName ==
"ROOT") {
226 sc = serviceLocator()->service(
"RootHistSvc", is);
227 }
else if ( histPersName ==
"HBOOK" ) {
228 sc = serviceLocator()->service(
"HbookHistSvc", is);
231 if (
sc.isFailure()) {
232 ATH_MSG_ERROR (
"could not locate actual Histogram persistency service" );
236 ATH_MSG_ERROR (
"Could not dcast HistPersSvc to a Service" );
238 const Gaudi::Details::PropertyBase &prop =
s->getProperty(
"OutputFile");
241 const StringProperty &sprop =
dynamic_cast<const StringProperty&
>( prop );
244 ATH_MSG_VERBOSE (
"could not dcast OutputFile property to a StringProperty."
245 <<
" Need to fix Gaudi." );
246 val = prop.toString();
250 val !=
"UndefinedROOTOutputFileName" &&
251 val !=
"UndefinedHbookOutputFileName" ) {
265 ATH_MSG_DEBUG (
"EventID modifier Service not set. No run number, ... overrides will be applied." );
268 ATH_MSG_INFO (
"Could not find EventID modifier Service. No run number, ... overrides will be applied." );
277 sc = setProperty(prpMgr->getProperty(
"EvtSel"));
282 IEvtSelector* theEvtSel(0);
290 ATH_MSG_FATAL (
"Can not create the event selector Context." );
291 return StatusCode::FAILURE;
293 if (msgLevel(MSG::INFO)) {
294 INamedInterface* named (
dynamic_cast< INamedInterface*
>(theEvtSel));
296 ATH_MSG_INFO (
"Setup EventSelector service " << named->name( ) );
299 }
else if (
sc.isFailure()) {
301 return StatusCode::FAILURE;
345 unsigned int toolCtr = 0;
346 for ( ; firstTool != lastTool; ++firstTool )
400 unsigned int toolCtr = 0;
401 ATH_MSG_INFO (
"Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" );
402 ATH_MSG_INFO (
"-----------------------------------------------------" );
404 for ( ; firstTool != lastTool; ++firstTool ) {
405 ATH_MSG_INFO ( std::setw(2) << std::setiosflags(std::ios_base::right)
406 << toolCtr+1 <<
".) " << std::resetiosflags(std::ios_base::right)
407 << std::setw(48) << std::setfill(
'.')
408 << std::setiosflags(std::ios_base::left)
409 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
412 << std::setw(6) << std::setiosflags(std::ios_base::right)
423 return (
sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
436 std::vector<DataObject*>
objects;
438 DataObject*
obj =
reg->object();
439 if ( !
obj ||
obj->clID() == CLID_StatisticsFile )
return false;
444 if ( !
sc.isSuccess() ) {
445 ATH_MSG_ERROR (
"Error while traversing Histogram data store" );
453 (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
457 IOpaqueAddress* pAddr =
nullptr;
459 if ( iret.isFailure() )
return iret;
460 i->registry()->setAddress( pAddr );
464 IRegistry*
reg =
i->registry();
466 return iret.isFailure() ? iret : isc;
468 if ( !
sc.isSuccess() ) {
473 if (
force || (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
489 return StatusCode::SUCCESS;
497 return StatusCode::SUCCESS;
509 ATH_MSG_ALWAYS (
"A stopRun was requested by an incidentListener. "
510 <<
"Do not process this event." );
512 return (StatusCode::SUCCESS);
518 Gaudi::Hive::setCurrentContext ( ctx );
521 if (declEvtRootSc == 0 ) {
523 return StatusCode::SUCCESS;
524 }
else if ( declEvtRootSc == -1) {
525 ATH_MSG_ERROR (
"declareEventRootAddress for context " << ctx <<
" failed" );
526 return StatusCode::FAILURE;
530 unsigned int conditionsRun = ctx.eventID().run_number();
533 if (
eventStore()->contains<AthenaAttributeList> (
"Input") &&
535 if (attr->exists (
"ConditionsRun")) {
536 conditionsRun = (*attr)[
"ConditionsRun"].data<
unsigned int>();
541 Gaudi::Hive::setCurrentContext ( ctx );
544 if (
eventStore()->record(std::make_unique<EventContext> (ctx),
545 "EventContext").isFailure())
548 return (StatusCode::FAILURE);
568 bool toolsPassed=
true;
573 unsigned int toolCtr=0;
577 while(toolsPassed && theTool!=lastTool )
579 toolsPassed = (*theTool)->passEvent(ctx.eventID());
592 <<
" on slot " << ctx.slot() <<
", " <<
m_proc
593 <<
" events processed so far <<<===" );
597 <<
" on slot " << ctx.slot() <<
", "
599 <<
" events processed so far <<<===" );
614 <<
", slot " << ctx.slot()
615 <<
" to the scheduler" );
622 if (!addEventStatus.isSuccess()){
623 ATH_MSG_FATAL (
"An event processing slot should be now free in the scheduler, but it appears not to be the case." );
633 Gaudi::Hive::setCurrentContext( EventContext() );
635 return StatusCode::SUCCESS;
646 bool eventfailed =
false;
654 return StatusCode::FAILURE;
657 return StatusCode::SUCCESS;
664 SmartIF<IProperty> appmgr(serviceLocator());
665 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop).isFailure()) {
666 ATH_MSG_ERROR (
"Could not set return code of the application ("
667 << Gaudi::ReturnCode::ScheduledStop <<
")" );
670 return StatusCode::SUCCESS;
691 for (
size_t islot = 0; islot < nslot; islot++) {
695 Gaudi::Hive::setCurrentContext( EventContext() );
706 if (0 == maxevt)
return StatusCode::SUCCESS;
709 Gaudi::setAppReturnCode(
m_appMgrProperty, Gaudi::ReturnCode::Success,
true).ignore();
720 bool loop_ended=
false;
727 auto secsFromStart = [&start_time]()->
double{
731 while ( !loop_ended and ( (maxevt < 0) or (finishedEvts < maxevt) ) ){
736 (newEvtAllowed || createdEvts == 0) &&
737 ( (createdEvts < maxevt) or (maxevt<0) ) &&
744 if ( !ctx.valid() ) {
745 sc = StatusCode::FAILURE;
750 if (
sc.isFailure()) {
751 ATH_MSG_ERROR (
"Terminating event processing loop due to errors" );
770 sc = StatusCode::FAILURE;
771 }
else if (
ir == 0) {
774 sc = StatusCode::SUCCESS;
778 newEvtAllowed =
true;
783 ATH_MSG_INFO (
"---> Loop Finished (seconds): " << secsFromStart() );
800 ATH_MSG_ERROR (
"Seek failed; unsupported by event selector" );
801 return StatusCode::FAILURE;
806 ATH_MSG_FATAL (
"Can not create the event selector Context." );
807 return StatusCode::FAILURE;
813 if (
sc.isSuccess()) {
814 m_incidentSvc->fireIncident(ContextIncident<std::tuple<int, int>>(
840 ATH_MSG_ERROR (
"Collection size unsupported by event selector" );
846 ATH_MSG_FATAL (
"Can not create the event selector Context." );
861 if(inc.type() ==
"EndAlgorithms") {
866 if( !
sc.isSuccess() ) {
872 if(inc.type()!=
"BeforeFork")
876 ATH_MSG_WARNING (
"Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" );
888 if(!
sc.isSuccess()) {
892 IOpaqueAddress* addr{
nullptr};
894 if (
sc.isFailure()) {
901 if(!
sc.isSuccess()) {
907 if(
eventStore()->loadEventProxies().isFailure()) {
915 if(!
sc.isSuccess()) {
925 if(!
sc.isSuccess()) {
939 if ( IEventSeek::interfaceID().versionMatch(riid) ) {
940 *ppvInterface =
dynamic_cast<IEventSeek*
>(
this);
942 else if ( IEventProcessor::interfaceID().versionMatch(riid) ) {
943 *ppvInterface =
dynamic_cast<IEventProcessor*
>(
this);
949 return MinimalEventLoopMgr::queryInterface(riid, ppvInterface);
952 return StatusCode::SUCCESS;
961 if ( !
sc.isSuccess() ) {
966 if( !
sc.isSuccess() ) {
968 if (
sc.isSuccess() ) {
970 if ( !
sc.isSuccess() ) {
995 std::unique_ptr<EventInfo> pEventPtr;
1000 IOpaqueAddress* addr{};
1004 if ( !
sc.isSuccess() ) {
1020 if( !
sc.isSuccess() ) {
1025 }
if ((
sc=
eventStore()->loadEventProxies()).isFailure()) {
1030 bool consume_modifier_stream =
false;
1034 if ( pAttrList !=
nullptr && pAttrList->size() > 6 ) {
1036 unsigned int runNumber = (*pAttrList)[
"RunNumber"].data<
unsigned int>();
1037 unsigned long long eventNumber = (*pAttrList)[
"EventNumber"].data<
unsigned long long>();
1038 unsigned int eventTime = (*pAttrList)[
"EventTime"].data<
unsigned int>();
1039 unsigned int eventTimeNS = (*pAttrList)[
"EventTimeNanoSec"].data<
unsigned int>();
1040 unsigned int lumiBlock = (*pAttrList)[
"LumiBlockN"].data<
unsigned int>();
1041 unsigned int bunchId = (*pAttrList)[
"BunchId"].data<
unsigned int>();
1044 consume_modifier_stream =
true;
1047 unsigned long long eventNumberSecondary{};
1048 if ( !(pAttrList->exists(
"hasSecondaryInput") && (*pAttrList)[
"hasSecondaryInput"].data<
bool>()) ) {
1049 ATH_MSG_FATAL (
"Secondary EventNumber requested, but secondary input does not exist!" );
1052 if ( pAttrList->exists(
"EventNumber_secondary") ) {
1053 eventNumberSecondary = (*pAttrList)[
"EventNumber_secondary"].data<
unsigned long long>();
1059 if (pEventSecondary) {
1060 eventNumberSecondary = pEventSecondary->
event_ID()->event_number();
1063 ATH_MSG_FATAL (
"Secondary EventNumber requested, but it does not exist!" );
1067 if (eventNumberSecondary != 0) {
1071 ATH_MSG_INFO (
" ===>>> using secondary event #" << eventNumberSecondary <<
" instead of #" <<
eventNumber <<
" <<<===" );
1077 pEventPtr = std::make_unique<EventInfo>
1079 pEvent = pEventPtr.get();
1083 ATH_MSG_FATAL (
"Valid input attribute list required but not present!" );
1093 consume_modifier_stream =
false;
1101 if( !
sc.isSuccess() ) {
1105 consume_modifier_stream =
true;
1106 ATH_MSG_DEBUG (
"use xAOD::EventInfo with runNumber=" << pXEvent->runNumber() );
1110 pEvent = pEventPtr.get();
1112 if( !
sc.isSuccess() ) {
1126 unsigned int runNmb{1}, evtNmb{
m_nevt + 1};
1133 auto eid = std::make_unique<EventID> (runNmb,evtNmb,
m_timeStamp);
1135 eid->set_lumi_block( runNmb );
1141 bool consume_modifier_stream =
true;
1151 ATH_MSG_DEBUG (
"recording EventInfo " << *pEvent->event_ID() <<
" in "
1155 if( !
sc.isSuccess() ) {
1172 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1174 unsigned int oldrunnr=eID.run_number();
1175 unsigned int oldLB=eID.lumi_block();
1176 unsigned int oldTS=eID.time_stamp();
1177 unsigned int oldTSno=eID.time_stamp_ns_offset();
1178 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc runnr=" << oldrunnr <<
" -> " << new_eID.run_number() );
1179 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc LB=" << oldLB <<
" -> " << new_eID.lumi_block() );
1180 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS <<
" -> " << new_eID.time_stamp() );
1181 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc TimeStamp ns Offset=" << oldTSno <<
" -> " << new_eID.time_stamp_ns_offset() );
1183 ctx.setEventID( new_eID );
1188 ctx.setEventID( eID );
1197 if (
sc.isFailure()) {
1199 <<
" could not be selected for the WhiteBoard" );
1200 return EventContext{};
1205 ATH_MSG_DEBUG (
"created EventContext, num: " << ctx.evt() <<
" in slot: "
1220 std::vector<EventContext*> finishedEvtContexts;
1222 EventContext* finishedEvtContext(
nullptr);
1225 ATH_MSG_DEBUG (
"drainScheduler: [" << finishedEvts <<
"] Waiting for a context" );
1229 if (
sc.isSuccess()){
1230 ATH_MSG_DEBUG (
"drainScheduler: scheduler not empty: Context "
1231 << finishedEvtContext );
1232 finishedEvtContexts.push_back(finishedEvtContext);
1240 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1241 finishedEvtContexts.push_back(finishedEvtContext);
1246 for (
auto& thisFinishedEvtContext : finishedEvtContexts){
1247 if (!thisFinishedEvtContext) {
1248 ATH_MSG_FATAL (
"Detected nullptr ctxt while clearing WB!");
1253 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1254 ATH_MSG_FATAL (
"Failed event detected on " << thisFinishedEvtContext
1255 <<
" w/ fail mode: "
1256 <<
m_aess->eventStatus(*thisFinishedEvtContext) );
1257 delete thisFinishedEvtContext;
1265 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1266 n_run = thisFinishedEvtContext->eventID().run_number();
1267 n_evt = thisFinishedEvtContext->eventID().event_number();
1270 << thisFinishedEvtContext->slot() );
1271 delete thisFinishedEvtContext;
1280 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1281 m_incidentSvc->fireIncident(Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1283 ATH_MSG_DEBUG (
"Clearing slot " << thisFinishedEvtContext->slot()
1284 <<
" (event " << thisFinishedEvtContext->evt()
1285 <<
") of the whiteboard" );
1288 if (!
sc.isSuccess()) {
1289 ATH_MSG_ERROR (
"Whiteboard slot " << thisFinishedEvtContext->slot()
1290 <<
" could not be properly cleared" );
1292 delete thisFinishedEvtContext;
1303 ATH_MSG_INFO (
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1304 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1305 <<
m_proc <<
" events processed so far <<<===" );
1307 ATH_MSG_INFO (
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1308 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1310 <<
" events processed so far <<<===" );
1311 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
1313 ATH_MSG_ERROR (
" unable to open: eventLoopHeartBeat.txt" );
1315 delete thisFinishedEvtContext;
1318 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
1319 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
1324 ATH_MSG_DEBUG (
"drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext );
1327 delete thisFinishedEvtContext;
1330 return (
fail ? -1 : 1 );