ATLAS Offline Software
Loading...
Searching...
No Matches
MPIHiveEventLoopMgr Class Reference

The MPI event loop manager. More...

#include <MPIHiveEventLoopMgr.h>

Inheritance diagram for MPIHiveEventLoopMgr:
Collaboration diagram for MPIHiveEventLoopMgr:

Public Types

typedef IEvtSelector::Context EvtContext
typedef std::list< SmartIF< IAlgorithm > > ListAlg

Public Member Functions

 MPIHiveEventLoopMgr (const std::string &name, ISvcLocator *svcLoc)
 Standard Constructor.
virtual ~MPIHiveEventLoopMgr ()
 Standard Destructor.
virtual StatusCode initialize () override
 implementation of IAppMgrUI::initalize
virtual StatusCode finalize () override
 implementation of IAppMgrUI::finalize
virtual StatusCode nextEvent (int maxevt) override
 implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
StatusCode getEventRoot (IOpaqueAddress *&refpAddr)
 Create event address using event selector.
virtual StatusCode executeEvent (EventContext &&ctx) override
 implementation of IEventProcessor::executeEvent(void* par)
virtual StatusCode executeRun (int maxevt) override
 implementation of IEventProcessor::executeRun(int maxevt)
virtual StatusCode stopRun () override
 implementation of IEventProcessor::stopRun()
virtual StatusCode stop () override
 implementation of IService::stop
virtual StatusCode seek (int evt) override
 Seek to a given event.
virtual int curEvent () const override
 Return the current event count.
virtual int size () override
 Return the size of the collection.
virtual void handle (const Incident &inc) override
 IIncidentListenet interfaces.
virtual const std::string & name () const override
virtual void modifyEventContext (EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
MsgStream & msg () const
 The standard message stream.
MsgStream & msg (const MSG::Level lvl) const
 The standard message stream.
bool msgLvl (const MSG::Level lvl) const
 Test the output level.
void setLevel (MSG::Level lvl)
 Change the current logging level.

Protected Types

typedef ServiceHandle< IIncidentSvc > IIncidentSvc_t
typedef ServiceHandle< StoreGateSvcStoreGateSvc_t
typedef ServiceHandle< IDataManagerSvc > IDataManagerSvc_t
typedef ServiceHandle< IConversionSvc > IConversionSvc_t
typedef ServiceHandle< IEvtIdModifierSvcIEvtIdModifierSvc_t
typedef EventID::number_type number_type
typedef IAthenaEvtLoopPreSelectTool tool_type
typedef ToolHandleArray< tool_typetool_store
typedef tool_store::const_iterator tool_iterator
typedef std::vector< unsigned int > tool_stats
typedef tool_stats::const_iterator tool_stats_iterator

Protected Member Functions

StatusCode drainLocalScheduler ()
 Drain the local scheduler of any (at least one) completed events.
StatusCode insertEvent (int eventIdx, bool &endOfStream, std::int64_t requestTime_ns)
 Insert an event into the local scheduler.
StatusCode workerEventLoop ()
 Worker event loop (runs on worker, requests events over MPI)
StatusCode masterEventLoop (int maxEvt)
 Master event loop (runs on master, provides events over MPI)
void setupPreSelectTools (Gaudi::Details::PropertyBase &)
 property update handler:sets up the Pre-selection tools
virtual StatusCode writeHistograms (bool force=false)
 Dump out histograms as needed.
virtual StatusCode executeAlgorithms ()
 Run the algorithms for the current event.
StatusCode initializeAlgorithms ()
 Initialize all algorithms and output streams.
StatusCode clearWBSlot (int evtSlot)
 Clear a slot in the WB.
int declareEventRootAddress (EventContext &)
 Declare the root address of the event.
virtual EventContext createEventContext () override
 Create event context.
int drainScheduler (int &finishedEvents)
 Drain the scheduler from all actions that may be queued.
void setTimeout (Timeout &instance)
 Set timeout.
void resetTimeout (Timeout &instance)
 Reset timeout.

Protected Attributes

ServiceHandle< IMPIClusterSvcm_clusterSvc
 Reference to the MPIClusterSvc.
int m_contiguousFailedEvts {0}
int m_totalFailedEvts {0}
int m_nLocalCreatedEvts {0}
int m_nLocalSkippedEvts {0}
int m_nLocalFinishedEvts {0}
IIncidentSvc_t m_incidentSvc
 Reference to the incident service.
StoreGateSvc_t m_eventStore
 Reference to StoreGateSvc;.
IEvtSelector * m_evtSelector
 Reference to the Event Selector.
EvtContextm_evtContext
 Gaudi event selector Context (may be used as a cursor by the evt selector)
StringProperty m_evtsel
IDataManagerSvc_t m_histoDataMgrSvc
 Reference to the Histogram Data Service.
IConversionSvc_t m_histoPersSvc
IEvtIdModifierSvc_t m_evtIdModSvc
StringProperty m_histPersName
number_type m_currentRun
 current run number
bool m_firstRun
IntegerProperty m_failureMode
UnsignedIntegerProperty m_eventPrintoutInterval
tool_stats m_toolInvoke
 tool called counter
tool_stats m_toolReject
 tool returns StatusCode::FAILURE counter
tool_stats m_toolAccept
 tool returns StatusCode::SUCCESS counter
tool_store m_tools
 internal tool store
bool m_requireInputAttributeList {}
 require input attribute list
bool m_useSecondaryEventNumber {}
 read event number from secondary input
SmartIF< IHiveWhiteBoard > m_whiteboard
 Reference to the Whiteboard interface.
SmartIF< IAlgResourcePool > m_algResourcePool
 Reference to the Algorithm resource pool.
SmartIF< IAlgExecStateSvc > m_aess
 Reference to the Algorithm Execution State Svc.
SmartIF< IProperty > m_appMgrProperty
 Property interface of ApplicationMgr.
SmartIF< IScheduler > m_schedulerSvc
 A shortcut for the scheduler.
SmartIF< IIncidentListener > m_abortEventListener
 Instance of the incident listener waiting for AbortEvent.
std::string m_schedulerName
 Name of the scheduler to be used.
std::string m_whiteboardName
 Name of the Whiteboard to be used.
bool m_scheduledStop
 Scheduled stop of event processing.
bool m_terminateLoop { false }
unsigned int m_nev
 events processed
unsigned int m_proc
bool m_useTools
bool m_doEvtHeartbeat
EventContext m_lastEventContext

Private Member Functions

StoreGateSvceventStore () const
void initMessaging () const
 Initialize our message level and MessageSvc.

Private Attributes

UnsignedIntegerProperty m_firstEventIndex
int m_evtSelectorCurrentPos = 0
unsigned int m_nevt
unsigned int m_timeStamp { 0 }
UnsignedIntegerProperty m_writeInterval
bool m_writeHists
bool m_firstEventAlone
unsigned int m_flmbi
unsigned int m_timeStampInt
ServiceHandle< Athena::IConditionsCleanerSvcm_conditionsCleaner
std::string m_nm
 Message source name.
boost::thread_specific_ptr< MsgStream > m_msg_tls
 MsgStream instance (a std::cout like with print-out levels)
std::atomic< IMessageSvc * > m_imsg { nullptr }
 MessageSvc pointer.
std::atomic< MSG::Level > m_lvl { MSG::NIL }
 Current logging level.
std::atomic_flag m_initialized ATLAS_THREAD_SAFE = ATOMIC_FLAG_INIT
 Messaging initialized (initMessaging)

Detailed Description

The MPI event loop manager.

As with AthenaHiveEventLoopMgr but in a multi-node MPI environment. This class is derived from and implemented in terms of AthenaHiveEventLoopMgr.

Definition at line 34 of file MPIHiveEventLoopMgr.h.

Member Typedef Documentation

◆ EvtContext

typedef IEvtSelector::Context AthenaHiveEventLoopMgr::EvtContext
inherited

Definition at line 74 of file AthenaHiveEventLoopMgr.h.

◆ IConversionSvc_t

typedef ServiceHandle<IConversionSvc> AthenaHiveEventLoopMgr::IConversionSvc_t
protectedinherited

Definition at line 96 of file AthenaHiveEventLoopMgr.h.

◆ IDataManagerSvc_t

typedef ServiceHandle<IDataManagerSvc> AthenaHiveEventLoopMgr::IDataManagerSvc_t
protectedinherited

Definition at line 92 of file AthenaHiveEventLoopMgr.h.

◆ IEvtIdModifierSvc_t

◆ IIncidentSvc_t

typedef ServiceHandle<IIncidentSvc> AthenaHiveEventLoopMgr::IIncidentSvc_t
protectedinherited

Definition at line 77 of file AthenaHiveEventLoopMgr.h.

◆ ListAlg

typedef std::list<SmartIF<IAlgorithm> > AthenaHiveEventLoopMgr::ListAlg
inherited

Definition at line 264 of file AthenaHiveEventLoopMgr.h.

◆ number_type

Definition at line 107 of file AthenaHiveEventLoopMgr.h.

◆ StoreGateSvc_t

Definition at line 81 of file AthenaHiveEventLoopMgr.h.

◆ tool_iterator

typedef tool_store::const_iterator AthenaHiveEventLoopMgr::tool_iterator
protectedinherited

Definition at line 121 of file AthenaHiveEventLoopMgr.h.

◆ tool_stats

typedef std::vector<unsigned int> AthenaHiveEventLoopMgr::tool_stats
protectedinherited

Definition at line 122 of file AthenaHiveEventLoopMgr.h.

◆ tool_stats_iterator

typedef tool_stats::const_iterator AthenaHiveEventLoopMgr::tool_stats_iterator
protectedinherited

Definition at line 123 of file AthenaHiveEventLoopMgr.h.

◆ tool_store

typedef ToolHandleArray< tool_type > AthenaHiveEventLoopMgr::tool_store
protectedinherited

Definition at line 120 of file AthenaHiveEventLoopMgr.h.

◆ tool_type

Definition at line 119 of file AthenaHiveEventLoopMgr.h.

Constructor & Destructor Documentation

◆ MPIHiveEventLoopMgr()

MPIHiveEventLoopMgr::MPIHiveEventLoopMgr ( const std::string & name,
ISvcLocator * svcLoc )

Standard Constructor.

Definition at line 21 of file MPIHiveEventLoopMgr.cxx.

23 : AthenaHiveEventLoopMgr(name, svcLoc) {}
virtual const std::string & name() const override
AthenaHiveEventLoopMgr()=delete

◆ ~MPIHiveEventLoopMgr()

MPIHiveEventLoopMgr::~MPIHiveEventLoopMgr ( )
virtualdefault

Standard Destructor.

Member Function Documentation

◆ clearWBSlot()

StatusCode AthenaHiveEventLoopMgr::clearWBSlot ( int evtSlot)
protectedinherited

Clear a slot in the WB.

Definition at line 1304 of file AthenaHiveEventLoopMgr.cxx.

1304 {
1305 return m_whiteboard->freeStore(evtSlot);
1306}
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.

◆ createEventContext()

EventContext AthenaHiveEventLoopMgr::createEventContext ( )
overrideprotectedvirtualinherited

Create event context.

Definition at line 1167 of file AthenaHiveEventLoopMgr.cxx.

1167 {
1168
1169 EventContext ctx{ m_nevt, m_whiteboard->allocateStore( m_nevt ) };
1170
1171 StatusCode sc = m_whiteboard->selectStore( ctx.slot() );
1172 if (sc.isFailure()) {
1173 ATH_MSG_FATAL ( "Slot " << ctx.slot()
1174 << " could not be selected for the WhiteBoard" );
1175 return EventContext{}; // invalid EventContext
1176 } else {
1178 Atlas::ExtendedEventContext( eventStore()->hiveProxyDict() ) );
1179
1180 ATH_MSG_DEBUG ( "created EventContext, num: " << ctx.evt() << " in slot: "
1181 << ctx.slot() );
1182 }
1183
1184 return ctx;
1185}
#define ATH_MSG_FATAL(x)
#define ATH_MSG_DEBUG(x)
static Double_t sc
StoreGateSvc * eventStore() const
void setExtendedEventContext(EventContext &ctx, ExtendedEventContext &&ectx)
Move an extended context into a context object.
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ curEvent()

