17#include "GaudiKernel/IAlgorithm.h"
18#include "GaudiKernel/SmartIF.h"
19#include "GaudiKernel/Incident.h"
20#include "GaudiKernel/DataObject.h"
21#include "GaudiKernel/IIncidentSvc.h"
22#include "GaudiKernel/IDataManagerSvc.h"
23#include "GaudiKernel/IDataProviderSvc.h"
24#include "GaudiKernel/IConversionSvc.h"
25#include "GaudiKernel/GaudiException.h"
26#include "GaudiKernel/AppReturnCode.h"
27#include "GaudiKernel/MsgStream.h"
28#include "GaudiKernel/EventIDBase.h"
29#include "GaudiKernel/ThreadLocalContext.h"
30#include "GaudiKernel/FileIncident.h"
34#include "EventInfo/EventInfo.h"
40#include "tbb/tick_count.h"
41#include <yampl/ISocket.h>
42#include "yampl/SocketFactory.h"
54 leftString(std::string & s,
char sc){
55 bool truncated{
false};
57 if (n!=std::string::npos){
65 leftString(std::string & s,
int n){
66 bool truncated{
false};
67 if (
static_cast<size_t>(n) <
s.size()){
76 , ISvcLocator* svcLoc)
77 : base_class(nam, svcLoc)
98 "Name of Event Selector to use. If empty string (default) "
99 "take value from ApplicationMgr");
101 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
102 "By default (empty string) get property value from "
105 "histogram write/update interval");
107 "Controls behaviour of event loop depending on return code of"
108 " Algorithms. 0: all non-SUCCESSes terminate job. "
109 "1: RECOVERABLE skips to next event, FAILURE terminates job "
110 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
112 "Print event heartbeat printouts every m_eventPrintoutInterval events");
113 declareProperty(
"ClearStorePolicy",
115 "Configure the policy wrt handling of when the "
116 "'clear-the-event-store' event shall happen: at EndEvent "
117 "(default as it is makes things easier for memory management"
118 ") or at BeginEvent (easier e.g. for interactive use)");
119 declareProperty(
"PreSelectTools",
m_tools,
"AlgTools for event pre-selection")->
123 "Name of the scheduler to be used");
126 "Name of the Whiteboard to be used");
131 "ServiceHandle for EvtIdModifierSvc");
133 declareProperty(
"FakeLumiBlockInterval",
m_flmbi = 0,
134 "Event interval at which to increment lumiBlock# when "
135 "creating events without an EventSelector. Zero means "
136 "don't increment it");
138 "timestamp interval between events when creating Events "
139 "without an EventSelector");
141 "Require valid input attribute list to be present");
143 "In case of DoubleEventSelector use event number from secondary input");
145 declareProperty(
"ESTestPilotMessages",
m_testPilotMessages,
"List of messages from fake pilot for test mode");
157 info() <<
"Initializing " <<
name() <<
endmsg;
159 StatusCode
sc = MinimalEventLoopMgr::initialize();
160 if(!
sc.isSuccess()) {
161 error() <<
"Failed to initialize base class MinimalEventLoopMgr" <<
endmsg;
172 return StatusCode::FAILURE;
177 fatal() <<
"Error retrieving SchedulerSvc interface ISchedulerSvc." <<
endmsg;
178 return StatusCode::FAILURE;
183 fatal() <<
"Error retrieving AlgResourcePool" <<
endmsg;
184 return StatusCode::FAILURE;
187 m_aess = serviceLocator()->service(
"AlgExecStateSvc");
189 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
190 return StatusCode::FAILURE;
199 SmartIF<IProperty> prpMgr(serviceLocator());
200 if(!prpMgr.isValid()) {
201 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
202 return StatusCode::FAILURE;
208 if(histPersName.empty()) {
212 if(histPersName !=
"NONE") {
217 SmartIF<IProperty> histSvc{serviceLocator()->service(
"RootHistSvc")};
220 error() <<
"could not locate actual Histogram persistency service" <<
endmsg;
223 const Gaudi::Details::PropertyBase &prop = histSvc->getProperty(
"OutputFile");
226 const StringProperty &sprop =
dynamic_cast<const StringProperty&
>(prop);
230 verbose() <<
"could not dcast OutputFile property to a StringProperty."
231 <<
" Need to fix Gaudi."
234 val = prop.toString();
238 && val !=
"UndefinedROOTOutputFileName") {
244 if (msgLevel(MSG::DEBUG)) {
245 debug() <<
"Histograms saving not required." <<
endmsg;
253 debug() <<
"EventID modifier Service not set. No run number, ... overrides "
258 debug() <<
"Could not find EventID modifier Service. No run number, ... "
267 const std::string& selName(
m_evtsel.value());
269 if (selName.empty()) {
272 if (
sc.isFailure()) {
273 warning() <<
"Unable to set EvtSel property" <<
endmsg;
277 if( !selName.empty() && selName !=
"NONE") {
278 SmartIF<IEvtSelector> theEvtSel{serviceLocator()->service(selName)};
285 fatal() <<
"Can not create the event selector Context." <<
endmsg;
286 return StatusCode::FAILURE;
288 if (msgLevel(MSG::INFO)) {
289 SmartIF<INamedInterface> named{theEvtSel};
291 info() <<
"Setup EventSelector service " << named->name( ) <<
endmsg;
295 else if (
sc.isFailure()) {
296 fatal() <<
"No valid event selector called " << selName <<
endmsg;
297 return StatusCode::FAILURE;
307 return StatusCode::FAILURE;
318 info() <<
"Using secondary event number." <<
endmsg;
322 info() <<
"runnung in standalone TEST MODE" <<
endmsg;
345 if ( policyName !=
"BeginEvent" &&
346 policyName !=
"EndEvent" ) {
348 fatal() <<
"Unknown policy [" << policyName
349 <<
"] for the 'ClearStore-policy !\n"
350 <<
" Valid values are: BeginEvent, EndEvent"
352 throw GaudiException(
"Can not setup 'ClearStore'-policy",
354 StatusCode::FAILURE);
376 unsigned int toolCtr = 0;
377 for ( ; firstTool != lastTool; ++firstTool )
397 StatusCode
sc = MinimalEventLoopMgr::finalize();
400 error() <<
"Error in Service base class Finalize"
407 error() <<
"Error in writing Histograms"
433 unsigned int toolCtr = 0;
434 info() <<
"Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" <<
endmsg;
435 info() <<
"-----------------------------------------------------" <<
endmsg;
437 for ( ; firstTool != lastTool; ++firstTool ) {
438 info() << std::setw(2) << std::setiosflags(std::ios_base::right)
439 << toolCtr+1 <<
".) " << std::resetiosflags(std::ios_base::right)
440 << std::setw(48) << std::setfill(
'.')
441 << std::setiosflags(std::ios_base::left)
442 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
445 << std::setw(6) << std::setiosflags(std::ios_base::right)
457 return (
sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
467 StatusCode
sc (StatusCode::SUCCESS);
470 std::vector<DataObject*> objects;
472 DataObject* obj = reg->object();
473 if ( !obj || obj->clID() == CLID_StatisticsFile )
return false;
474 objects.push_back( obj );
478 if ( !
sc.isSuccess() ) {
479 error() <<
"Error while traversing Histogram data store" <<
endmsg;
483 if ( objects.size() > 0) {
486 if (
m_nevt == 1 || force ||
487 (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
490 sc = std::accumulate( begin( objects ), end( objects ),
sc, [&]( StatusCode isc,
auto& i ) {
491 IOpaqueAddress* pAddr =
nullptr;
493 if ( iret.isFailure() )
return iret;
494 i->registry()->setAddress( pAddr );
497 sc = std::accumulate( begin( objects ), end( objects ),
sc, [&]( StatusCode isc,
auto& i ) {
498 IRegistry* reg = i->registry();
499 StatusCode iret =
m_histoPersSvc->fillRepRefs( reg->address(), i );
500 return iret.isFailure() ? iret : isc;
502 if ( !
sc.isSuccess() ) {
503 error() <<
"Error while saving Histograms." <<
endmsg;
507 if (force || (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
508 if (msgLevel(MSG::DEBUG)) {
debug() <<
"committing Histograms" <<
endmsg; }
523 return StatusCode::SUCCESS;
531 return StatusCode::SUCCESS;
543 always() <<
"A stopRun was requested by an incidentListener. "
544 <<
"Do not process this event."
547 return (StatusCode::SUCCESS);
553 Gaudi::Hive::setCurrentContext ( ctx );
556 if (declEvtRootSc == 0 ) {
558 return StatusCode::SUCCESS;
559 }
else if ( declEvtRootSc == -1) {
560 error() <<
"declareEventRootAddress for context " << ctx <<
" failed"
562 return StatusCode::FAILURE;
565 EventID::event_number_t evtNumber = ctx.eventID().event_number();
566 unsigned int conditionsRun = ctx.eventID().run_number();
570 eventStore()->retrieve(attr,
"Input").isSuccess()) {
571 if (attr->exists(
"ConditionsRun")) {
572 conditionsRun = (*attr)[
"ConditionsRun"].data<
unsigned int>();
577 Gaudi::Hive::setCurrentContext ( ctx );
580 if (
eventStore()->record(std::make_unique<EventContext> (ctx),
581 "EventContext").isFailure())
583 error() <<
"Error recording event context object" <<
endmsg;
584 return (StatusCode::FAILURE);
597 info() <<
" ===>>> start of run " <<
m_currentRun <<
" <<<==="
605 bool toolsPassed=
true;
610 unsigned int toolCtr=0;
612 tool_store::iterator theTool =
m_tools.begin();
613 tool_store::iterator lastTool =
m_tools.end();
614 while(toolsPassed && theTool!=lastTool )
616 toolsPassed = (*theTool)->passEvent(ctx.eventID());
628 info() <<
" ===>>> start processing event #" << evtNumber <<
", run #" <<
m_currentRun
629 <<
" on slot " << ctx.slot() <<
", " <<
m_proc
630 <<
" events processed so far <<<===" <<
endmsg;
632 info() <<
" ===>>> start processing event #" << evtNumber <<
", run #" <<
m_currentRun
633 <<
" on slot " << ctx.slot() <<
", "
635 <<
" events processed so far <<<===" <<
endmsg;
648 debug() <<
"Adding event " << ctx.evt()
649 <<
", slot " << ctx.slot()
650 <<
" to the scheduler" <<
endmsg;
654 StatusCode addEventStatus =
m_schedulerSvc->pushNewEvent(
new EventContext{ std::move(ctx) } );
657 if (!addEventStatus.isSuccess()){
658 fatal() <<
"An event processing slot should be now free in the scheduler, but it appears not to be the case." <<
endmsg;
668 Gaudi::Hive::setCurrentContext( EventContext() );
670 return StatusCode::SUCCESS;
680 if (
sc.isSuccess()) {
691 SmartIF<IProperty> appmgr(serviceLocator());
692 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop,
true).isFailure()) {
693 error() <<
"Could not set return code of the application ("
694 << Gaudi::ReturnCode::ScheduledStop <<
")" <<
endmsg;
697 return StatusCode::SUCCESS;
711 StatusCode
sc = MinimalEventLoopMgr::stop();
718 for (
size_t islot = 0; islot < nslot; islot++) {
722 Gaudi::Hive::setCurrentContext( EventContext() );
729 if(maxevt==0)
return StatusCode::SUCCESS;
733 std::unique_ptr<yampl::ISocket>{yampl::SocketFactory().createClientSocket(
742 info() <<
"Starting loop on events" <<
endmsg;
744 StatusCode
sc(StatusCode::SUCCESS);
747 auto start_time = tbb::tick_count::now();
748 auto secsFromStart = [&start_time]()->
double{
749 return (tbb::tick_count::now()-start_time).seconds();
752 std::unique_ptr<RangeStruct> range;
758 bool loop_ended = range->eventRangeID.empty();
762 m_incidentSvc->fireIncident(FileIncident(
name(),
"NextEventRange",range->eventRangeID));
765 bool no_more_events =
false;
769 debug() <<
" -> createdEvts: " << createdEvts <<
endmsg;
779 if ( !ctx.valid() ) {
780 sc = StatusCode::FAILURE;
786 if (
sc.isFailure()) {
787 error() <<
"Terminating event processing loop due to errors" <<
endmsg;
799 if(range->eventRangeID.empty()) {
800 no_more_events =
true;
805 m_incidentSvc->fireIncident(FileIncident(
name(),
"NextEventRange",range->eventRangeID));
821 sc = StatusCode::FAILURE;
828 sc = StatusCode::SUCCESS;
837 info() <<
"---> Loop Finished (seconds): " << secsFromStart() <<
endmsg;
851 error() <<
"Seek failed; unsupported by event selector"
853 return StatusCode::FAILURE;
858 fatal() <<
"Can not create the event selector Context."
860 return StatusCode::FAILURE;
866 if (
sc.isSuccess()) {
891 error() <<
"Collection size unsupported by event selector"
898 fatal() <<
"Can not create the event selector Context."
913 if(inc.type()!=
"BeforeFork")
917 warning() <<
"Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" <<
endmsg;
923 error() <<
"Failed to initialize Algorithms" <<
endmsg;
929 IOpaqueAddress* addr = 0;
931 if(!
sc.isSuccess()) {
932 info() <<
"No more events in event selection " <<
endmsg;
936 if (
sc.isFailure()) {
937 error() <<
"Could not create an IOpaqueAddress" <<
endmsg;
943 if(!
sc.isSuccess()) {
944 error() <<
"Error declaring Event object" <<
endmsg;
949 if(
eventStore()->loadEventProxies().isFailure()) {
950 warning() <<
"Error loading Event proxies" <<
endmsg;
956 if(!
sc.isSuccess()) {
957 error() <<
"Unable to retrieve Event root object" <<
endmsg;
968 if(!
sc.isSuccess()) {
969 error() <<
"Clear of Event data store failed" <<
endmsg;
980 if ( !
sc.isSuccess() ) {
985 if( !
sc.isSuccess() ) {
987 if (
sc.isSuccess() ) {
989 if ( !
sc.isSuccess() ) {
990 warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
1006 StatusCode
sc(StatusCode::SUCCESS);
1011 std::unique_ptr<const EventInfo> pEvent;
1019 IOpaqueAddress* addr = 0;
1023 error() <<
"Seek failed; unsupported by event selector" <<
endmsg;
1028 if(
sc.isFailure()) {
1035 if ( !
sc.isSuccess() ) {
1037 info() <<
"No more events in event selection " <<
endmsg;
1042 error() <<
"Could not create an IOpaqueAddress" <<
endmsg;
1051 if( !
sc.isSuccess() ) {
1053 warning() <<
"Error declaring Event object" <<
endmsg;
1056 }
if ((
sc=
eventStore()->loadEventProxies()).isFailure()) {
1057 error() <<
"Error loading Event proxies" <<
endmsg;
1060 bool consume_modifier_stream =
false;
1063 if ( pAttrList !=
nullptr && pAttrList->size() > 6 ) {
1065 unsigned int runNumber = (*pAttrList)[
"RunNumber"].data<
unsigned int>();
1066 unsigned long long eventNumber = (*pAttrList)[
"EventNumber"].data<
unsigned long long>();
1067 unsigned int eventTime = (*pAttrList)[
"EventTime"].data<
unsigned int>();
1068 unsigned int eventTimeNS = (*pAttrList)[
"EventTimeNanoSec"].data<
unsigned int>();
1069 unsigned int lumiBlock = (*pAttrList)[
"LumiBlockN"].data<
unsigned int>();
1070 unsigned int bunchId = (*pAttrList)[
"BunchId"].data<
unsigned int>();
1072 consume_modifier_stream =
true;
1075 unsigned long long eventNumberSecondary{};
1076 if ( !(pAttrList->exists(
"hasSecondaryInput") && (*pAttrList)[
"hasSecondaryInput"].data<
bool>()) ) {
1077 fatal() <<
"Secondary EventNumber requested, but secondary input does not exist!" <<
endmsg;
1080 if ( pAttrList->exists(
"EventNumber_secondary") ) {
1081 eventNumberSecondary = (*pAttrList)[
"EventNumber_secondary"].data<
unsigned long long>();
1087 if (pEventSecondary) {
1088 eventNumberSecondary = pEventSecondary->
event_ID()->event_number();
1091 fatal() <<
"Secondary EventNumber requested, but it does not exist!" <<
endmsg;
1095 if (eventNumberSecondary != 0) {
1097 info() <<
" ===>>> using secondary event #" << eventNumberSecondary <<
" instead of #" << eventNumber <<
"<<<===" <<
endmsg;
1099 eventNumber = eventNumberSecondary;
1104 pEvent = std::make_unique<EventInfo>(
1105 std::make_unique<EventID>(runNumber, eventNumber, eventTime,
1106 eventTimeNS, lumiBlock, bunchId),
1111 fatal() <<
"Valid input attribute list required but not present!";
1115 const EventInfo* pEventObserver{pEvent.get()};
1116 if (!pEventObserver) {
1120 if( !pEventObserver ) {
1125 if( !
sc.isSuccess() ) {
1126 error() <<
"Unable to retrieve Event root object" <<
endmsg;
1129 consume_modifier_stream =
true;
1131 pEvent = std::make_unique<EventInfo>(
1134 pEventObserver = pEvent.get();
1136 if( !
sc.isSuccess() ) {
1137 error() <<
"Error declaring event data object" <<
endmsg;
1141 consume_modifier_stream =
false;
1149 consume_modifier_stream);
1155 unsigned int runNmb{1}, evtNmb{
m_nevt + 1};
1162 auto eid = std::make_unique<EventID> (runNmb,evtNmb,
m_timeStamp);
1164 eid->set_lumi_block( runNmb );
1168 pEvent = std::make_unique<EventInfo>(std::move(eid),
1169 std::make_unique<EventType>());
1173 debug() <<
"selecting store: " << ctx.slot() <<
endmsg;
1177 debug() <<
"recording EventInfo " << *pEvent->event_ID() <<
" in "
1180 if( !
sc.isSuccess() ) {
1181 error() <<
"Error declaring event data object" <<
endmsg;
1191 bool consume_modifier_stream) {
1197 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1198 if (msgLevel(MSG::DEBUG)) {
1199 unsigned int oldrunnr = eID.run_number();
1200 unsigned int oldLB = eID.lumi_block();
1201 unsigned int oldTS = eID.time_stamp();
1202 unsigned int oldTSno = eID.time_stamp_ns_offset();
1203 debug() <<
"modifyEventContext: use evtIdModSvc runnr=" << oldrunnr
1204 <<
" -> " << new_eID.run_number() <<
endmsg;
1205 debug() <<
"modifyEventContext: use evtIdModSvc LB=" << oldLB <<
" -> "
1206 << new_eID.lumi_block() <<
endmsg;
1207 debug() <<
"modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS
1208 <<
" -> " << new_eID.time_stamp() <<
endmsg;
1209 debug() <<
"modifyEventContext: use evtIdModSvc TimeStamp ns Offset="
1210 << oldTSno <<
" -> " << new_eID.time_stamp_ns_offset() <<
endmsg;
1212 ctx.setEventID(new_eID);
1214 ctx.eventID().run_number());
1218 ctx.setEventID(eID);
1227 if (
sc.isFailure()) {
1228 fatal() <<
"Slot " << ctx.slot()
1229 <<
" could not be selected for the WhiteBoard" <<
endmsg;
1230 return EventContext{};
1234 debug() <<
"created EventContext, num: " << ctx.evt() <<
" in slot: "
1243 Gaudi::setAppReturnCode(
m_appMgrProperty, Gaudi::ReturnCode::Success,
true).ignore();
1258 StatusCode
sc(StatusCode::SUCCESS);
1261 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1263 EventContext* finishedEvtContext{
nullptr};
1266 debug() <<
"drainScheduler: [" << finishedEvts <<
"] Waiting for a context" <<
endmsg;
1270 if (
sc.isSuccess()){
1271 debug() <<
"drainScheduler: scheduler not empty: Context "
1272 << finishedEvtContext <<
endmsg;
1273 finishedEvtContexts.emplace_back(finishedEvtContext);
1276 debug() <<
"drainScheduler: scheduler empty" <<
endmsg;
1281 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1282 finishedEvtContexts.emplace_back(finishedEvtContext);
1287 for (
auto& thisFinishedEvtContext : finishedEvtContexts){
1288 if (!thisFinishedEvtContext) {
1289 fatal() <<
"Detected nullptr ctxt while clearing WB!"<<
endmsg;
1294 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1295 fatal() <<
"Failed event detected on " << thisFinishedEvtContext
1296 <<
" w/ fail mode: "
1297 <<
m_aess->eventStatus(*thisFinishedEvtContext) <<
endmsg;
1303 EventID::event_number_t n_evt(0);
1305 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1306 n_run = thisFinishedEvtContext->eventID().run_number();
1307 n_evt = thisFinishedEvtContext->eventID().event_number();
1309 error() <<
"DrainSched: unable to select store "
1310 << thisFinishedEvtContext->slot() <<
endmsg;
1319 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1320 info() <<
"Firing EndProcessing" <<
endmsg;
1321 m_incidentSvc->fireIncident(Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1327 std::string outputFileReport = rangeReport->second + std::string(
",ID:")
1328 + rangeReport->first + std::string(
",CPU:N/A,WALL:N/A");
1332 memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
1333 m_socket->send(message2pilot,outputFileReport.size());
1335 info() <<
"Reported the output " << outputFileReport <<
endmsg;
1339 debug() <<
"Clearing slot " << thisFinishedEvtContext->slot()
1340 <<
" (event " << thisFinishedEvtContext->evt()
1341 <<
") of the whiteboard" <<
endmsg;
1343 StatusCode
sc =
clearWBSlot(thisFinishedEvtContext->slot());
1344 if (!
sc.isSuccess()) {
1345 error() <<
"Whiteboard slot " << thisFinishedEvtContext->slot()
1346 <<
" could not be properly cleared";
1358 info() <<
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1359 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1360 <<
m_proc <<
" events processed so far <<<===" <<
endmsg;
1362 info() <<
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1363 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1365 <<
" events processed so far <<<===" <<
endmsg;
1366 std::ofstream outfile(
"eventLoopHeartBeat.txt");
1368 error() <<
" unable to open: eventLoopHeartBeat.txt" <<
endmsg;
1372 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
1373 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
1378 debug() <<
"drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext
1382 return ( fail ? -1 : 1 );
1390 if( !
sc.isSuccess() ) {
1391 warning() <<
"Clear of Event data store failed" <<
endmsg;
1399 static const std::string strReady(
"Ready for events");
1400 static const std::string strStopProcessing(
"No more events");
1404 static std::atomic<size_t> line_n = 0;
1405 info() <<
"in TEST MODE, Range #" << line_n+1 <<
endmsg;
1409 void* ready_message = malloc(strReady.size());
1410 if (!ready_message) std::abort();
1411 memcpy(ready_message,strReady.data(),strReady.size());
1412 socket->send(ready_message,strReady.size());
1413 void* eventRangeMessage;
1414 std::string strPeerId;
1415 ssize_t eventRangeSize = socket->recv(eventRangeMessage,strPeerId);
1416 range = std::string((
const char*)eventRangeMessage,eventRangeSize);
1417 leftString(range,
'\n');
1420 std::unique_ptr<RangeStruct>
result = std::make_unique<RangeStruct>();
1421 if(range.compare(strStopProcessing)==0) {
1422 info() <<
"No more events from the server" <<
endmsg;
1425 info() <<
"Got Event Range from the pilot: " << range <<
endmsg;
1430 if(range.starts_with(
"[{")) range=range.substr(2);
1431 if(range.ends_with(
"}]")){
1432 const int truncate = range.size()-2;
1433 leftString(range, truncate);
1436 std::map<std::string,std::string> eventRangeMap;
1438 size_t endpos = range.find(
',');
1439 while(endpos!=std::string::npos) {
1441 std::string keyValue(range.substr(startpos,endpos-startpos));
1442 size_t colonPos = keyValue.find(
':');
1443 std::string strKey = keyValue.substr(0,colonPos);
1444 std::string strVal = keyValue.substr(colonPos+1);
1447 eventRangeMap[strKey]=std::move(strVal);
1450 startpos = endpos+1;
1451 endpos = range.find(
',',startpos);
1455 std::string keyValue(range.substr(startpos));
1456 size_t colonPos = keyValue.find(
':');
1457 std::string strKey = keyValue.substr(0,colonPos);
1458 std::string strVal = keyValue.substr(colonPos+1);
1461 eventRangeMap[strKey]=std::move(strVal);
1469 std::string errorStr{
""};
1471 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
1472 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
1473 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
1474 || eventRangeMap.find(
"PFN")==eventRangeMap.end()) {
1476 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Wrong format";
1479 if(errorStr.empty()) {
1480 result->startEvent = std::atoi(eventRangeMap[
"startEvent"].c_str());
1481 result->lastEvent = std::atoi(eventRangeMap[
"lastEvent"].c_str());
1483 if(eventRangeMap[
"eventRangeID"].
empty()
1484 || eventRangeMap[
"PFN"].
empty()
1487 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Wrong values of range fields";
1491 if(
m_pfn != eventRangeMap[
"PFN"]) {
1492 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
1493 if(!propertyServer) {
1494 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Unable to dyn-cast the event selector to IProperty";
1497 std::string strInpuCol(
"InputCollections");
1498 std::vector<std::string> vectInpCol{eventRangeMap[
"PFN"],};
1499 StringArrayProperty inputFileList(std::move(strInpuCol), vectInpCol);
1500 if(propertyServer->setProperty(inputFileList).isFailure()) {
1501 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Unable to set input file name property to the Event Selector";
1504 m_pfn = eventRangeMap[
"PFN"];
1511 if(errorStr.empty()) {
1513 debug() <<
"*** Decoded Event Range ***" <<
endmsg;
1514 for (
const auto& fieldvalue : eventRangeMap) {
1515 debug() << fieldvalue.first <<
":" << fieldvalue.second <<
endmsg;
1518 result->eventRangeID = eventRangeMap[
"eventRangeID"];
1519 result->pfn = eventRangeMap[
"PFN"];
1524 warning() << errorStr <<
endmsg;
1525 info() <<
"Ignoring this event range" <<
endmsg;
1528 memcpy(errorMessage,errorStr.data(),errorStr.size());
1529 socket->send(errorMessage,errorStr.size());
1541 while(i<
str.size() &&
str[i]==
' ') i++;
1542 if(i)
str =
str.substr(i);
1544 if(
str.empty())
return;
1548 while(
str[i]==
' ') i--;
1549 if(i)
str.resize(i+1);
1556 if(
str.starts_with(
"u\'")) {
1558 if(
str.rfind(
'\'')==
str.size()-1) {
1562 else if(
str.starts_with(
"\"")) {
1564 if(
str.rfind(
'\"')==
str.size()-1) {
#define ATH_CHECK
Evaluate an expression and check for errors.
ClearStorePolicy::Type clearStorePolicy(const std::string &policyName, MsgStream &msg)
returns the enum-version of the policy (by name)
Helpers for checking error return status codes and reporting errors.
#define CHECK(...)
Evaluate an expression and check for errors.
Assign a CLID to EventContext.
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
EventType eventTypeFromxAOD(const xAOD::EventInfo *xaod)
Create EventType object from xAOD::EventInfo.
This class provides general information about an event.
Extension to IEvtSelector to allow for seeking.
void setProperty(columnar::PythonToolHandle &self, const std::string &key, nb::object value)
This file contains the class definition for the OutputStreamSequencerSvc class.
An AttributeList represents a logical row of attributes in a metadata table.
static const Attributes_t empty
An AttributeList represents a logical row of attributes in a metadata table.
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
bool m_requireInputAttributeList
require input attribute list
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
virtual EventContext createEventContext() override
implementation of IEventProcessor::createEventContext()
virtual void resetAppReturnCode() override
Reset the application return code.
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(void* par)
tool_store::const_iterator tool_iterator
virtual int curEvent() const override
Return the current event count.
StringProperty m_histPersName
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
void setClearStorePolicy(Gaudi::Details::PropertyBase &clearStorePolicy)
property update handler:set the clear-store policy value and check its value.
IntegerProperty m_failureMode
unsigned int m_timeStampInt
virtual StatusCode executeAlgorithms()
Run the algorithms for the current event.
std::unique_ptr< yampl::ISocket > m_socket
ServiceHandle< Athena::IConditionsCleanerSvc > m_conditionsCleaner
std::unique_ptr< RangeStruct > getNextRange(yampl::ISocket *socket)
virtual bool terminateLoop() override
tool_stats m_toolReject
tool returns StatusCode::FAILURE counter
UnsignedIntegerProperty m_eventPrintoutInterval
tool_stats m_toolAccept
tool returns StatusCode::SUCCESS counter
virtual StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun(int maxevt)
number_type m_currentRun
current run number
AthenaMtesEventLoopMgr()=delete
void modifyEventContext(EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
tool_store m_tools
internal tool store
virtual const std::string & name() const override
unsigned int m_nev
events processed
virtual StatusCode initialize() override
implementation of IAppMgrUI::initalize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
IEvtIdModifierSvc_t m_evtIdModSvc
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.
virtual ~AthenaMtesEventLoopMgr()
Standard Destructor.
StatusCode initializeAlgorithms()
Initialize all algorithms and output streams.
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
tool_stats m_toolInvoke
tool called counter
virtual int drainScheduler(int &finishedEvents, bool report) override
Drain the scheduler from all actions that may be queued.
bool m_useSecondaryEventNumber
read event number from secondary input
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
EvtContext * m_evtContext
Gaudi event selector Context (may be used as a cursor by the evt selector)
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
std::string m_whiteboardName
Name of the Whiteboard to be used.
int declareEventRootAddress(EventContext &)
Declare the root address of the event.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode seek(int evt) override
Seek to a given event.
virtual StatusCode stop() override
implementation of IService::stop
std::string m_schedulerName
Name of the scheduler to be used.
StringArrayProperty m_testPilotMessages
IDataManagerSvc_t m_histoDataMgrSvc
Reference to the Histogram Data Service.
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc;.
virtual int size() override
Return the size of the collection.
Gaudi::Property< std::string > m_eventRangeChannel
StringProperty m_clearStorePolicy
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
EventContext m_lastEventContext
virtual void setCurrentEventNum(int num) override
ServiceHandle< IConversionSvc > IConversionSvc_t
IConversionSvc_t m_histoPersSvc
virtual StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
void setupPreSelectTools(Gaudi::Details::PropertyBase &)
property update handler:sets up the Pre-selection tools
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
StoreGateSvc * eventStore() const
void trimRangeStrings(std::string &str)
UnsignedIntegerProperty m_writeInterval
virtual void handle(const Incident &inc) override
IIncidentListenet interfaces.
IEvtSelector * m_evtSelector
Reference to the Event Selector.
void resetTimeout(Timeout &instance)
Reset timeout.
static Timeout & instance()
Get reference to Timeout singleton.
void setConditionsRun(EventIDBase::number_type conditionsRun)
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
EventIDBase::number_type number_type
EventID * event_ID()
the unique identification of the event.
Abstract interface for seeking for an event selector.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int size(IEvtSelector::Context &c) const =0
Return the size of the collection, or -1 if we can't get the size.
std::unique_ptr< RangeReport_t > RangeReport_ptr
The Athena Transient Store API.
StatusCode recordAddress(const std::string &skey, CxxUtils::RefCountedPtr< IOpaqueAddress > pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
StatusCode record(T *p2BRegistered, const TKEY &key)
Record an object with a key.
const T * tryConstRetrieve() const
StatusCode retrieve(const T *&ptr) const
Retrieve the default object into a const T*.
virtual StatusCode clearStore(bool forceRemove=false) override final
clear DataStore contents: called by the event loop mgrs
int ir
counter of the current depth
bool contains(const std::string &s, const std::string ®x)
does a string contain the substring
void setExtendedEventContext(EventContext &ctx, ExtendedEventContext &&ectx)
Move an extended context into a context object.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
void * xmalloc(size_t size)
Trapping version of malloc.
thread_local event_number_t eventIndex
EventInfo_v1 EventInfo
Definition of the latest event info version.
Trapping version of malloc.