 |
ATLAS Offline Software
|
The MPI event loop manager.
More...
#include <MPIHiveEventLoopMgr.h>
|
typedef IEvtSelector::Context | EvtContext |
|
typedef std::list< SmartIF< IAlgorithm > > | ListAlg |
|
The MPI event loop manager.
Behaves like AthenaHiveEventLoopMgr, but runs in a multi-node MPI environment. This class is derived from and implemented in terms of AthenaHiveEventLoopMgr.
Definition at line 34 of file MPIHiveEventLoopMgr.h.
◆ EvtContext
◆ IConversionSvc_t
◆ IDataManagerSvc_t
◆ IEvtIdModifierSvc_t
◆ IIncidentSvc_t
◆ ListAlg
◆ number_type
◆ StoreGateSvc_t
◆ tool_iterator
◆ tool_stats
◆ tool_stats_iterator
◆ tool_store
◆ tool_type
◆ MPIHiveEventLoopMgr()
MPIHiveEventLoopMgr::MPIHiveEventLoopMgr |
( |
const std::string & |
name, |
|
|
ISvcLocator * |
svcLoc |
|
) |
| |
◆ ~MPIHiveEventLoopMgr()
MPIHiveEventLoopMgr::~MPIHiveEventLoopMgr |
( |
| ) |
|
|
virtualdefault |
◆ clearWBSlot()
StatusCode AthenaHiveEventLoopMgr::clearWBSlot |
( |
int |
evtSlot | ) |
|
|
protectedinherited |
◆ createEventContext()
EventContext AthenaHiveEventLoopMgr::createEventContext |
( |
| ) |
|
|
overrideprotectedvirtualinherited |
Create event context.
Definition at line 1167 of file AthenaHiveEventLoopMgr.cxx.
1172 if (
sc.isFailure()) {
1174 <<
" could not be selected for the WhiteBoard" );
1175 return EventContext{};
1180 ATH_MSG_DEBUG (
"created EventContext, num: " << ctx.evt() <<
" in slot: "
◆ curEvent()
int AthenaHiveEventLoopMgr::curEvent |
( |
| ) |
const |
|
overridevirtualinherited |
◆ declareEventRootAddress()
int AthenaHiveEventLoopMgr::declareEventRootAddress |
( |
EventContext & |
ctx | ) |
|
|
protectedinherited |
Declare the root address of the event.
FIXME ???
Definition at line 952 of file AthenaHiveEventLoopMgr.cxx.
964 std::unique_ptr<const EventInfo> pEvent{};
971 IOpaqueAddress* addr{};
975 if ( !
sc.isSuccess() ) {
991 if( !
sc.isSuccess() ) {
996 }
if ((
sc=
eventStore()->loadEventProxies()).isFailure()) {
1001 bool consume_modifier_stream =
false;
1005 if ( pAttrList !=
nullptr && pAttrList->size() > 6 ) {
1007 unsigned int runNumber = (*pAttrList)[
"RunNumber"].data<
unsigned int>();
1008 unsigned long long eventNumber = (*pAttrList)[
"EventNumber"].data<
unsigned long long>();
1009 unsigned int eventTime = (*pAttrList)[
"EventTime"].data<
unsigned int>();
1010 unsigned int eventTimeNS = (*pAttrList)[
"EventTimeNanoSec"].data<
unsigned int>();
1011 unsigned int lumiBlock = (*pAttrList)[
"LumiBlockN"].data<
unsigned int>();
1012 unsigned int bunchId = (*pAttrList)[
"BunchId"].data<
unsigned int>();
1015 consume_modifier_stream =
true;
1018 unsigned long long eventNumberSecondary{};
1019 if ( !(pAttrList->exists(
"hasSecondaryInput") && (*pAttrList)[
"hasSecondaryInput"].data<
bool>()) ) {
1020 ATH_MSG_FATAL (
"Secondary EventNumber requested, but secondary input does not exist!" );
1023 if ( pAttrList->exists(
"EventNumber_secondary") ) {
1024 eventNumberSecondary = (*pAttrList)[
"EventNumber_secondary"].data<
unsigned long long>();
1030 if (pEventSecondary) {
1031 eventNumberSecondary = pEventSecondary->
event_ID()->event_number();
1034 ATH_MSG_FATAL (
"Secondary EventNumber requested, but it does not exist!" );
1038 if (eventNumberSecondary != 0) {
1042 ATH_MSG_INFO (
" ===>>> using secondary event #" << eventNumberSecondary <<
" instead of #" <<
eventNumber <<
" <<<===" );
1048 pEvent = std::make_unique<EventInfo>(
1055 ATH_MSG_FATAL (
"Valid input attribute list required but not present!" );
1059 const EventInfo* pEventObserver{pEvent.get()};
1060 if (!pEventObserver) {
1065 if (pEventObserver) {
1066 consume_modifier_stream =
false;
1073 if( !
sc.isSuccess() ) {
1077 consume_modifier_stream =
true;
1078 ATH_MSG_DEBUG (
"use xAOD::EventInfo with runNumber=" << pXEvent->runNumber() );
1080 pEvent = std::make_unique<EventInfo>(
1083 pEventObserver = pEvent.get();
1085 if( !
sc.isSuccess() ) {
1093 consume_modifier_stream);
1100 unsigned int runNmb{1}, evtNmb{
m_nevt + 1};
1107 auto eid = std::make_unique<EventID> (runNmb,evtNmb,
m_timeStamp);
1109 eid->set_lumi_block( runNmb );
1113 pEvent = std::make_unique<EventInfo>(std::move(eid),
1114 std::make_unique<EventType>());
1116 bool consume_modifier_stream =
true;
1126 ATH_MSG_DEBUG (
"recording EventInfo " << *pEvent->event_ID() <<
" in "
1130 if( !
sc.isSuccess() ) {
◆ drainLocalScheduler()
StatusCode MPIHiveEventLoopMgr::drainLocalScheduler |
( |
| ) |
|
|
protected |
Drain the local scheduler of any (at least one) completed events.
drainLocalScheduler Drain the local scheduler on this MPI rank
Definition at line 355 of file MPIHiveEventLoopMgr.cxx.
360 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
362 EventContext* finishedEvtContext(
nullptr);
366 <<
"] Waiting for a context");
370 if (
sc.isSuccess()) {
371 ATH_MSG_DEBUG(
"drainLocalScheduler: scheduler not empty: Context "
372 << finishedEvtContext);
373 finishedEvtContexts.emplace_back(finishedEvtContext);
377 return StatusCode::SUCCESS;
381 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()) {
382 finishedEvtContexts.emplace_back(finishedEvtContext);
387 for (
auto& thisFinishedEvtContext : finishedEvtContexts) {
388 if (!thisFinishedEvtContext) {
390 fail = StatusCode::FAILURE;
396 thisFinishedEvtContext->eventID().run_number(),
397 thisFinishedEvtContext->eventID().event_number(),
398 m_aess->eventStatus(*thisFinishedEvtContext));
400 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
402 << thisFinishedEvtContext <<
" w/ fail mode: "
403 <<
m_aess->eventStatus(*thisFinishedEvtContext));
404 thisFinishedEvtContext.reset();
405 fail = StatusCode::FAILURE;
412 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
413 n_run = thisFinishedEvtContext->eventID().run_number();
414 n_evt = thisFinishedEvtContext->eventID().event_number();
417 << thisFinishedEvtContext->slot());
418 thisFinishedEvtContext.reset();
419 fail = StatusCode::FAILURE;
425 Gaudi::Hive::setCurrentContext(*thisFinishedEvtContext);
427 Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext));
430 << thisFinishedEvtContext->slot() <<
" (event "
431 << thisFinishedEvtContext->evt() <<
") of the whiteboard");
434 if (!
sc.isSuccess()) {
435 ATH_MSG_ERROR(
"Whiteboard slot " << thisFinishedEvtContext->slot()
436 <<
" could not be properly cleared");
437 if (
fail != StatusCode::FAILURE) {
440 thisFinishedEvtContext.reset();
452 << n_evt <<
", run #" << n_run <<
" on slot "
453 << thisFinishedEvtContext->slot() <<
", " <<
m_proc
454 <<
" events processed so far <<<===");
457 << n_evt <<
", run #" << n_run <<
" on slot "
458 << thisFinishedEvtContext->slot() <<
", " <<
m_nev
459 <<
" events read and " <<
m_proc
460 <<
" events processed so far <<<===");
462 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
465 fail = StatusCode::FAILURE;
466 thisFinishedEvtContext.reset();
469 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
470 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
475 << thisFinishedEvtContext);
477 thisFinishedEvtContext.reset();
◆ drainScheduler()
int AthenaHiveEventLoopMgr::drainScheduler |
( |
int & |
finishedEvents | ) |
|
|
protectedinherited |
Drain the scheduler from all actions that may be queued.
Definition at line 1190 of file AthenaHiveEventLoopMgr.cxx.
1195 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1197 EventContext* finishedEvtContext(
nullptr);
1200 ATH_MSG_DEBUG (
"drainScheduler: [" << finishedEvts <<
"] Waiting for a context" );
1204 if (
sc.isSuccess()){
1205 ATH_MSG_DEBUG (
"drainScheduler: scheduler not empty: Context "
1206 << finishedEvtContext );
1207 finishedEvtContexts.emplace_back(finishedEvtContext);
1215 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1216 finishedEvtContexts.emplace_back(finishedEvtContext);
1221 for (
auto& thisFinishedEvtContext : finishedEvtContexts){
1222 if (!thisFinishedEvtContext) {
1223 ATH_MSG_FATAL (
"Detected nullptr ctxt while clearing WB!");
1228 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1229 ATH_MSG_FATAL (
"Failed event detected on " << thisFinishedEvtContext
1230 <<
" w/ fail mode: "
1231 <<
m_aess->eventStatus(*thisFinishedEvtContext) );
1239 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1240 n_run = thisFinishedEvtContext->eventID().run_number();
1241 n_evt = thisFinishedEvtContext->eventID().event_number();
1244 << thisFinishedEvtContext->slot() );
1253 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1254 m_incidentSvc->fireIncident(Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1256 ATH_MSG_DEBUG (
"Clearing slot " << thisFinishedEvtContext->slot()
1257 <<
" (event " << thisFinishedEvtContext->evt()
1258 <<
") of the whiteboard" );
1261 if (!
sc.isSuccess()) {
1262 ATH_MSG_ERROR (
"Whiteboard slot " << thisFinishedEvtContext->slot()
1263 <<
" could not be properly cleared" );
1275 ATH_MSG_INFO (
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1276 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1277 <<
m_proc <<
" events processed so far <<<===" );
1279 ATH_MSG_INFO (
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1280 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1282 <<
" events processed so far <<<===" );
1283 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
1285 ATH_MSG_ERROR (
" unable to open: eventLoopHeartBeat.txt" );
1289 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
1290 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
1295 ATH_MSG_DEBUG (
"drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext );
1298 return (
fail ? -1 : 1 );
◆ eventStore()
◆ executeAlgorithms()
StatusCode AthenaHiveEventLoopMgr::executeAlgorithms |
( |
| ) |
|
|
protectedvirtualinherited |
◆ executeEvent()
StatusCode AthenaHiveEventLoopMgr::executeEvent |
( |
EventContext && |
ctx | ) |
|
|
overridevirtualinherited |
implementation of IEventProcessor::executeEvent(void* par)
Fire begin-Run incident if new run:
Definition at line 501 of file AthenaHiveEventLoopMgr.cxx.
506 ATH_MSG_ALWAYS (
"A stopRun was requested by an incidentListener. "
507 <<
"Do not process this event." );
509 return (StatusCode::SUCCESS);
515 Gaudi::Hive::setCurrentContext ( ctx );
518 if (declEvtRootSc == 0 ) {
520 return StatusCode::SUCCESS;
521 }
else if ( declEvtRootSc == -1) {
522 ATH_MSG_ERROR (
"declareEventRootAddress for context " << ctx <<
" failed" );
523 return StatusCode::FAILURE;
527 unsigned int conditionsRun = ctx.eventID().run_number();
530 if (
eventStore()->contains<AthenaAttributeList> (
"Input") &&
532 if (attr->exists (
"ConditionsRun")) {
533 conditionsRun = (*attr)[
"ConditionsRun"].data<
unsigned int>();
538 Gaudi::Hive::setCurrentContext ( ctx );
541 if (
eventStore()->record(std::make_unique<EventContext> (ctx),
542 "EventContext").isFailure())
545 return (StatusCode::FAILURE);
565 bool toolsPassed=
true;
570 unsigned int toolCtr=0;
574 while(toolsPassed && theTool!=lastTool )
576 toolsPassed = (*theTool)->passEvent(ctx.eventID());
589 <<
" on slot " << ctx.slot() <<
", " <<
m_proc
590 <<
" events processed so far <<<===" );
594 <<
" on slot " << ctx.slot() <<
", "
596 <<
" events processed so far <<<===" );
611 <<
", slot " << ctx.slot()
612 <<
" to the scheduler" );
619 if (!addEventStatus.isSuccess()){
620 ATH_MSG_FATAL (
"An event processing slot should be now free in the scheduler, but it appears not to be the case." );
630 Gaudi::Hive::setCurrentContext( EventContext() );
632 return StatusCode::SUCCESS;
◆ executeRun()
StatusCode AthenaHiveEventLoopMgr::executeRun |
( |
int |
maxevt | ) |
|
|
overridevirtualinherited |
implementation of IEventProcessor::executeRun(int maxevt)
Definition at line 639 of file AthenaHiveEventLoopMgr.cxx.
643 bool eventfailed =
false;
651 return StatusCode::FAILURE;
654 return StatusCode::SUCCESS;
◆ finalize()
StatusCode MPIHiveEventLoopMgr::finalize |
( |
| ) |
|
|
overridevirtual |
◆ getEventRoot()
StatusCode AthenaHiveEventLoopMgr::getEventRoot |
( |
IOpaqueAddress *& |
refpAddr | ) |
|
|
inherited |
Create event address using event selector.
Definition at line 930 of file AthenaHiveEventLoopMgr.cxx.
933 if ( !
sc.isSuccess() ) {
938 if( !
sc.isSuccess() ) {
940 if (
sc.isSuccess() ) {
942 if ( !
sc.isSuccess() ) {
◆ handle()
void AthenaHiveEventLoopMgr::handle |
( |
const Incident & |
inc | ) |
|
|
overridevirtualinherited |
IIncidentListener interface.
Definition at line 855 of file AthenaHiveEventLoopMgr.cxx.
858 if(inc.type() ==
"EndAlgorithms") {
863 if( !
sc.isSuccess() ) {
869 if(inc.type()!=
"BeforeFork")
873 ATH_MSG_WARNING (
"Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" );
885 if(!
sc.isSuccess()) {
889 IOpaqueAddress* addr{
nullptr};
891 if (
sc.isFailure()) {
898 if(!
sc.isSuccess()) {
904 if(
eventStore()->loadEventProxies().isFailure()) {
912 if(!
sc.isSuccess()) {
922 if(!
sc.isSuccess()) {
◆ initialize()
StatusCode MPIHiveEventLoopMgr::initialize |
( |
| ) |
|
|
overridevirtual |
◆ initializeAlgorithms()
StatusCode AthenaHiveEventLoopMgr::initializeAlgorithms |
( |
| ) |
|
|
protectedinherited |
◆ initMessaging()
void AthMessaging::initMessaging |
( |
| ) |
const |
|
privateinherited |
Initialize our message level and MessageSvc.
This method should only be called once.
Definition at line 39 of file AthMessaging.cxx.
◆ insertEvent()
StatusCode MPIHiveEventLoopMgr::insertEvent |
( |
int |
eventIdx, |
|
|
bool & |
endOfStream, |
|
|
std::int64_t |
requestTime_ns |
|
) |
| |
|
protected |
Insert an event into the local scheduler.
Insert event into local scheduler.
Definition at line 324 of file MPIHiveEventLoopMgr.cxx.
331 Gaudi::Hive::setCurrentContext(ctx);
335 return StatusCode::FAILURE;
342 m_clusterSvc->log_addEvent(eventIdx, evtID.run_number(), evtID.event_number(),
345 if (
sc.isRecoverable()) {
347 }
else if (
sc.isSuccess()) {
◆ masterEventLoop()
StatusCode MPIHiveEventLoopMgr::masterEventLoop |
( |
int |
maxEvt | ) |
|
|
protected |
Master event loop (runs on master, provides events over MPI)
Master event loop.
Definition at line 62 of file MPIHiveEventLoopMgr.cxx.
70 return StatusCode::FAILURE;
72 if (maxEvt < 0 || maxEvt >
evt) {
75 ATH_MSG_INFO(
"Will be processing " << maxEvt <<
" events");
79 std::vector<bool> workers_done(
m_clusterSvc->numRanks(),
false);
82 int num_workers_done = 1;
90 for (
int evt = skipEvts;
evt < skipEvts + maxEvt;) {
103 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
104 workers_done.at(
msg.source) =
108 if (!workers_done.at(
i)) {
112 workers_done[
i] =
true;
121 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
122 workers_done.at(
msg.source) =
true;
133 ATH_MSG_INFO(
"Provided all events to workers, waiting for them to complete.");
135 while (num_workers_done < m_clusterSvc->numRanks()) {
146 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
147 workers_done.at(
msg.source) =
151 if (!workers_done.at(
i)) {
155 workers_done[
i] =
true;
164 statuses.at(
msg.source) = get<ClusterMessage::WorkerStatus>(
msg.payload);
165 workers_done.at(
msg.source) =
true;
183 for (
const auto& worker_status :
statuses) {
184 if (worker_status.status.isFailure() &&
186 sc = worker_status.status;
188 n_created += worker_status.createdEvents;
189 n_skipped += worker_status.skippedEvents;
190 n_finished += worker_status.finishedEvents;
192 if ((worker_idx++) != 0) {
193 ATH_MSG_INFO(
"Worker " << worker_idx <<
": SC " << worker_status.status
194 <<
", created " << worker_status.createdEvents
195 <<
", skipped " << worker_status.skippedEvents
196 <<
", finished " << worker_status.finishedEvents);
200 ATH_MSG_INFO(
"Overall: SC " <<
sc <<
", created " << n_created <<
", skipped "
201 << n_skipped <<
", finished " << n_finished);
202 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_provided)
203 <<
" to provide all events.");
204 ATH_MSG_INFO(
"MASTER: Took " << std::chrono::hh_mm_ss(all_done)
205 <<
" to complete all events.");
◆ modifyEventContext()
void AthenaHiveEventLoopMgr::modifyEventContext |
( |
EventContext & |
ctx, |
|
|
const EventID & |
eID, |
|
|
bool |
consume_modifier_stream |
|
) |
| |
|
virtualinherited |
Definition at line 1141 of file AthenaHiveEventLoopMgr.cxx.
1147 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1149 unsigned int oldrunnr=eID.run_number();
1150 unsigned int oldLB=eID.lumi_block();
1151 unsigned int oldTS=eID.time_stamp();
1152 unsigned int oldTSno=eID.time_stamp_ns_offset();
1153 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc runnr=" << oldrunnr <<
" -> " << new_eID.run_number() );
1154 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc LB=" << oldLB <<
" -> " << new_eID.lumi_block() );
1155 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS <<
" -> " << new_eID.time_stamp() );
1156 ATH_MSG_DEBUG (
"modifyEventContext: use evtIdModSvc TimeStamp ns Offset=" << oldTSno <<
" -> " << new_eID.time_stamp_ns_offset() );
1158 ctx.setEventID( new_eID );
1163 ctx.setEventID( eID );
◆ msg() [1/4]
MsgStream & AthMessaging::msg |
( |
| ) |
const |
|
inlineinherited |
The standard message stream.
Returns a reference to the default message stream. May not be invoked before sysInitialize() has been invoked.
Definition at line 164 of file AthMessaging.h.
◆ msg() [2/4]
MsgStream & AthMessaging::msg |
|
inlineinherited |
The standard message stream.
Returns a reference to the default message stream. May not be invoked before sysInitialize() has been invoked.
Definition at line 92 of file AthMessaging.h.
◆ msg() [3/4]
MsgStream & AthMessaging::msg |
( |
const MSG::Level |
lvl | ) |
const |
|
inlineinherited |
The standard message stream.
Returns a reference to the default message stream. May not be invoked before sysInitialize() has been invoked.
Definition at line 179 of file AthMessaging.h.
180 {
return msg() << lvl; }
◆ msg() [4/4]
MsgStream & AthMessaging::msg |
|
inlineinherited |
The standard message stream.
Returns a reference to the default message stream. May not be invoked before sysInitialize() has been invoked.
Definition at line 99 of file AthMessaging.h.
180 {
return msg() << lvl; }
◆ msgLvl() [1/2]
bool AthMessaging::msgLvl |
( |
const MSG::Level |
lvl | ) |
const |
|
inlineinherited |
Test the output level.
- Parameters
-
lvl | The message level to test against |
- Returns
- boolean Indicating if messages at given level will be printed
- Return values
-
true | Messages at level "lvl" will be printed |
Definition at line 151 of file AthMessaging.h.
◆ msgLvl() [2/2]
bool AthMessaging::msgLvl |
|
inlineinherited |
Test the output level.
- Parameters
-
lvl | The message level to test against |
- Returns
- boolean Indicating if messages at given level will be printed
- Return values
-
true | Messages at level "lvl" will be printed |
Definition at line 86 of file AthMessaging.h.
◆ name()
virtual const std::string& AthenaHiveEventLoopMgr::name |
( |
| ) |
const |
|
inlineoverridevirtualinherited |
◆ nextEvent()
StatusCode MPIHiveEventLoopMgr::nextEvent |
( |
int |
maxevt | ) |
|
|
overridevirtual |
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
nextEvent Dispatches to either masterEventLoop or workerEventLoop
Reimplemented from AthenaHiveEventLoopMgr.
Definition at line 44 of file MPIHiveEventLoopMgr.cxx.
47 return StatusCode::SUCCESS;
◆ resetTimeout()
void Athena::TimeoutMaster::resetTimeout |
( |
Timeout & |
instance | ) |
|
|
inlineprotectedinherited |
Reset timeout.
Definition at line 83 of file Timeout.h.
◆ seek()
StatusCode AthenaHiveEventLoopMgr::seek |
( |
int |
evt | ) |
|
|
overridevirtualinherited |
Seek to a given event.
Definition at line 793 of file AthenaHiveEventLoopMgr.cxx.
797 ATH_MSG_ERROR (
"Seek failed; unsupported by event selector" );
798 return StatusCode::FAILURE;
803 ATH_MSG_FATAL (
"Can not create the event selector Context." );
804 return StatusCode::FAILURE;
810 if (
sc.isSuccess()) {
811 m_incidentSvc->fireIncident(ContextIncident<std::tuple<int, int>>(
◆ setLevel()
void AthMessaging::setLevel |
( |
MSG::Level |
lvl | ) |
|
|
inherited |
◆ setTimeout()
void Athena::TimeoutMaster::setTimeout |
( |
Timeout & |
instance | ) |
|
|
inlineprotectedinherited |
◆ setupPreSelectTools()
void AthenaHiveEventLoopMgr::setupPreSelectTools |
( |
Gaudi::Details::PropertyBase & |
| ) |
|
|
protectedinherited |
Property update handler: sets up the pre-selection tools.
Definition at line 327 of file AthenaHiveEventLoopMgr.cxx.
342 unsigned int toolCtr = 0;
343 for ( ; firstTool != lastTool; ++firstTool )
◆ size()
int AthenaHiveEventLoopMgr::size |
( |
| ) |
|
|
overridevirtualinherited |
◆ stop()
StatusCode AthenaHiveEventLoopMgr::stop |
( |
| ) |
|
|
overridevirtualinherited |
implementation of IService::stop
Definition at line 674 of file AthenaHiveEventLoopMgr.cxx.
688 for (
size_t islot = 0; islot < nslot; islot++) {
692 Gaudi::Hive::setCurrentContext( EventContext() );
◆ stopRun()
StatusCode AthenaHiveEventLoopMgr::stopRun |
( |
| ) |
|
|
overridevirtualinherited |
implementation of IEventProcessor::stopRun()
Definition at line 659 of file AthenaHiveEventLoopMgr.cxx.
661 SmartIF<IProperty> appmgr(serviceLocator());
662 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop).isFailure()) {
663 ATH_MSG_ERROR (
"Could not set return code of the application ("
664 << Gaudi::ReturnCode::ScheduledStop <<
")" );
667 return StatusCode::SUCCESS;
◆ workerEventLoop()
StatusCode MPIHiveEventLoopMgr::workerEventLoop |
( |
| ) |
|
|
protected |
Worker event loop (runs on worker, requests events over MPI)
Worker event loop.
Definition at line 210 of file MPIHiveEventLoopMgr.cxx.
211 bool end_of_stream =
false;
226 if (
sc.isFailure()) {
246 return StatusCode::FAILURE;
252 << std::chrono::hh_mm_ss(loop_time)
260 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
279 return StatusCode::FAILURE;
282 int evt = get<int>(
msg.payload);
286 std::chrono::duration_cast<std::chrono::nanoseconds>(request_time)
288 if (
sc.isFailure() && !
sc.isRecoverable()) {
301 << std::chrono::hh_mm_ss(loop_time)
308 std::size_t numSlots =
m_whiteboard->getNumberOfStores();
◆ writeHistograms()
StatusCode AthenaHiveEventLoopMgr::writeHistograms |
( |
bool |
force = false | ) |
|
|
protectedvirtualinherited |
Dump out histograms as needed.
Definition at line 428 of file AthenaHiveEventLoopMgr.cxx.
433 std::vector<DataObject*>
objects;
435 DataObject*
obj = reg->object();
436 if ( !
obj ||
obj->clID() == CLID_StatisticsFile )
return false;
441 if ( !
sc.isSuccess() ) {
442 ATH_MSG_ERROR (
"Error while traversing Histogram data store" );
450 (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
454 IOpaqueAddress* pAddr =
nullptr;
456 if ( iret.isFailure() )
return iret;
457 i->registry()->setAddress( pAddr );
461 IRegistry* reg =
i->registry();
463 return iret.isFailure() ? iret : isc;
465 if ( !
sc.isSuccess() ) {
470 if (
force || (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
◆ ATLAS_THREAD_SAFE
std::atomic_flag m_initialized AthMessaging::ATLAS_THREAD_SAFE = ATOMIC_FLAG_INIT |
|
mutableprivateinherited |
◆ m_abortEventListener
SmartIF< IIncidentListener > AthenaHiveEventLoopMgr::m_abortEventListener |
|
protectedinherited |
◆ m_aess
SmartIF<IAlgExecStateSvc> AthenaHiveEventLoopMgr::m_aess |
|
protectedinherited |
◆ m_algResourcePool
SmartIF<IAlgResourcePool> AthenaHiveEventLoopMgr::m_algResourcePool |
|
protectedinherited |
◆ m_appMgrProperty
SmartIF<IProperty> AthenaHiveEventLoopMgr::m_appMgrProperty |
|
protectedinherited |
◆ m_clusterSvc
◆ m_conditionsCleaner
◆ m_currentRun
◆ m_doEvtHeartbeat
bool AthenaHiveEventLoopMgr::m_doEvtHeartbeat |
|
protectedinherited |
◆ m_eventPrintoutInterval
UnsignedIntegerProperty AthenaHiveEventLoopMgr::m_eventPrintoutInterval |
|
protectedinherited |
◆ m_eventStore
◆ m_evtContext
◆ m_evtIdModSvc
◆ m_evtsel
StringProperty AthenaHiveEventLoopMgr::m_evtsel |
|
protectedinherited |
◆ m_evtSelector
IEvtSelector* AthenaHiveEventLoopMgr::m_evtSelector |
|
protectedinherited |
◆ m_evtSelectorCurrentPos
int MPIHiveEventLoopMgr::m_evtSelectorCurrentPos = 0 |
|
private |
◆ m_failureMode
IntegerProperty AthenaHiveEventLoopMgr::m_failureMode |
|
protectedinherited |
◆ m_firstEventAlone
bool AthenaHiveEventLoopMgr::m_firstEventAlone |
|
privateinherited |
◆ m_firstEventIndex
UnsignedIntegerProperty MPIHiveEventLoopMgr::m_firstEventIndex |
|
private |
Initial value:{
this, "FirstEventIndex", 0, "First event index (Exec.SkipEvents)"}
Definition at line 70 of file MPIHiveEventLoopMgr.h.
◆ m_firstRun
bool AthenaHiveEventLoopMgr::m_firstRun |
|
protectedinherited |
◆ m_flmbi
unsigned int AthenaHiveEventLoopMgr::m_flmbi |
|
privateinherited |
◆ m_histoDataMgrSvc
◆ m_histoPersSvc
◆ m_histPersName
StringProperty AthenaHiveEventLoopMgr::m_histPersName |
|
protectedinherited |
◆ m_imsg
std::atomic<IMessageSvc*> AthMessaging::m_imsg { nullptr } |
|
mutableprivateinherited |
◆ m_incidentSvc
◆ m_lastEventContext
EventContext AthenaHiveEventLoopMgr::m_lastEventContext |
|
protectedinherited |
◆ m_lvl
std::atomic<MSG::Level> AthMessaging::m_lvl { MSG::NIL } |
|
mutableprivateinherited |
◆ m_msg_tls
boost::thread_specific_ptr<MsgStream> AthMessaging::m_msg_tls |
|
mutableprivateinherited |
MsgStream instance (a std::cout like with print-out levels)
Definition at line 132 of file AthMessaging.h.
◆ m_nev
unsigned int AthenaHiveEventLoopMgr::m_nev |
|
protectedinherited |
◆ m_nevt
unsigned int AthenaHiveEventLoopMgr::m_nevt |
|
privateinherited |
◆ m_nLocalCreatedEvts
int MPIHiveEventLoopMgr::m_nLocalCreatedEvts {0} |
|
protected |
◆ m_nLocalFinishedEvts
int MPIHiveEventLoopMgr::m_nLocalFinishedEvts {0} |
|
protected |
◆ m_nLocalSkippedEvts
int MPIHiveEventLoopMgr::m_nLocalSkippedEvts {0} |
|
protected |
◆ m_nm
std::string AthMessaging::m_nm |
|
privateinherited |
◆ m_proc
unsigned int AthenaHiveEventLoopMgr::m_proc |
|
protectedinherited |
◆ m_requireInputAttributeList
bool AthenaHiveEventLoopMgr::m_requireInputAttributeList {} |
|
protectedinherited |
◆ m_scheduledStop
bool AthenaHiveEventLoopMgr::m_scheduledStop |
|
protectedinherited |
◆ m_schedulerName
std::string AthenaHiveEventLoopMgr::m_schedulerName |
|
protectedinherited |
◆ m_schedulerSvc
SmartIF<IScheduler> AthenaHiveEventLoopMgr::m_schedulerSvc |
|
protectedinherited |
◆ m_terminateLoop
bool AthenaHiveEventLoopMgr::m_terminateLoop { false } |
|
protectedinherited |
◆ m_timeStamp
unsigned int AthenaHiveEventLoopMgr::m_timeStamp { 0 } |
|
privateinherited |
◆ m_timeStampInt
unsigned int AthenaHiveEventLoopMgr::m_timeStampInt |
|
privateinherited |
◆ m_toolAccept
◆ m_toolInvoke
◆ m_toolReject
◆ m_tools
◆ m_useSecondaryEventNumber
bool AthenaHiveEventLoopMgr::m_useSecondaryEventNumber {} |
|
protectedinherited |
◆ m_useTools
bool AthenaHiveEventLoopMgr::m_useTools |
|
protectedinherited |
◆ m_whiteboard
SmartIF<IHiveWhiteBoard> AthenaHiveEventLoopMgr::m_whiteboard |
|
protectedinherited |
◆ m_whiteboardName
std::string AthenaHiveEventLoopMgr::m_whiteboardName |
|
protectedinherited |
◆ m_writeHists
bool AthenaHiveEventLoopMgr::m_writeHists |
|
privateinherited |
◆ m_writeInterval
UnsignedIntegerProperty AthenaHiveEventLoopMgr::m_writeInterval |
|
privateinherited |
The documentation for this class was generated from the following files:
def retrieve(aClass, aKey=None)
JetConstituentVector::iterator iterator
tool_stats m_toolReject
tool returns StatusCode::FAILURE counter
StatusCode record(T *p2BRegistered, const TKEY &key)
Record an object with a key.
std::atomic< MSG::Level > m_lvl
Current logging level.
virtual int size() override
Return the size of the collection.
IEvtSelector * m_evtSelector
Reference to the Event Selector.
bool m_useSecondaryEventNumber
read event number from secondary input
StatusCode insertEvent(int eventIdx, bool &endOfStream, std::int64_t requestTime_ns)
Insert an event into the local scheduler.
tool_store m_tools
internal tool store
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
StatusCode initializeAlgorithms()
Initialize all algorithms and output streams.
int m_evtSelectorCurrentPos
virtual const std::string & name() const override
std::atomic< IMessageSvc * > m_imsg
MessageSvc pointer.
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc.
IMessageSvc * getMessageSvc(bool quiet=false)
unsigned int m_timeStampInt
UnsignedIntegerProperty m_eventPrintoutInterval
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
StatusCode retrieve(const T *&ptr) const
Retrieve the default object into a const T*.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
EventIDBase::event_number_t event_number_t
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
IEvtIdModifierSvc_t m_evtIdModSvc
tool_store::const_iterator tool_iterator
StatusCode workerEventLoop()
Worker event loop (runs on worker, requests events over MPI)
EventIDBase::number_type number_type
virtual void modifyEventContext(EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
::StatusCode StatusCode
StatusCode definition for legacy code.
EventID * event_ID()
the unique identification of the event.
virtual int size(IEvtSelector::Context &c) const =0
Return the size of the collection, or -1 if we can't get the size.
#define ATH_MSG_ALWAYS(x)
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
#define CHECK(...)
Evaluate an expression and check for errors.
UnsignedIntegerProperty m_firstEventIndex
virtual StatusCode clearStore(bool forceRemove=false) override final
clear DataStore contents: called by the event loop mgrs
static Timeout & instance()
Get reference to Timeout singleton.
MsgStream & msg() const
The standard message stream.
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
bool m_requireInputAttributeList
require input attribute list
A class describing a message sent between nodes in a cluster.
void setConditionsRun(EventIDBase::number_type conditionsRun)
StatusCode masterEventLoop(int maxEvt)
Master event loop (runs on master, provides events over MPI)
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(void* par)
StatusCode drainLocalScheduler()
Drain the local scheduler of any (at least one) completed events.
virtual EventContext createEventContext() override
Create event context.
This class provides general information about an event. Event information is provided by the accessor...
virtual StatusCode initialize() override
implementation of IAppMgrUI::initialize
StatusCode recordAddress(const std::string &skey, IOpaqueAddress *pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
const T * tryConstRetrieve() const
Class describing the basic event information.
AthenaHiveEventLoopMgr()=delete
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
tool_stats m_toolInvoke
tool called counter
accumulate
Update flags based on parser line args.
ServiceHandle< IMPIClusterSvc > m_clusterSvc
Reference to the MPIClusterSvc.
ServiceHandle< Athena::IConditionsCleanerSvc > m_conditionsCleaner
#define ATH_MSG_WARNING(x)
bool m_scheduledStop
Scheduled stop of event processing.
std::string m_nm
Message source name.
IConversionSvc_t m_histoPersSvc
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
tool_stats m_toolAccept
tool returns StatusCode::SUCCESS counter
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.
int declareEventRootAddress(EventContext &)
Declare the root address of the event.
EventContext m_lastEventContext
StoreGateSvc * eventStore() const
void resetTimeout(Timeout &instance)
Reset timeout.
number_type m_currentRun
current run number
void initMessaging() const
Initialize our message level and MessageSvc.
IDataManagerSvc_t m_histoDataMgrSvc
Reference to the Histogram Data Service.
EvtContext * m_evtContext
Gaudi event selector Context (may be used as a cursor by the evt selector)
boost::thread_specific_ptr< MsgStream > m_msg_tls
MsgStream instance (a std::cout like with print-out levels)
unsigned int m_nev
events processed
EventType eventTypeFromxAOD(const xAOD::EventInfo *xaod)
Create EventType object from xAOD::EventInfo.
UnsignedIntegerProperty m_writeInterval
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
void setExtendedEventContext(EventContext &ctx, ExtendedEventContext &&ectx)
Move an extended context into a context object.
Abstract interface for seeking for an event selector.
virtual StatusCode seek(int evt) override
Seek to a given event.
thread_local event_number_t eventIndex