int AthenaHiveEventLoopMgr::curEvent ( ) const
overridevirtualinherited

Return the current event count.

Definition at line 825 of file AthenaHiveEventLoopMgr.cxx.

826{
827 return m_nevt;
828}

◆ declareEventRootAddress()

int AthenaHiveEventLoopMgr::declareEventRootAddress ( EventContext & ctx)
protectedinherited

Declare the root address of the event.

FIXME ???

Definition at line 952 of file AthenaHiveEventLoopMgr.cxx.

952 {
953
954 // return codes:
955 // -1 : error
956 // 0 : no more events in selection
957 // 1 : ok
958
959 StatusCode sc(StatusCode::SUCCESS);
960
961 //-----------------------------------------------------------------------
962 // we need an EventInfo Object to fire the incidents.
963 //-----------------------------------------------------------------------
964 std::unique_ptr<const EventInfo> pEvent{};
965 if ( m_evtContext ) {
966 // Deal with the case when an EventSelector is provided
967 //
968 // FIXME: flow control if no more events in selector, etc.
969 //
970
971 IOpaqueAddress* addr{};
972
973 sc = m_evtSelector->next(*m_evtContext);
974
975 if ( !sc.isSuccess() ) {
976 // This is the end of the loop. No more events in the selection
977 ATH_MSG_INFO ( "No more events in event selection " );
978 return 0;
979 }
980
981 if (m_evtSelector->createAddress(*m_evtContext, addr).isFailure()) {
982 ATH_MSG_ERROR ( "Could not create an IOpaqueAddress" );
983 return -1;
984 }
985
986
987 // Most iterators provide the IOA of an event header (EventInfo, DataHeader)
988 if (0 != addr) {
989 //create its proxy
990 sc = eventStore()->recordAddress(addr);
991 if( !sc.isSuccess() ) {
993 ATH_MSG_WARNING ( "Error declaring Event object" );
994 return 0;
995 }
996 } if ((sc=eventStore()->loadEventProxies()).isFailure()) {
997 ATH_MSG_ERROR ( "Error loading Event proxies" );
998 return -1;
999 }
1000
1001 bool consume_modifier_stream = false;
1002 // First try to build a legacy EventInfo object from the TAG information
1003 // Read the attribute list
1004 const AthenaAttributeList* pAttrList = eventStore()->tryConstRetrieve<AthenaAttributeList>("Input");
1005 if ( pAttrList != nullptr && pAttrList->size() > 6 ) { // Try making EventID-only EventInfo object from in-file TAG
1006 try {
1007 unsigned int runNumber = (*pAttrList)["RunNumber"].data<unsigned int>();
1008 unsigned long long eventNumber = (*pAttrList)["EventNumber"].data<unsigned long long>();
1009 unsigned int eventTime = (*pAttrList)["EventTime"].data<unsigned int>();
1010 unsigned int eventTimeNS = (*pAttrList)["EventTimeNanoSec"].data<unsigned int>();
1011 unsigned int lumiBlock = (*pAttrList)["LumiBlockN"].data<unsigned int>();
1012 unsigned int bunchId = (*pAttrList)["BunchId"].data<unsigned int>();
1013
1014 ATH_MSG_DEBUG ( "use TAG with runNumber=" << runNumber );
1015 consume_modifier_stream = true;
1016 // an option to override primary eventNumber with the secondary one in case of DoubleEventSelector
1018 unsigned long long eventNumberSecondary{};
1019 if ( !(pAttrList->exists("hasSecondaryInput") && (*pAttrList)["hasSecondaryInput"].data<bool>()) ) {
1020 ATH_MSG_FATAL ( "Secondary EventNumber requested, but secondary input does not exist!" );
1021 return -1;
1022 }
1023 if ( pAttrList->exists("EventNumber_secondary") ) {
1024 eventNumberSecondary = (*pAttrList)["EventNumber_secondary"].data<unsigned long long>();
1025 }
1026 else {
1027 // try legacy EventInfo if secondary input did not have attribute list
1028 // primary input should not have this EventInfo type
1029 const EventInfo* pEventSecondary = eventStore()->tryConstRetrieve<EventInfo>();
1030 if (pEventSecondary) {
1031 eventNumberSecondary = pEventSecondary->event_ID()->event_number();
1032 }
1033 else {
1034 ATH_MSG_FATAL ( "Secondary EventNumber requested, but it does not exist!" );
1035 return -1;
1036 }
1037 }
1038 if (eventNumberSecondary != 0) {
1039 m_doEvtHeartbeat = (m_eventPrintoutInterval.value() > 0 &&
1040 0 == (m_nev % m_eventPrintoutInterval.value()));
1041 if (m_doEvtHeartbeat) {
1042 ATH_MSG_INFO ( " ===>>> using secondary event #" << eventNumberSecondary << " instead of #" << eventNumber << " <<<===" );
1043 }
1044 eventNumber = eventNumberSecondary;
1045 }
1046 }
1047
1048 pEvent = std::make_unique<EventInfo>(
1049 std::make_unique<EventID>(runNumber, eventNumber, eventTime,
1050 eventTimeNS, lumiBlock, bunchId),
1051 nullptr);
1052 } catch (...) {
1053 }
1054 } else if (m_requireInputAttributeList) {
1055 ATH_MSG_FATAL ( "Valid input attribute list required but not present!" );
1056 return -1;
1057 }
1058 // In the case that there is no TAG information
1059 const EventInfo* pEventObserver{pEvent.get()};
1060 if (!pEventObserver) {
1061 // Secondly try to retrieve a legacy EventInfo object from the input file
1062 // Again, m_nevt is incremented after executeEvent in the Hive manager so we don't need a -1
1064 pEventObserver = eventStore()->tryConstRetrieve<EventInfo>();
1065 if (pEventObserver) {
1066 consume_modifier_stream = false; // stream will already have been consumed during EventInfo TP conversion
1067 ATH_MSG_DEBUG ( "use EventInfo" );
1068 } else {
1069 // Finally try to retrieve an xAOD::EventInfo object from the
1070 // input file and build a legacy EventInfo object from that.
1071 const xAOD::EventInfo* pXEvent{nullptr};
1072 sc = eventStore()->retrieve(pXEvent);
1073 if( !sc.isSuccess() ) {
1074 ATH_MSG_ERROR ( "Unable to retrieve Event root object" );
1075 return -1;
1076 }
1077 consume_modifier_stream = true;
1078 ATH_MSG_DEBUG ( "use xAOD::EventInfo with runNumber=" << pXEvent->runNumber() );
1079 // Build the old-style Event Info object for those clients that still need it
1080 pEvent = std::make_unique<EventInfo>(
1081 std::make_unique<EventID>(eventIDFromxAOD(pXEvent)),
1082 std::make_unique<EventType>(eventTypeFromxAOD(pXEvent)));
1083 pEventObserver = pEvent.get();
1084 sc = eventStore()->record(std::move(pEvent), "");
1085 if( !sc.isSuccess() ) {
1086 ATH_MSG_ERROR ( "Error declaring event data object" );
1087 return -1;
1088 }
1089 }
1090 }
1091
1092 modifyEventContext(ctx, *(pEventObserver->event_ID()),
1093 consume_modifier_stream);
1094
1095 }
1096 else {
1097 // No EventSelector is provided, so with no iterator it's up to us
1098 // to create an EventInfo
1099 // first event # == 1
1100 unsigned int runNmb{1}, evtNmb{m_nevt + 1};
1101
1102 // increment the run/lumiBlock number if desired
1103 if (m_flmbi != 0) {
1104 runNmb = m_nevt / m_flmbi + 1;
1105 evtNmb = m_nevt % m_flmbi + 1;
1106 }
1107 auto eid = std::make_unique<EventID> (runNmb,evtNmb, m_timeStamp);
1108 // Change lumiBlock# to match runNumber
1109 eid->set_lumi_block( runNmb );
1110
1112
1113 pEvent = std::make_unique<EventInfo>(std::move(eid),
1114 std::make_unique<EventType>());
1115
1116 bool consume_modifier_stream = true;
1117 // EventInfo TP Conversion not called in this case, so we would
1118 // want to consume the next IoV from the list in the
1119 // EvtIdModifierSvc.
1120 modifyEventContext(ctx,*(pEvent->event_ID()), consume_modifier_stream);
1121
1122 ATH_MSG_DEBUG ( "selecting store: " << ctx.slot() );
1123
1124 m_whiteboard->selectStore( ctx.slot() ).ignore();
1125
1126 ATH_MSG_DEBUG ( "recording EventInfo " << *pEvent->event_ID() << " in "
1127 << eventStore()->name() );
1128
1129 sc = eventStore()->record(std::move(pEvent), "McEventInfo");
1130 if( !sc.isSuccess() ) {
1131 ATH_MSG_ERROR ( "Error declaring event data object" );
1132 return -1;
1133 }
1134 }
1135
1136 return 1;
1137}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
EventType eventTypeFromxAOD(const xAOD::EventInfo *xaod)
Create EventType object from xAOD::EventInfo.
UnsignedIntegerProperty m_eventPrintoutInterval
IEvtSelector * m_evtSelector
Reference to the Event Selector.
unsigned int m_nev
events processed
bool m_useSecondaryEventNumber
read event number from secondary input
bool m_requireInputAttributeList
require input attribute list
EvtContext * m_evtContext
Gaudi event selector Context (may be used as a cursor by the evt selector)
virtual void modifyEventContext(EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
EventID * event_ID()
the unique identification of the event.
StatusCode recordAddress(const std::string &skey, CxxUtils::RefCountedPtr< IOpaqueAddress > pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
StatusCode record(T *p2BRegistered, const TKEY &key)
Record an object with a key.
const T * tryConstRetrieve() const
StatusCode retrieve(const T *&ptr) const
Retrieve the default object into a const T*.
uint32_t runNumber() const
The current event's run number.
thread_local event_number_t eventIndex
EventInfo_v1 EventInfo
Definition of the latest event info version.
setTeId lumiBlock

◆ drainLocalScheduler()

StatusCode MPIHiveEventLoopMgr::drainLocalScheduler ( )
protected

Drain the local scheduler of any (at least one) completed events.

drainLocalScheduler Drain the local scheduler on this MPI rank

Definition at line 362 of file MPIHiveEventLoopMgr.cxx.

362 {
363
364 StatusCode sc(StatusCode::SUCCESS);
365
366 // maybe we can do better
367 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
368
369 EventContext* finishedEvtContext(nullptr);
370
371 // Here we wait not to loose cpu resources
372 ATH_MSG_DEBUG("drainLocalScheduler: [" << m_nLocalFinishedEvts
373 << "] Waiting for a context");
374 sc = m_schedulerSvc->popFinishedEvent(finishedEvtContext);
375
376 // We got past it: cache the pointer
377 if (sc.isSuccess()) {
378 ATH_MSG_DEBUG("drainLocalScheduler: scheduler not empty: Context "
379 << finishedEvtContext);
380 finishedEvtContexts.emplace_back(finishedEvtContext);
381 } else {
382 // no more events left in scheduler to be drained
383 ATH_MSG_DEBUG("drainLocalScheduler: scheduler empty");
384 return StatusCode::SUCCESS;
385 }
386
387 // Let's see if we can pop other event contexts
388 while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
389 finishedEvtContexts.emplace_back(finishedEvtContext);
390 }
391
392 // Now we flush them
393 StatusCode fail(StatusCode::SUCCESS);
394 for (auto& thisFinishedEvtContext : finishedEvtContexts) {
395 if (!thisFinishedEvtContext) {
396 ATH_MSG_FATAL("Detected nullptr ctxt while clearing WB!");
397 fail = StatusCode::FAILURE;
398 continue;
399 }
400
401 // Update event log
402 m_clusterSvc->log_completeEvent(
403 thisFinishedEvtContext->eventID().run_number(),
404 thisFinishedEvtContext->eventID().event_number(),
405 m_aess->eventStatus(*thisFinishedEvtContext));
406
407 if (m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
408 ATH_MSG_ERROR("Failed event detected on "
409 << thisFinishedEvtContext << " w/ fail mode: "
410 << m_aess->eventStatus(*thisFinishedEvtContext));
413 if (m_contiguousFailedEvts >= 3 || m_totalFailedEvts >= 10) {
414 // If we have 3 contiguous failed events or 10 total, end the job
415 thisFinishedEvtContext.reset();
416 fail = StatusCode::FAILURE;
417 continue;
418 }
419 }
420 else {
421 // Event succeeded, reset contiguous failed events
423 }
424
425 EventID::number_type n_run(0);
426 EventID::event_number_t n_evt(0);
427
428 if (m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
429 n_run = thisFinishedEvtContext->eventID().run_number();
430 n_evt = thisFinishedEvtContext->eventID().event_number();
431 } else {
432 ATH_MSG_ERROR("DrainSched: unable to select store "
433 << thisFinishedEvtContext->slot());
434 thisFinishedEvtContext.reset();
435 fail = StatusCode::FAILURE;
436 continue;
437 }
438
439 // Some code still needs global context in addition to that passed in the
440 // incident
441 Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
442 m_incidentSvc->fireIncident(
443 Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext));
444
445 ATH_MSG_DEBUG("Clearing slot "
446 << thisFinishedEvtContext->slot() << " (event "
447 << thisFinishedEvtContext->evt() << ") of the whiteboard");
448
449 StatusCode sc = clearWBSlot(thisFinishedEvtContext->slot());
450 if (!sc.isSuccess()) {
451 ATH_MSG_ERROR("Whiteboard slot " << thisFinishedEvtContext->slot()
452 << " could not be properly cleared");
453 if (fail != StatusCode::FAILURE) {
454 fail = sc;
455 }
456 thisFinishedEvtContext.reset();
457 continue;
458 }
459
461
462 writeHistograms().ignore();
463 ++m_proc;
464
465 if (m_doEvtHeartbeat) {
466 if (!m_useTools) {
467 ATH_MSG_INFO(" ===>>> done processing event #"
468 << n_evt << ", run #" << n_run << " on slot "
469 << thisFinishedEvtContext->slot() << ", " << m_proc
470 << " events processed so far <<<===");
471 } else {
472 ATH_MSG_INFO(" ===>>> done processing event #"
473 << n_evt << ", run #" << n_run << " on slot "
474 << thisFinishedEvtContext->slot() << ", " << m_nev
475 << " events read and " << m_proc
476 << " events processed so far <<<===");
477 }
478 std::ofstream outfile("eventLoopHeartBeat.txt");
479 if (!outfile) {
480 ATH_MSG_ERROR(" unable to open: eventLoopHeartBeat.txt");
481 fail = StatusCode::FAILURE;
482 thisFinishedEvtContext.reset();
483 continue;
484 }
485 outfile << " done processing event #" << n_evt << ", run #" << n_run
486 << " " << m_nev << " events read so far <<<===" << std::endl;
487 outfile.close();
488 }
489
490 ATH_MSG_DEBUG("drainLocalScheduler thisFinishedEvtContext: "
491 << thisFinishedEvtContext);
492
493 thisFinishedEvtContext.reset();
494 }
495
496 return fail;
497}
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
EventIDBase::number_type number_type
Definition EventID.h:37
ServiceHandle< IMPIClusterSvc > m_clusterSvc
Reference to the MPIClusterSvc.
fail(message)

◆ drainScheduler()

int AthenaHiveEventLoopMgr::drainScheduler ( int & finishedEvents)
protectedinherited

Drain the scheduler from all actions that may be queued.

Definition at line 1190 of file AthenaHiveEventLoopMgr.cxx.

1190 {
1191
1192 StatusCode sc(StatusCode::SUCCESS);
1193
1194 // maybe we can do better
1195 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1196
1197 EventContext* finishedEvtContext(nullptr);
1198
1199 // Here we wait not to loose cpu resources
1200 ATH_MSG_DEBUG ( "drainScheduler: [" << finishedEvts << "] Waiting for a context" );
1201 sc = m_schedulerSvc->popFinishedEvent(finishedEvtContext);
1202
1203 // We got past it: cache the pointer
1204 if (sc.isSuccess()){
1205 ATH_MSG_DEBUG ( "drainScheduler: scheduler not empty: Context "
1206 << finishedEvtContext );
1207 finishedEvtContexts.emplace_back(finishedEvtContext);
1208 } else{
1209 // no more events left in scheduler to be drained
1210 ATH_MSG_DEBUG ( "drainScheduler: scheduler empty" );
1211 return 0;
1212 }
1213
1214 // Let's see if we can pop other event contexts
1215 while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1216 finishedEvtContexts.emplace_back(finishedEvtContext);
1217 }
1218
1219 // Now we flush them
1220 bool fail(false);
1221 for (auto& thisFinishedEvtContext : finishedEvtContexts){
1222 if (!thisFinishedEvtContext) {
1223 ATH_MSG_FATAL ( "Detected nullptr ctxt while clearing WB!");
1224 fail = true;
1225 continue;
1226 }
1227
1228 if (m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1229 ATH_MSG_FATAL ( "Failed event detected on " << thisFinishedEvtContext
1230 << " w/ fail mode: "
1231 << m_aess->eventStatus(*thisFinishedEvtContext) );
1232 fail = true;
1233 continue;
1234 }
1235
1236 EventID::number_type n_run(0);
1237 EventID::event_number_t n_evt(0);
1238
1239 if (m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1240 n_run = thisFinishedEvtContext->eventID().run_number();
1241 n_evt = thisFinishedEvtContext->eventID().event_number();
1242 } else {
1243 ATH_MSG_ERROR ( "DrainSched: unable to select store "
1244 << thisFinishedEvtContext->slot() );
1245 fail = true;
1246 continue;
1247 }
1248
1249 // m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndEvent,
1250 // *thisFinishedEvtContext ));
1251
1252 // Some code still needs global context in addition to that passed in the incident
1253 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1254 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1255
1256 ATH_MSG_DEBUG ( "Clearing slot " << thisFinishedEvtContext->slot()
1257 << " (event " << thisFinishedEvtContext->evt()
1258 << ") of the whiteboard" );
1259
1260 StatusCode sc = clearWBSlot(thisFinishedEvtContext->slot());
1261 if (!sc.isSuccess()) {
1262 ATH_MSG_ERROR ( "Whiteboard slot " << thisFinishedEvtContext->slot()
1263 << " could not be properly cleared" );
1264 fail = true;
1265 continue;
1266 }
1267
1268 finishedEvts++;
1269
1270 writeHistograms().ignore();
1271 ++m_proc;
1272
1273 if (m_doEvtHeartbeat) {
1274 if(!m_useTools)
1275 ATH_MSG_INFO ( " ===>>> done processing event #" << n_evt << ", run #" << n_run
1276 << " on slot " << thisFinishedEvtContext->slot() << ", "
1277 << m_proc << " events processed so far <<<===" );
1278 else
1279 ATH_MSG_INFO ( " ===>>> done processing event #" << n_evt << ", run #" << n_run
1280 << " on slot " << thisFinishedEvtContext->slot() << ", "
1281 << m_nev << " events read and " << m_proc
1282 << " events processed so far <<<===" );
1283 std::ofstream outfile( "eventLoopHeartBeat.txt");
1284 if ( !outfile ) {
1285 ATH_MSG_ERROR ( " unable to open: eventLoopHeartBeat.txt" );
1286 fail = true;
1287 continue;
1288 } else {
1289 outfile << " done processing event #" << n_evt << ", run #" << n_run
1290 << " " << m_nev << " events read so far <<<===" << std::endl;
1291 outfile.close();
1292 }
1293 }
1294
1295 ATH_MSG_DEBUG ( "drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext );
1296 }
1297
1298 return ( fail ? -1 : 1 );
1299
1300}

◆ eventStore()

StoreGateSvc * MPIHiveEventLoopMgr::eventStore ( ) const
inlineprivate

Definition at line 499 of file MPIHiveEventLoopMgr.cxx.

499 {
500 return m_eventStore.get();
501}
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc;.

◆ executeAlgorithms()

StatusCode AthenaHiveEventLoopMgr::executeAlgorithms ( )
protectedvirtualinherited

Run the algorithms for the current event.

Definition at line 492 of file AthenaHiveEventLoopMgr.cxx.

492 {
493
494 return StatusCode::SUCCESS;
495}

◆ executeEvent()

StatusCode AthenaHiveEventLoopMgr::executeEvent ( EventContext && ctx)
overridevirtualinherited

implementation of IEventProcessor::executeEvent(void* par)

Fire begin-Run incident if new run:

Definition at line 501 of file AthenaHiveEventLoopMgr.cxx.

502{
503
504 // An incident may schedule a stop, in which case is better to exit before the actual execution.
505 if ( m_scheduledStop ) {
506 ATH_MSG_ALWAYS ( "A stopRun was requested by an incidentListener. "
507 << "Do not process this event." );
508 m_terminateLoop = true;
509 return (StatusCode::SUCCESS);
510 }
511
512 m_aess->reset( ctx );
513
514 // Make sure context with slot is set before calling es->next().
515 Gaudi::Hive::setCurrentContext ( ctx );
516
517 int declEvtRootSc = declareEventRootAddress( ctx );
518 if (declEvtRootSc == 0 ) { // We ran out of events!
519 m_terminateLoop = true; // we have finished!
520 return StatusCode::SUCCESS;
521 } else if ( declEvtRootSc == -1) {
522 ATH_MSG_ERROR ( "declareEventRootAddress for context " << ctx << " failed" );
523 return StatusCode::FAILURE;
524 }
525
526 EventID::event_number_t evtNumber = ctx.eventID().event_number();
527 unsigned int conditionsRun = ctx.eventID().run_number();
528 if (!m_evtIdModSvc.isSet()) {
529 const AthenaAttributeList* attr = nullptr;
530 if (eventStore()->contains<AthenaAttributeList> ("Input") &&
531 eventStore()->retrieve(attr, "Input").isSuccess()) {
532 if (attr->exists ("ConditionsRun")) {
533 conditionsRun = (*attr)["ConditionsRun"].data<unsigned int>();
534 }
535 }
536 }
538 Gaudi::Hive::setCurrentContext ( ctx );
539
540 // Record EventContext in current whiteboard
541 if (eventStore()->record(std::make_unique<EventContext> (ctx),
542 "EventContext").isFailure())
543 {
544 ATH_MSG_ERROR ( "Error recording event context object" );
545 return (StatusCode::FAILURE);
546 }
547
549 if (m_firstRun || (m_currentRun != ctx.eventID().run_number()) ) {
550 // Fire EndRun incident unless this is the first run
551 if (!m_firstRun) {
552 // FIXME!!!
553 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndRun));
554 }
555 m_firstRun=false;
556 m_currentRun = ctx.eventID().run_number();
557
558 ATH_MSG_INFO ( " ===>>> start of run " << m_currentRun << " <<<===" );
559
560 // FIXME!!! Fire BeginRun "Incident"
561 m_incidentSvc->fireIncident(Incident(name(),IncidentType::BeginRun,ctx));
562
563 }
564
565 bool toolsPassed=true;
566 // CGL: FIXME
567 // bool eventFailed = false;
568
569 // Call any attached tools to reject events early
570 unsigned int toolCtr=0;
571 if(m_useTools) {
572 tool_store::iterator theTool = m_tools.begin();
573 tool_store::iterator lastTool = m_tools.end();
574 while(toolsPassed && theTool!=lastTool )
575 {
576 toolsPassed = (*theTool)->passEvent(ctx.eventID());
577 m_toolInvoke[toolCtr]++;
578 {toolsPassed ? m_toolAccept[toolCtr]++ : m_toolReject[toolCtr]++;}
579 ++toolCtr;
580 ++theTool;
581 }
582 }
583
585 0 == (m_nev % m_eventPrintoutInterval.value()));
586 if (m_doEvtHeartbeat) {
587 if(!m_useTools) {
588 ATH_MSG_INFO ( " ===>>> start processing event #" << evtNumber << ", run #" << m_currentRun
589 << " on slot " << ctx.slot() << ", " << m_proc
590 << " events processed so far <<<===" );
591 }
592 else {
593 ATH_MSG_INFO ( " ===>>> start processing event #" << evtNumber << ", run #" << m_currentRun
594 << " on slot " << ctx.slot() << ", "
595 << m_nev << " events read and " << m_proc
596 << " events processed so far <<<===" );
597 }
598 }
599
600 // Reset the timeout singleton
602 if(toolsPassed) {
603
604 CHECK( m_conditionsCleaner->event (ctx, true) );
605
606 // Remember the last event context for after event processing finishes.
607 m_lastEventContext = ctx;
608
609 // Now add event to the scheduler
610 ATH_MSG_DEBUG ( "Adding event " << ctx.evt()
611 << ", slot " << ctx.slot()
612 << " to the scheduler" );
613
614 m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginProcessing,
615 ctx));
616 StatusCode addEventStatus = m_schedulerSvc->pushNewEvent( new EventContext{ std::move(ctx) } );
617
618 // If this fails, we need to wait for something to complete
619 if (!addEventStatus.isSuccess()){
620 ATH_MSG_FATAL ( "An event processing slot should be now free in the scheduler, but it appears not to be the case." );
621 }
622
623 } // end of toolsPassed test
624
625 ++m_nev;
626
627 ++m_nevt;
628
629 // invalidate thread local context once outside of event execute loop
630 Gaudi::Hive::setCurrentContext( EventContext() );
631
632 return StatusCode::SUCCESS;
633
634}
#define ATH_MSG_ALWAYS(x)
#define CHECK(...)
Evaluate an expression and check for errors.
number_type m_currentRun
current run number
tool_stats m_toolAccept
tool returns StatusCode::SUCCESS counter
tool_stats m_toolInvoke
tool called counter
int declareEventRootAddress(EventContext &)
Declare the root address of the event.
bool m_scheduledStop
Scheduled stop of event processing.
ServiceHandle< Athena::IConditionsCleanerSvc > m_conditionsCleaner
tool_store m_tools
internal tool store
tool_stats m_toolReject
tool returns StatusCode::FAILURE counter
IEvtIdModifierSvc_t m_evtIdModSvc
void resetTimeout(Timeout &instance)
Reset timeout.
Definition Timeout.h:83
static Timeout & instance()
Get reference to Timeout singleton.
Definition Timeout.h:64
void setConditionsRun(EventIDBase::number_type conditionsRun)
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
Definition hcg.cxx:114
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
retrieve(aClass, aKey=None)
Definition PyKernel.py:110

◆ executeRun()

StatusCode AthenaHiveEventLoopMgr::executeRun ( int maxevt)
overridevirtualinherited

implementation of IEventProcessor::executeRun(int maxevt)

Definition at line 639 of file AthenaHiveEventLoopMgr.cxx.

640{
641
643 bool eventfailed = false;
644
645 // Call now the nextEvent(...)
646 sc = nextEvent(maxevt);
647 if (!sc.isSuccess())
648 eventfailed = true;
649
650 if (eventfailed)
651 return StatusCode::FAILURE;
652
653 m_incidentSvc->fireIncident(Incident(name(),"EndEvtLoop"));
654 return StatusCode::SUCCESS;
655}
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately

◆ finalize()

StatusCode MPIHiveEventLoopMgr::finalize ( )
overridevirtual

implementation of IAppMgrUI::finalize

Finalize Finalizes AthenaHiveEventLoopMgr.

Reimplemented from AthenaHiveEventLoopMgr.

Definition at line 38 of file MPIHiveEventLoopMgr.cxx.

38 {
40}
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize

◆ getEventRoot()

StatusCode AthenaHiveEventLoopMgr::getEventRoot ( IOpaqueAddress *& refpAddr)
inherited

Create event address using event selector.

Definition at line 930 of file AthenaHiveEventLoopMgr.cxx.

930 {
931 refpAddr = 0;
933 if ( !sc.isSuccess() ) {
934 return sc;
935 }
936 // Create root address and assign address to data service
937 sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
938 if( !sc.isSuccess() ) {
939 sc = m_evtSelector->next(*m_evtContext);
940 if ( sc.isSuccess() ) {
941 sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
942 if ( !sc.isSuccess() ) {
943 ATH_MSG_WARNING ( "Error creating IOpaqueAddress." );
944 }
945 }
946 }
947 return sc;
948}

◆ handle()

void AthenaHiveEventLoopMgr::handle ( const Incident & inc)
overridevirtualinherited

IIncidentListenet interfaces.

Definition at line 855 of file AthenaHiveEventLoopMgr.cxx.

856{
857
858 if(inc.type() == "EndAlgorithms") {
859 // Clear the store at the end of the event.
860 // Do it here so that it executes in an algorithm context and thus
861 // multiple stores can be cleared at the same time.
862 StatusCode sc = m_whiteboard->clearStore(inc.context().slot());
863 if( !sc.isSuccess() ) {
864 ATH_MSG_WARNING ( "Clear of Event data store failed" );
865 }
866 return;
867 }
868
869 if(inc.type()!="BeforeFork")
870 return;
871
872 if(!m_evtContext || !m_firstRun) {
873 ATH_MSG_WARNING ( "Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" );
874 }
875
876 // Initialize Algorithms and Output Streams
878 if(sc.isFailure()) {
879 ATH_MSG_ERROR ( "Failed to initialize Algorithms" );
880 return;
881 }
882
883 // Construct EventInfo
884 sc = m_evtSelector->next(*m_evtContext);
885 if(!sc.isSuccess()) {
886 ATH_MSG_INFO ( "No more events in event selection " );
887 return;
888 }
889 IOpaqueAddress* addr{nullptr};
890 sc = m_evtSelector->createAddress(*m_evtContext, addr);
891 if (sc.isFailure()) {
892 ATH_MSG_ERROR ( "Could not create an IOpaqueAddress" );
893 return;
894 }
895 if (0 != addr) {
896 //create its proxy
897 sc = eventStore()->recordAddress(addr);
898 if(!sc.isSuccess()) {
899 ATH_MSG_ERROR ( "Error declaring Event object" );
900 return;
901 }
902 }
903
904 if(eventStore()->loadEventProxies().isFailure()) {
905 ATH_MSG_WARNING ( "Error loading Event proxies" );
906 return;
907 }
908
909 // Retrieve the legacy EventInfo object
910 const EventInfo* pEvent{nullptr};
911 sc = eventStore()->retrieve(pEvent);
912 if(!sc.isSuccess()) {
913 ATH_MSG_ERROR ( "Unable to retrieve Event root object" );
914 return;
915 }
916
917 m_firstRun=false;
918 m_currentRun = pEvent->event_ID()->run_number();
919
920 // Clear Store
921 sc = eventStore()->clearStore();
922 if(!sc.isSuccess()) {
923 ATH_MSG_ERROR ( "Clear of Event data store failed" );
924 }
925}
StatusCode initializeAlgorithms()
Initialize all algorithms and output streams.
virtual StatusCode clearStore(bool forceRemove=false) override final
clear DataStore contents: called by the event loop mgrs

◆ initialize()

StatusCode MPIHiveEventLoopMgr::initialize ( )
overridevirtual

implementation of IAppMgrUI::initalize

Initialize Initializes AthenaHiveEventLoop and MPIClusterSvc.

Reimplemented from AthenaHiveEventLoopMgr.

Definition at line 30 of file MPIHiveEventLoopMgr.cxx.

30 {
31 // Initialize cluster svc
32 ATH_CHECK(m_clusterSvc.retrieve());
34}
#define ATH_CHECK
Evaluate an expression and check for errors.
virtual StatusCode initialize() override
implementation of IAppMgrUI::initalize

◆ initializeAlgorithms()

StatusCode AthenaHiveEventLoopMgr::initializeAlgorithms ( )
protectedinherited

Initialize all algorithms and output streams.

Definition at line 484 of file AthenaHiveEventLoopMgr.cxx.

484 {
485
486 return StatusCode::SUCCESS;
487}

◆ initMessaging()

void AthMessaging::initMessaging ( ) const
privateinherited

Initialize our message level and MessageSvc.

This method should only be called once.

Definition at line 39 of file AthMessaging.cxx.

40{
42 // If user did not set an explicit level, set a default
43 if (m_lvl == MSG::NIL) {
44 m_lvl = m_imsg ?
45 static_cast<MSG::Level>( m_imsg.load()->outputLevel(m_nm) ) :
46 MSG::INFO;
47 }
48}
std::string m_nm
Message source name.
std::atomic< IMessageSvc * > m_imsg
MessageSvc pointer.
std::atomic< MSG::Level > m_lvl
Current logging level.
IMessageSvc * getMessageSvc(bool quiet=false)

◆ insertEvent()

StatusCode MPIHiveEventLoopMgr::insertEvent ( int eventIdx,
bool & endOfStream,
std::int64_t requestTime_ns )
protected

Insert an event into the local scheduler.

Insert event into local scheduler.

Definition at line 329 of file MPIHiveEventLoopMgr.cxx.

330 {
331 // fast-forward to event
332 // Create the event context now so next writes into the next slot when
333 // skipping, not the one that's being used
334 endOfStream = false;
335 auto ctx = createEventContext();
336 Gaudi::Hive::setCurrentContext(ctx);
337 ctx.setEvt(eventIdx); // Make the event numbers in the log actually make sense
338 if (!ctx.valid()) {
339 endOfStream = true; // BUG: Doesn't actually mean end of stream. Remove
340 // after making sure!
341 return StatusCode::FAILURE;
342 }
343
344 const std::size_t slot = ctx.slot(); // Need this for later
345 ATH_CHECK(seek(eventIdx));
346 // execute event
347 StatusCode sc = executeEvent(std::move(ctx));
348 const auto evtID = m_lastEventContext.eventID(); // Set in AthenaHiveEventLoopMgr
349 m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
350 requestTime_ns, slot);
351
352 if (sc.isRecoverable()) {
354 } else if (sc.isSuccess()) {
356 }
357 return sc;
358}
virtual StatusCode seek(int evt) override
Seek to a given event.
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(void* par)
virtual EventContext createEventContext() override
Create event context.

◆ masterEventLoop()

StatusCode MPIHiveEventLoopMgr::masterEventLoop ( int maxEvt)
protected

Master event loop (runs on master, provides events over MPI)

Master event loop.

Definition at line 62 of file MPIHiveEventLoopMgr.cxx.

62 {
63 ATH_MSG_INFO("Running with " << m_clusterSvc->numRanks() << " ranks");
64 // Determine number of events to process
65 int skipEvts = int(m_firstEventIndex.value());
66 if (m_evtSelector != nullptr) {
67 int evt = size();
68 if (evt == -1) {
69 m_clusterSvc->abort();
70 return StatusCode::FAILURE;
71 }
72 if (maxEvt < 0 || maxEvt > evt) {
73 maxEvt = evt;
74 }
75 ATH_MSG_INFO("Will be processing " << maxEvt << " events");
76 }
77
78 // Setup worker status DB (Spare one at start)
79 std::vector<bool> workers_done(m_clusterSvc->numRanks(), false);
80 workers_done[0] =
81 true; // Set 0 to true because it doesn't correspond to a worker
82 int num_workers_done = 1; // Init to 1 so we can compare to numRanks
83 std::vector<ClusterMessage::WorkerStatus> statuses(m_clusterSvc->numRanks());
84
85 // Entering event loop
86 m_clusterSvc->barrier();
87 // Note: no ++evt. This is because this is really a message loop, and we don't
88 // want to increment evt if we haven't provided an event
89 auto start = Clock::now();
90 for (int evt = skipEvts; evt < skipEvts + maxEvt;) {
91 ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
92 // Messages we can get are RequestEvent, FinalWorkerStatus, or WorkerError
93 if (msg.messageType == ClusterMessageType::RequestEvent) {
94 ATH_MSG_DEBUG("Starting event " << evt << " on " << msg.source);
95 m_clusterSvc->sendMessage(
96 msg.source, ClusterMessage(ClusterMessageType::ProvideEvent, evt));
97 ++evt;
98 continue;
99 }
100
101 if (msg.messageType == ClusterMessageType::WorkerError) {
102 ATH_MSG_ERROR("Received WorkerError message from " << msg.source);
104 workers_done.at(msg.source) =
105 true; // If a worker hits an error, it's done
106 ++num_workers_done;
107 for (int i = 1; i < m_clusterSvc->numRanks(); ++i) {
108 if (!workers_done.at(i)) {
109 // Tell workers that aren't done to emergency stop
110 m_clusterSvc->sendMessage(
111 i, ClusterMessage(ClusterMessageType::EmergencyStop));
112 workers_done[i] = true;
113 ++num_workers_done;
114 }
115 }
116 break;
117 }
118
119 if (msg.messageType == ClusterMessageType::FinalWorkerStatus) {
120 ATH_MSG_INFO("Received FinalWorkerStatus from " << msg.source);
122 workers_done.at(msg.source) = true; // Worker hit end of stream
123 ++num_workers_done;
124 continue;
125 }
126
127 // Other message types are an error
128 ATH_MSG_ERROR("Received unexpected message "
129 << std::format("{}", msg.messageType) << " from "
130 << msg.source);
131 }
132 auto all_provided = Clock::now() - start;
133 ATH_MSG_INFO("Provided all events to workers, waiting for them to complete.");
134 // Event loop done, tell remaining workers
135 while (num_workers_done < m_clusterSvc->numRanks()) {
136 ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
137 // Messages we can get are RequestEvent, FinalWorkerStatus, or WorkerError
138 if (msg.messageType == ClusterMessageType::RequestEvent) {
139 m_clusterSvc->sendMessage(msg.source,
140 ClusterMessage(ClusterMessageType::EventsDone));
141 continue;
142 }
143
144 if (msg.messageType == ClusterMessageType::WorkerError) {
145 ATH_MSG_ERROR("Received WorkerError message from " << msg.source);
147 workers_done.at(msg.source) =
148 true; // If a worker hits an error, it's done
149 ++num_workers_done;
150 for (int i = 1; i < m_clusterSvc->numRanks(); ++i) {
151 if (!workers_done.at(i)) {
152 // Tell workers that aren't done to emergency stop
153 m_clusterSvc->sendMessage(
154 i, ClusterMessage(ClusterMessageType::EmergencyStop));
155 workers_done[i] = true;
156 ++num_workers_done;
157 }
158 }
159 break;
160 }
161
162 if (msg.messageType == ClusterMessageType::FinalWorkerStatus) {
163 ATH_MSG_INFO("Received FinalWorkerStatus from " << msg.source);
165 workers_done.at(msg.source) = true; // Told worker we're done
166 ++num_workers_done;
167 continue;
168 }
169
170 // Other message types are an error
171 ATH_MSG_ERROR("Received unexpected message "
172 << std::format("{}", msg.messageType) << " from "
173 << msg.source);
174 }
175 auto all_done = Clock::now() - start;
176 // Collate status
177 int n_created = 0;
178 int n_skipped = 0;
179 int n_finished = 0;
180
181 StatusCode sc = StatusCode::SUCCESS;
182 int worker_idx = 0;
183 for (const auto& worker_status : statuses) {
184 if (worker_status.status.isFailure() &&
185 worker_status.status != StatusCode(9999)) {
186 sc = worker_status.status;
187 }
188 n_created += worker_status.createdEvents;
189 n_skipped += worker_status.skippedEvents;
190 n_finished += worker_status.finishedEvents;
191
192 if ((worker_idx++) != 0) {
193 ATH_MSG_INFO("Worker " << worker_idx << ": SC " << worker_status.status
194 << ", created " << worker_status.createdEvents
195 << ", skipped " << worker_status.skippedEvents
196 << ", finished " << worker_status.finishedEvents);
197 }
198 }
199
200 ATH_MSG_INFO("Overall: SC " << sc << ", created " << n_created << ", skipped "
201 << n_skipped << ", finished " << n_finished);
202 ATH_MSG_INFO("MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
203 << " to provide all events.");
204 ATH_MSG_INFO("MASTER: Took " << std::chrono::hh_mm_ss(all_done)
205 << " to complete all events.");
206 return sc;
207}
MsgStream & msg() const
The standard message stream.
virtual int size() override
Return the size of the collection.
UnsignedIntegerProperty m_firstEventIndex
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
list statuses

◆ modifyEventContext()

void AthenaHiveEventLoopMgr::modifyEventContext ( EventContext & ctx,
const EventID & eID,
bool consume_modifier_stream )
virtualinherited

Definition at line 1141 of file AthenaHiveEventLoopMgr.cxx.

1141 {
1142
1143 if(m_evtIdModSvc.isSet()) {
1144 EventID new_eID(eID);
1145 // In Hive EventLoopMgr ctx.evt() gets set to m_nevt and *then* m_nevt is
1146 // incremented later so it's zero-indexed and we don't need to subtract one
1147 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1148 if (msgLevel(MSG::DEBUG)) {
1149 unsigned int oldrunnr=eID.run_number();
1150 unsigned int oldLB=eID.lumi_block();
1151 unsigned int oldTS=eID.time_stamp();
1152 unsigned int oldTSno=eID.time_stamp_ns_offset();
1153 ATH_MSG_DEBUG ( "modifyEventContext: use evtIdModSvc runnr=" << oldrunnr << " -> " << new_eID.run_number() );
1154 ATH_MSG_DEBUG ( "modifyEventContext: use evtIdModSvc LB=" << oldLB << " -> " << new_eID.lumi_block() );
1155 ATH_MSG_DEBUG ( "modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS << " -> " << new_eID.time_stamp() );
1156 ATH_MSG_DEBUG ( "modifyEventContext: use evtIdModSvc TimeStamp ns Offset=" << oldTSno << " -> " << new_eID.time_stamp_ns_offset() );
1157 }
1158 ctx.setEventID( new_eID );
1159 Atlas::getExtendedEventContext(ctx).setConditionsRun( ctx.eventID().run_number() );
1160 return;
1161 }
1162
1163 ctx.setEventID( eID );
1164}

◆ msg() [1/2]

MsgStream & AthMessaging::msg ( ) const
inlineinherited

The standard message stream.

Returns a reference to the default message stream May not be invoked before sysInitialize() has been invoked.

Definition at line 92 of file AthMessaging.h.

164{
165 MsgStream* ms = m_msg_tls.get();
166 if (!ms) {
167 if (!m_initialized.test_and_set()) initMessaging();
168 ms = new MsgStream(m_imsg,m_nm);
169 m_msg_tls.reset( ms );
170 }
171
172 ms->setLevel (m_lvl);
173 return *ms;
174}
boost::thread_specific_ptr< MsgStream > m_msg_tls
MsgStream instance (a std::cout like with print-out levels)
void initMessaging() const
Initialize our message level and MessageSvc.

◆ msg() [2/2]

MsgStream & AthMessaging::msg ( const MSG::Level lvl) const
inlineinherited

The standard message stream.

Returns a reference to the default message stream May not be invoked before sysInitialize() has been invoked.

Definition at line 99 of file AthMessaging.h.

179{ return msg() << lvl; }
MsgStream & msg() const
The standard message stream.

◆ msgLvl()

bool AthMessaging::msgLvl ( const MSG::Level lvl) const
inlineinherited

Test the output level.

Parameters
lvlThe message level to test against
Returns
boolean Indicating if messages at given level will be printed
Return values
trueMessages at level "lvl" will be printed

Definition at line 86 of file AthMessaging.h.

152{
153 if (m_lvl <= lvl) {
154 msg() << lvl;
155 return true;
156 } else {
157 return false;
158 }
159}

◆ name()

virtual const std::string & AthenaHiveEventLoopMgr::name ( ) const
inlineoverridevirtualinherited

Definition at line 228 of file AthenaHiveEventLoopMgr.h.

228{ return Service::name(); } //FIXME

◆ nextEvent()

StatusCode MPIHiveEventLoopMgr::nextEvent ( int maxevt)
overridevirtual

implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately

nextEvent Dispatches to either masterEventLoop or workerEventLoop

Reimplemented from AthenaHiveEventLoopMgr.

Definition at line 44 of file MPIHiveEventLoopMgr.cxx.

44 {
45 // make nextEvent(0) a dummy call
46 if (0 == maxevt) {
47 return StatusCode::SUCCESS;
48 }
49
50 // Reset the application return code.
51 Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true)
52 .ignore();
53 ATH_MSG_INFO("Starting loop on events");
54
55 if (m_clusterSvc->rank() == 0) {
56 return masterEventLoop(maxevt);
57 }
58 return workerEventLoop();
59}
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
StatusCode workerEventLoop()
Worker event loop (runs on worker, requests events over MPI)
StatusCode masterEventLoop(int maxEvt)
Master event loop (runs on master, provides events over MPI)

◆ resetTimeout()

void Athena::TimeoutMaster::resetTimeout ( Timeout & instance)
inlineprotectedinherited

Reset timeout.

Definition at line 83 of file Timeout.h.

83{ instance.reset(); }
std::map< std::string, double > instance

◆ seek()

StatusCode AthenaHiveEventLoopMgr::seek ( int evt)
overridevirtualinherited

Seek to a given event.

Definition at line 793 of file AthenaHiveEventLoopMgr.cxx.

794{
795 IEvtSelectorSeek* is = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
796 if (is == 0) {
797 ATH_MSG_ERROR ( "Seek failed; unsupported by event selector" );
798 return StatusCode::FAILURE;
799 }
800 //cppcheck-suppress nullPointerRedundantCheck
801 if (!m_evtContext) {
802 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
803 ATH_MSG_FATAL ( "Can not create the event selector Context." );
804 return StatusCode::FAILURE;
805 }
806 }
807 //m_evtContext cannot be null if createContext succeeded
808 //cppcheck-suppress nullPointerRedundantCheck
809 StatusCode sc = is->seek (*m_evtContext, evt);
810 if (sc.isSuccess()) {
811 m_incidentSvc->fireIncident(ContextIncident<std::tuple<int, int>>(
812 name(), "SkipEvents", std::tuple<int, int>(m_nevt, evt)));
813 m_nevt = evt;
814 }
815 else {
816 ATH_MSG_ERROR ( "Seek failed." );
817 }
818 return sc;
819}
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.

◆ setLevel()

void AthMessaging::setLevel ( MSG::Level lvl)
inherited

Change the current logging level.

Use this rather than msg().setLevel() for proper operation with MT.

Definition at line 28 of file AthMessaging.cxx.

29{
30 m_lvl = lvl;
31}

◆ setTimeout()

void Athena::TimeoutMaster::setTimeout ( Timeout & instance)
inlineprotectedinherited

Set timeout.

Definition at line 80 of file Timeout.h.

80{ instance.set(); }

◆ setupPreSelectTools()

void AthenaHiveEventLoopMgr::setupPreSelectTools ( Gaudi::Details::PropertyBase & )
protectedinherited

property update handler:sets up the Pre-selection tools

Definition at line 327 of file AthenaHiveEventLoopMgr.cxx.

327 {
328
329 m_toolInvoke.clear();
330 m_toolReject.clear();
331 m_toolAccept.clear();
332
333 m_tools.retrieve().ignore();
334 if(m_tools.size() > 0) {
335 m_useTools=true;
336 m_toolInvoke.resize(m_tools.size());
337 m_toolReject.resize(m_tools.size());
338 m_toolAccept.resize(m_tools.size());
339
340 tool_iterator firstTool = m_tools.begin();
341 tool_iterator lastTool = m_tools.end();
342 unsigned int toolCtr = 0;
343 for ( ; firstTool != lastTool; ++firstTool )
344 {
345 // reset statistics
346 m_toolInvoke[toolCtr] = 0;
347 m_toolReject[toolCtr] = 0;
348 m_toolAccept[toolCtr] = 0;
349 toolCtr++;
350 }
351 }
352
353 return;
354
355}
tool_store::const_iterator tool_iterator

◆ size()

int AthenaHiveEventLoopMgr::size ( )
overridevirtualinherited

Return the size of the collection.

Definition at line 833 of file AthenaHiveEventLoopMgr.cxx.

834{
835 IEvtSelectorSeek* cs = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
836 if (cs == 0) {
837 ATH_MSG_ERROR ( "Collection size unsupported by event selector" );
838 return -1;
839 }
840 //cppcheck-suppress nullPointerRedundantCheck
841 if (!m_evtContext) {
842 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
843 ATH_MSG_FATAL ( "Can not create the event selector Context." );
844 return -1;
845 }
846 }
847 //m_evtContext cannot be null if createContext succeeded
848 //cppcheck-suppress nullPointerRedundantCheck
849 return cs->size (*m_evtContext);
850}
virtual int size(IEvtSelector::Context &c) const =0
Return the size of the collection, or -1 if we can't get the size.

◆ stop()

StatusCode AthenaHiveEventLoopMgr::stop ( )
overridevirtualinherited

implementation of IService::stop

Definition at line 674 of file AthenaHiveEventLoopMgr.cxx.

675{
676 // To enable conditions access during stop we set an invalid EventContext
677 // (no event/slot number) but with valid EventID (and extended EventContext).
678 m_lastEventContext.setValid(false);
679 Gaudi::Hive::setCurrentContext( m_lastEventContext );
680
681 StatusCode sc = MinimalEventLoopMgr::stop();
682
683 // If we exit the event loop early due to an error, some event stores
684 // may not have been cleared. This can lead to segfaults later,
685 // as DetectorStore will usually get finalized before HiveSvcMgr.
686 // So make sure that all stores have been cleared at this point.
687 size_t nslot = m_whiteboard->getNumberOfStores();
688 for (size_t islot = 0; islot < nslot; islot++) {
689 sc &= clearWBSlot (islot);
690 }
691
692 Gaudi::Hive::setCurrentContext( EventContext() );
693 return sc;
694}

◆ stopRun()

StatusCode AthenaHiveEventLoopMgr::stopRun ( )
overridevirtualinherited

implementation of IEventProcessor::stopRun()

Definition at line 659 of file AthenaHiveEventLoopMgr.cxx.

659 {
660 // Set the application return code
661 SmartIF<IProperty> appmgr(serviceLocator());
662 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop, true).isFailure()) {
663 ATH_MSG_ERROR ( "Could not set return code of the application ("
664 << Gaudi::ReturnCode::ScheduledStop << ")" );
665 }
666 m_scheduledStop = true;
667 return StatusCode::SUCCESS;
668}

◆ workerEventLoop()

StatusCode MPIHiveEventLoopMgr::workerEventLoop ( )
protected

Worker event loop (runs on worker, requests events over MPI)

Worker event loop.

Definition at line 210 of file MPIHiveEventLoopMgr.cxx.

210 {
211 bool end_of_stream = false;
212 // barrier so all ranks enter message loop together
213 m_clusterSvc->barrier();
214 auto start = Clock::now();
216 while (true) {
217 // Drain the scheduler (wait for at least one event to complete, then free
218 // up completed slots) in two circumstances
219 // 1. Have created exactly one event, so the first event runs to completion
220 // before any more are scheduled
221 // 2. There are no free slots left
222 bool haveFreeSlots =
223 m_schedulerSvc->freeSlots() > 0 && m_whiteboard->freeSlots() > 0;
224 if (!haveFreeSlots || m_nLocalCreatedEvts == 1) {
226 if (sc.isFailure()) {
227 ClusterMessage::WorkerStatus status{};
228 status.status = sc;
229 status.createdEvents = m_nLocalCreatedEvts;
230 status.skippedEvents = m_nLocalSkippedEvts;
231 status.finishedEvents = m_nLocalFinishedEvts;
232 m_clusterSvc->sendMessage(
233 0, ClusterMessage(ClusterMessageType::WorkerError, status));
234 return sc;
235 }
236 }
237
238 auto start_time = Clock::now();
239 m_clusterSvc->sendMessage(0,
240 ClusterMessage(ClusterMessageType::RequestEvent));
241 ClusterMessage msg = m_clusterSvc->waitReceiveMessage();
242 auto request_time = Clock::now() - start_time;
243 if (msg.messageType == ClusterMessageType::EmergencyStop) {
244 // Emergency stop, return FAILURE after fully draining the scheduler to prevent segfault
245 std::size_t numSlots = m_whiteboard->getNumberOfStores();
246 while (m_schedulerSvc->freeSlots() < numSlots) {
247 // Ignore StatusCode, going to return FAILURE anyway
248 (void)(drainLocalScheduler());
249 }
250 ATH_MSG_ERROR("Received EmergencyStop message!");
251 return StatusCode::FAILURE;
252 }
253
254 if (msg.messageType == ClusterMessageType::EventsDone) {
255 auto loop_time = Clock::now() - start;
256 ATH_MSG_INFO("Worker " << m_clusterSvc->rank() << " DONE. Loop took "
257 << std::chrono::hh_mm_ss(loop_time)
258 << " to process " << m_nLocalCreatedEvts
259 << " events.");
260 // Been told we've reached end
261 // Provide status to master
262 ClusterMessage::WorkerStatus status{};
263 // At end of stream, we need to *fully* drain the scheduler
264 StatusCode sc = StatusCode::SUCCESS;
265 std::size_t numSlots = m_whiteboard->getNumberOfStores();
266 while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
268 }
269 status.status = sc;
270 status.createdEvents = m_nLocalCreatedEvts;
271 status.skippedEvents = m_nLocalSkippedEvts;
272 status.finishedEvents = m_nLocalFinishedEvts;
273 m_clusterSvc->sendMessage(
274 0, ClusterMessage(ClusterMessageType::FinalWorkerStatus, status));
275 return sc;
276 }
277
278 // Any other message other than ProvideEvent would now be an error
279 if (msg.messageType != ClusterMessageType::ProvideEvent ||
280 msg.source != 0) {
281 ATH_MSG_ERROR("Received unexpected message "
282 << std::format("{}", msg.messageType) << " from "
283 << msg.source);
284 return StatusCode::FAILURE;
285 }
286
287 int evt = get<int>(msg.payload);
288 ATH_MSG_INFO("Starting event " << evt);
290 evt, end_of_stream,
291 std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
292 .count());
293 if (sc.isFailure() && !sc.isRecoverable()) {
294 ClusterMessage::WorkerStatus status{};
295 status.status = sc;
296 status.createdEvents = m_nLocalCreatedEvts;
297 status.skippedEvents = m_nLocalSkippedEvts;
298 status.finishedEvents = m_nLocalFinishedEvts;
299 m_clusterSvc->sendMessage(
300 0, ClusterMessage(ClusterMessageType::WorkerError, status));
301 return sc;
302 }
303 if (end_of_stream || m_terminateLoop) {
304 auto loop_time = Clock::now() - start;
305 ATH_MSG_INFO("Worker " << m_clusterSvc->rank() << " DONE. Loop took "
306 << std::chrono::hh_mm_ss(loop_time)
307 << " to process " << m_nLocalCreatedEvts
308 << " events.");
309 // reached end of stream, drain scheduler
310 ClusterMessage::WorkerStatus status{};
311 // At end of stream, we need to *fully* drain the scheduler
312 StatusCode sc = StatusCode::SUCCESS;
313 std::size_t numSlots = m_whiteboard->getNumberOfStores();
314 while (sc.isSuccess() && m_schedulerSvc->freeSlots() < numSlots) {
316 }
317 status.status = sc;
318 status.createdEvents = m_nLocalCreatedEvts;
319 status.skippedEvents = m_nLocalSkippedEvts;
320 status.finishedEvents = m_nLocalFinishedEvts;
321 m_clusterSvc->sendMessage(
322 0, ClusterMessage(ClusterMessageType::FinalWorkerStatus, status));
323 return sc;
324 }
325 }
326}
StatusCode drainLocalScheduler()
Drain the local scheduler of any (at least one) completed events.
StatusCode insertEvent(int eventIdx, bool &endOfStream, std::int64_t requestTime_ns)
Insert an event into the local scheduler.
int count(std::string s, const std::string &regx)
count how many occurances of a regx are in a string
Definition hcg.cxx:146
status
Definition merge.py:16

◆ writeHistograms()

StatusCode AthenaHiveEventLoopMgr::writeHistograms ( bool force = false)
protectedvirtualinherited

Dump out histograms as needed.

Definition at line 428 of file AthenaHiveEventLoopMgr.cxx.

428 {
429
430 StatusCode sc (StatusCode::SUCCESS);
431
432 if ( 0 != m_histoPersSvc && m_writeHists ) {
433 std::vector<DataObject*> objects;
434 sc = m_histoDataMgrSvc->traverseTree( [&objects]( IRegistry* reg, int ) {
435 DataObject* obj = reg->object();
436 if ( !obj || obj->clID() == CLID_StatisticsFile ) return false;
437 objects.push_back( obj );
438 return true;
439 } );
440
441 if ( !sc.isSuccess() ) {
442 ATH_MSG_ERROR ( "Error while traversing Histogram data store" );
443 return sc;
444 }
445
446 if ( objects.size() > 0) {
447 int writeInterval(m_writeInterval.value());
448
449 if ( m_nevt == 1 || force ||
450 (writeInterval != 0 && m_nevt%writeInterval == 0) ) {
451
452 // skip /stat entry!
453 sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
454 IOpaqueAddress* pAddr = nullptr;
455 StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
456 if ( iret.isFailure() ) return iret;
457 i->registry()->setAddress( pAddr );
458 return isc;
459 } );
460 sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
461 IRegistry* reg = i->registry();
462 StatusCode iret = m_histoPersSvc->fillRepRefs( reg->address(), i );
463 return iret.isFailure() ? iret : isc;
464 } );
465 if ( ! sc.isSuccess() ) {
466 ATH_MSG_ERROR ( "Error while saving Histograms." );
467 }
468 }
469
470 if (force || (writeInterval != 0 && m_nevt%writeInterval == 0) ) {
471 ATH_MSG_DEBUG ( "committing Histograms" );
472 m_histoPersSvc->conversionSvc()->commitOutput("",true).ignore();
473 }
474 }
475
476 }
477
478 return sc;
479}
UnsignedIntegerProperty m_writeInterval
IDataManagerSvc_t m_histoDataMgrSvc
Reference to the Histogram Data Service.

Member Data Documentation

◆ ATLAS_THREAD_SAFE

std::atomic_flag m_initialized AthMessaging::ATLAS_THREAD_SAFE = ATOMIC_FLAG_INIT
mutableprivateinherited

Messaging initialized (initMessaging)

Definition at line 141 of file AthMessaging.h.

◆ m_abortEventListener

SmartIF< IIncidentListener > AthenaHiveEventLoopMgr::m_abortEventListener
protectedinherited

Instance of the incident listener waiting for AbortEvent.

Definition at line 176 of file AthenaHiveEventLoopMgr.h.

◆ m_aess

SmartIF<IAlgExecStateSvc> AthenaHiveEventLoopMgr::m_aess
protectedinherited

Reference to the Algorithm Execution State Svc.

Definition at line 160 of file AthenaHiveEventLoopMgr.h.

◆ m_algResourcePool

SmartIF<IAlgResourcePool> AthenaHiveEventLoopMgr::m_algResourcePool
protectedinherited

Reference to the Algorithm resource pool.

Definition at line 157 of file AthenaHiveEventLoopMgr.h.

◆ m_appMgrProperty

SmartIF<IProperty> AthenaHiveEventLoopMgr::m_appMgrProperty
protectedinherited

Property interface of ApplicationMgr.

Definition at line 163 of file AthenaHiveEventLoopMgr.h.

◆ m_clusterSvc

ServiceHandle<IMPIClusterSvc> MPIHiveEventLoopMgr::m_clusterSvc
protected
Initial value:
{this, "MPIClusterSvc", "",
"MPIClusterSvc"}

Reference to the MPIClusterSvc.

Definition at line 38 of file MPIHiveEventLoopMgr.h.

38 {this, "MPIClusterSvc", "",
39 "MPIClusterSvc"};

◆ m_conditionsCleaner

ServiceHandle<Athena::IConditionsCleanerSvc> AthenaHiveEventLoopMgr::m_conditionsCleaner
privateinherited

Definition at line 272 of file AthenaHiveEventLoopMgr.h.

◆ m_contiguousFailedEvts

int MPIHiveEventLoopMgr::m_contiguousFailedEvts {0}
protected

Definition at line 52 of file MPIHiveEventLoopMgr.h.

52{0};

◆ m_currentRun

number_type AthenaHiveEventLoopMgr::m_currentRun
protectedinherited

current run number

Definition at line 109 of file AthenaHiveEventLoopMgr.h.

◆ m_doEvtHeartbeat

bool AthenaHiveEventLoopMgr::m_doEvtHeartbeat
protectedinherited

Definition at line 251 of file AthenaHiveEventLoopMgr.h.

◆ m_eventPrintoutInterval

UnsignedIntegerProperty AthenaHiveEventLoopMgr::m_eventPrintoutInterval
protectedinherited

Definition at line 116 of file AthenaHiveEventLoopMgr.h.

◆ m_eventStore

StoreGateSvc_t AthenaHiveEventLoopMgr::m_eventStore
protectedinherited

Reference to StoreGateSvc;.

Property

Definition at line 83 of file AthenaHiveEventLoopMgr.h.

◆ m_evtContext

EvtContext* AthenaHiveEventLoopMgr::m_evtContext
protectedinherited

Gaudi event selector Context (may be used as a cursor by the evt selector)

Definition at line 88 of file AthenaHiveEventLoopMgr.h.

◆ m_evtIdModSvc

IEvtIdModifierSvc_t AthenaHiveEventLoopMgr::m_evtIdModSvc
protectedinherited

Definition at line 102 of file AthenaHiveEventLoopMgr.h.

◆ m_evtsel

StringProperty AthenaHiveEventLoopMgr::m_evtsel
protectedinherited

Definition at line 90 of file AthenaHiveEventLoopMgr.h.

◆ m_evtSelector

IEvtSelector* AthenaHiveEventLoopMgr::m_evtSelector
protectedinherited

Reference to the Event Selector.

Definition at line 86 of file AthenaHiveEventLoopMgr.h.

◆ m_evtSelectorCurrentPos

int MPIHiveEventLoopMgr::m_evtSelectorCurrentPos = 0
private

Definition at line 75 of file MPIHiveEventLoopMgr.h.

◆ m_failureMode

IntegerProperty AthenaHiveEventLoopMgr::m_failureMode
protectedinherited

Definition at line 113 of file AthenaHiveEventLoopMgr.h.

◆ m_firstEventAlone

bool AthenaHiveEventLoopMgr::m_firstEventAlone
privateinherited

Definition at line 258 of file AthenaHiveEventLoopMgr.h.

◆ m_firstEventIndex

UnsignedIntegerProperty MPIHiveEventLoopMgr::m_firstEventIndex
private
Initial value:
{
this, "FirstEventIndex", 0, "First event index (Exec.SkipEvents)"}

Definition at line 73 of file MPIHiveEventLoopMgr.h.

73 {
74 this, "FirstEventIndex", 0, "First event index (Exec.SkipEvents)"};

◆ m_firstRun

bool AthenaHiveEventLoopMgr::m_firstRun
protectedinherited

Definition at line 110 of file AthenaHiveEventLoopMgr.h.

◆ m_flmbi

unsigned int AthenaHiveEventLoopMgr::m_flmbi
privateinherited

Definition at line 260 of file AthenaHiveEventLoopMgr.h.

◆ m_histoDataMgrSvc

IDataManagerSvc_t AthenaHiveEventLoopMgr::m_histoDataMgrSvc
protectedinherited

Reference to the Histogram Data Service.

Definition at line 94 of file AthenaHiveEventLoopMgr.h.

◆ m_histoPersSvc

IConversionSvc_t AthenaHiveEventLoopMgr::m_histoPersSvc
protectedinherited

Definition at line 98 of file AthenaHiveEventLoopMgr.h.

◆ m_histPersName

StringProperty AthenaHiveEventLoopMgr::m_histPersName
protectedinherited

Definition at line 105 of file AthenaHiveEventLoopMgr.h.

◆ m_imsg

std::atomic<IMessageSvc*> AthMessaging::m_imsg { nullptr }
mutableprivateinherited

MessageSvc pointer.

Definition at line 135 of file AthMessaging.h.

135{ nullptr };

◆ m_incidentSvc

IIncidentSvc_t AthenaHiveEventLoopMgr::m_incidentSvc
protectedinherited

Reference to the incident service.

Definition at line 79 of file AthenaHiveEventLoopMgr.h.

◆ m_lastEventContext

EventContext AthenaHiveEventLoopMgr::m_lastEventContext
protectedinherited

Definition at line 255 of file AthenaHiveEventLoopMgr.h.

◆ m_lvl

std::atomic<MSG::Level> AthMessaging::m_lvl { MSG::NIL }
mutableprivateinherited

Current logging level.

Definition at line 138 of file AthMessaging.h.

138{ MSG::NIL };

◆ m_msg_tls

boost::thread_specific_ptr<MsgStream> AthMessaging::m_msg_tls
mutableprivateinherited

MsgStream instance (a std::cout like with print-out levels)

Definition at line 132 of file AthMessaging.h.

◆ m_nev

unsigned int AthenaHiveEventLoopMgr::m_nev
protectedinherited

events processed

Definition at line 248 of file AthenaHiveEventLoopMgr.h.

◆ m_nevt

unsigned int AthenaHiveEventLoopMgr::m_nevt
privateinherited

Definition at line 237 of file AthenaHiveEventLoopMgr.h.

◆ m_nLocalCreatedEvts

int MPIHiveEventLoopMgr::m_nLocalCreatedEvts {0}
protected

Definition at line 55 of file MPIHiveEventLoopMgr.h.

55{0};

◆ m_nLocalFinishedEvts

int MPIHiveEventLoopMgr::m_nLocalFinishedEvts {0}
protected

Definition at line 57 of file MPIHiveEventLoopMgr.h.

57{0};

◆ m_nLocalSkippedEvts

int MPIHiveEventLoopMgr::m_nLocalSkippedEvts {0}
protected

Definition at line 56 of file MPIHiveEventLoopMgr.h.

56{0};

◆ m_nm

std::string AthMessaging::m_nm
privateinherited

Message source name.

Definition at line 129 of file AthMessaging.h.

◆ m_proc

unsigned int AthenaHiveEventLoopMgr::m_proc
protectedinherited

Definition at line 249 of file AthenaHiveEventLoopMgr.h.

◆ m_requireInputAttributeList

bool AthenaHiveEventLoopMgr::m_requireInputAttributeList {}
protectedinherited

require input attribute list

Definition at line 134 of file AthenaHiveEventLoopMgr.h.

134{};

◆ m_scheduledStop

bool AthenaHiveEventLoopMgr::m_scheduledStop
protectedinherited

Scheduled stop of event processing.

Definition at line 182 of file AthenaHiveEventLoopMgr.h.

◆ m_schedulerName

std::string AthenaHiveEventLoopMgr::m_schedulerName
protectedinherited

Name of the scheduler to be used.

Definition at line 178 of file AthenaHiveEventLoopMgr.h.

◆ m_schedulerSvc

SmartIF<IScheduler> AthenaHiveEventLoopMgr::m_schedulerSvc
protectedinherited

A shortcut for the scheduler.

Definition at line 166 of file AthenaHiveEventLoopMgr.h.

◆ m_terminateLoop

bool AthenaHiveEventLoopMgr::m_terminateLoop { false }
protectedinherited

Definition at line 246 of file AthenaHiveEventLoopMgr.h.

246{ false };

◆ m_timeStamp

unsigned int AthenaHiveEventLoopMgr::m_timeStamp { 0 }
privateinherited

Definition at line 238 of file AthenaHiveEventLoopMgr.h.

238{ 0 };

◆ m_timeStampInt

unsigned int AthenaHiveEventLoopMgr::m_timeStampInt
privateinherited

Definition at line 260 of file AthenaHiveEventLoopMgr.h.

◆ m_toolAccept

tool_stats AthenaHiveEventLoopMgr::m_toolAccept
protectedinherited

tool returns StatusCode::SUCCESS counter

Definition at line 127 of file AthenaHiveEventLoopMgr.h.

◆ m_toolInvoke

tool_stats AthenaHiveEventLoopMgr::m_toolInvoke
protectedinherited

tool called counter

Definition at line 125 of file AthenaHiveEventLoopMgr.h.

◆ m_toolReject

tool_stats AthenaHiveEventLoopMgr::m_toolReject
protectedinherited

tool returns StatusCode::FAILURE counter

Definition at line 126 of file AthenaHiveEventLoopMgr.h.

◆ m_tools

tool_store AthenaHiveEventLoopMgr::m_tools
protectedinherited

internal tool store

Definition at line 128 of file AthenaHiveEventLoopMgr.h.

◆ m_totalFailedEvts

int MPIHiveEventLoopMgr::m_totalFailedEvts {0}
protected

Definition at line 53 of file MPIHiveEventLoopMgr.h.

53{0};

◆ m_useSecondaryEventNumber

bool AthenaHiveEventLoopMgr::m_useSecondaryEventNumber {}
protectedinherited

read event number from secondary input

Definition at line 137 of file AthenaHiveEventLoopMgr.h.

137{};

◆ m_useTools

bool AthenaHiveEventLoopMgr::m_useTools
protectedinherited

Definition at line 250 of file AthenaHiveEventLoopMgr.h.

◆ m_whiteboard

SmartIF<IHiveWhiteBoard> AthenaHiveEventLoopMgr::m_whiteboard
protectedinherited

Reference to the Whiteboard interface.

Definition at line 154 of file AthenaHiveEventLoopMgr.h.

◆ m_whiteboardName

std::string AthenaHiveEventLoopMgr::m_whiteboardName
protectedinherited

Name of the Whiteboard to be used.

Definition at line 180 of file AthenaHiveEventLoopMgr.h.

◆ m_writeHists

bool AthenaHiveEventLoopMgr::m_writeHists
privateinherited

Definition at line 241 of file AthenaHiveEventLoopMgr.h.

◆ m_writeInterval

UnsignedIntegerProperty AthenaHiveEventLoopMgr::m_writeInterval
privateinherited

Definition at line 240 of file AthenaHiveEventLoopMgr.h.


The documentation for this class was generated from the following files: