17#include "GaudiKernel/IAlgorithm.h"
18#include "GaudiKernel/SmartIF.h"
19#include "GaudiKernel/Incident.h"
20#include "GaudiKernel/DataObject.h"
21#include "GaudiKernel/IIncidentSvc.h"
22#include "GaudiKernel/IDataManagerSvc.h"
23#include "GaudiKernel/IDataProviderSvc.h"
24#include "GaudiKernel/IConversionSvc.h"
25#include "GaudiKernel/GaudiException.h"
26#include "GaudiKernel/AppReturnCode.h"
27#include "GaudiKernel/MsgStream.h"
28#include "GaudiKernel/EventIDBase.h"
29#include "GaudiKernel/ThreadLocalContext.h"
30#include "GaudiKernel/FileIncident.h"
34#include "EventInfo/EventInfo.h"
40#include "tbb/tick_count.h"
41#include <yampl/ISocket.h>
42#include "yampl/SocketFactory.h"
54 leftString(std::string & s,
char sc){
55 bool truncated{
false};
57 if (n!=std::string::npos){
65 leftString(std::string & s,
int n){
66 bool truncated{
false};
67 if (
static_cast<size_t>(n) <
s.size()){
76 , ISvcLocator* svcLoc)
77 : base_class(nam, svcLoc)
98 "Name of Event Selector to use. If empty string (default) "
99 "take value from ApplicationMgr");
101 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
102 "By default (empty string) get property value from "
105 "histogram write/update interval");
107 "Controls behaviour of event loop depending on return code of"
108 " Algorithms. 0: all non-SUCCESSes terminate job. "
109 "1: RECOVERABLE skips to next event, FAILURE terminates job "
110 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
112 "Print event heartbeat printouts every m_eventPrintoutInterval events");
113 declareProperty(
"ClearStorePolicy",
115 "Configure the policy wrt handling of when the "
116 "'clear-the-event-store' event shall happen: at EndEvent "
117 "(default as it is makes things easier for memory management"
118 ") or at BeginEvent (easier e.g. for interactive use)");
119 declareProperty(
"PreSelectTools",
m_tools,
"AlgTools for event pre-selection")->
123 "Name of the scheduler to be used");
126 "Name of the Whiteboard to be used");
131 "ServiceHandle for EvtIdModifierSvc");
133 declareProperty(
"FakeLumiBlockInterval",
m_flmbi = 0,
134 "Event interval at which to increment lumiBlock# when "
135 "creating events without an EventSelector. Zero means "
136 "don't increment it");
138 "timestamp interval between events when creating Events "
139 "without an EventSelector");
141 "Require valid input attribute list to be present");
143 "In case of DoubleEventSelector use event number from secondary input");
145 declareProperty(
"ESTestPilotMessages",
m_testPilotMessages,
"List of messages from fake pilot for test mode");
157 info() <<
"Initializing " <<
name() <<
endmsg;
159 StatusCode
sc = MinimalEventLoopMgr::initialize();
160 if(!
sc.isSuccess()) {
161 error() <<
"Failed to initialize base class MinimalEventLoopMgr" <<
endmsg;
172 return StatusCode::FAILURE;
177 fatal() <<
"Error retrieving SchedulerSvc interface ISchedulerSvc." <<
endmsg;
178 return StatusCode::FAILURE;
183 fatal() <<
"Error retrieving AlgResourcePool" <<
endmsg;
184 return StatusCode::FAILURE;
187 m_aess = serviceLocator()->service(
"AlgExecStateSvc");
189 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
190 return StatusCode::FAILURE;
199 SmartIF<IProperty> prpMgr(serviceLocator());
200 if(!prpMgr.isValid()) {
201 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
202 return StatusCode::FAILURE;
208 if(histPersName.empty()) {
212 if(histPersName !=
"NONE") {
217 SmartIF<IProperty> histSvc{serviceLocator()->service(
"RootHistSvc")};
220 error() <<
"could not locate actual Histogram persistency service" <<
endmsg;
223 const Gaudi::Details::PropertyBase &prop = histSvc->getProperty(
"OutputFile");
226 const StringProperty &sprop =
dynamic_cast<const StringProperty&
>(prop);
230 verbose() <<
"could not dcast OutputFile property to a StringProperty."
231 <<
" Need to fix Gaudi."
234 val = prop.toString();
238 && val !=
"UndefinedROOTOutputFileName") {
244 if (msgLevel(MSG::DEBUG)) {
245 debug() <<
"Histograms saving not required." <<
endmsg;
253 debug() <<
"EventID modifier Service not set. No run number, ... overrides "
258 debug() <<
"Could not find EventID modifier Service. No run number, ... "
267 const std::string& selName(
m_evtsel.value());
269 if (selName.empty()) {
272 if (
sc.isFailure()) {
273 warning() <<
"Unable to set EvtSel property" <<
endmsg;
277 if( !selName.empty() && selName !=
"NONE") {
278 SmartIF<IEvtSelector> theEvtSel{serviceLocator()->service(selName)};
285 fatal() <<
"Can not create the event selector Context." <<
endmsg;
286 return StatusCode::FAILURE;
288 if (msgLevel(MSG::INFO)) {
289 SmartIF<INamedInterface> named{theEvtSel};
291 info() <<
"Setup EventSelector service " << named->name( ) <<
endmsg;
295 else if (
sc.isFailure()) {
296 fatal() <<
"No valid event selector called " << selName <<
endmsg;
297 return StatusCode::FAILURE;
307 return StatusCode::FAILURE;
318 info() <<
"Using secondary event number." <<
endmsg;
322 info() <<
"runnung in standalone TEST MODE" <<
endmsg;
339 if ( policyName !=
"BeginEvent" &&
340 policyName !=
"EndEvent" ) {
342 fatal() <<
"Unknown policy [" << policyName
343 <<
"] for the 'ClearStore-policy !\n"
344 <<
" Valid values are: BeginEvent, EndEvent"
346 throw GaudiException(
"Can not setup 'ClearStore'-policy",
348 StatusCode::FAILURE);
370 unsigned int toolCtr = 0;
371 for ( ; firstTool != lastTool; ++firstTool )
391 StatusCode
sc = MinimalEventLoopMgr::finalize();
394 error() <<
"Error in Service base class Finalize"
401 error() <<
"Error in writing Histograms"
427 unsigned int toolCtr = 0;
428 info() <<
"Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" <<
endmsg;
429 info() <<
"-----------------------------------------------------" <<
endmsg;
431 for ( ; firstTool != lastTool; ++firstTool ) {
432 info() << std::setw(2) << std::setiosflags(std::ios_base::right)
433 << toolCtr+1 <<
".) " << std::resetiosflags(std::ios_base::right)
434 << std::setw(48) << std::setfill(
'.')
435 << std::setiosflags(std::ios_base::left)
436 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
439 << std::setw(6) << std::setiosflags(std::ios_base::right)
451 return (
sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
461 StatusCode
sc (StatusCode::SUCCESS);
464 std::vector<DataObject*> objects;
466 DataObject* obj = reg->object();
467 if ( !obj || obj->clID() == CLID_StatisticsFile )
return false;
468 objects.push_back( obj );
472 if ( !
sc.isSuccess() ) {
473 error() <<
"Error while traversing Histogram data store" <<
endmsg;
477 if ( objects.size() > 0) {
480 if (
m_nevt == 1 || force ||
481 (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
484 sc = std::accumulate( begin( objects ), end( objects ),
sc, [&]( StatusCode isc,
auto& i ) {
485 IOpaqueAddress* pAddr =
nullptr;
487 if ( iret.isFailure() )
return iret;
488 i->registry()->setAddress( pAddr );
491 sc = std::accumulate( begin( objects ), end( objects ),
sc, [&]( StatusCode isc,
auto& i ) {
492 IRegistry* reg = i->registry();
493 StatusCode iret =
m_histoPersSvc->fillRepRefs( reg->address(), i );
494 return iret.isFailure() ? iret : isc;
496 if ( !
sc.isSuccess() ) {
497 error() <<
"Error while saving Histograms." <<
endmsg;
501 if (force || (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
502 if (msgLevel(MSG::DEBUG)) {
debug() <<
"committing Histograms" <<
endmsg; }
517 return StatusCode::SUCCESS;
525 return StatusCode::SUCCESS;
537 always() <<
"A stopRun was requested by an incidentListener. "
538 <<
"Do not process this event."
541 return (StatusCode::SUCCESS);
547 Gaudi::Hive::setCurrentContext ( ctx );
550 if (declEvtRootSc == 0 ) {
552 return StatusCode::SUCCESS;
553 }
else if ( declEvtRootSc == -1) {
554 error() <<
"declareEventRootAddress for context " << ctx <<
" failed"
556 return StatusCode::FAILURE;
559 EventID::event_number_t evtNumber = ctx.eventID().event_number();
560 unsigned int conditionsRun = ctx.eventID().run_number();
565 if (attr->exists(
"ConditionsRun")) {
566 conditionsRun = (*attr)[
"ConditionsRun"].data<
unsigned int>();
571 Gaudi::Hive::setCurrentContext ( ctx );
574 if (
m_eventStore->record(std::make_unique<EventContext> (ctx),
575 "EventContext").isFailure())
577 error() <<
"Error recording event context object" <<
endmsg;
578 return (StatusCode::FAILURE);
591 info() <<
" ===>>> start of run " <<
m_currentRun <<
" <<<==="
599 bool toolsPassed=
true;
604 unsigned int toolCtr=0;
606 tool_store::iterator theTool =
m_tools.begin();
607 tool_store::iterator lastTool =
m_tools.end();
608 while(toolsPassed && theTool!=lastTool )
610 toolsPassed = (*theTool)->passEvent(ctx.eventID());
622 info() <<
" ===>>> start processing event #" << evtNumber <<
", run #" <<
m_currentRun
623 <<
" on slot " << ctx.slot() <<
", " <<
m_proc
624 <<
" events processed so far <<<===" <<
endmsg;
626 info() <<
" ===>>> start processing event #" << evtNumber <<
", run #" <<
m_currentRun
627 <<
" on slot " << ctx.slot() <<
", "
629 <<
" events processed so far <<<===" <<
endmsg;
642 debug() <<
"Adding event " << ctx.evt()
643 <<
", slot " << ctx.slot()
644 <<
" to the scheduler" <<
endmsg;
648 StatusCode addEventStatus =
m_schedulerSvc->pushNewEvent(
new EventContext{ std::move(ctx) } );
651 if (!addEventStatus.isSuccess()){
652 fatal() <<
"An event processing slot should be now free in the scheduler, but it appears not to be the case." <<
endmsg;
662 Gaudi::Hive::setCurrentContext( EventContext() );
664 return StatusCode::SUCCESS;
674 if (
sc.isSuccess()) {
685 SmartIF<IProperty> appmgr(serviceLocator());
686 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop,
true).isFailure()) {
687 error() <<
"Could not set return code of the application ("
688 << Gaudi::ReturnCode::ScheduledStop <<
")" <<
endmsg;
691 return StatusCode::SUCCESS;
705 StatusCode
sc = MinimalEventLoopMgr::stop();
712 for (
size_t islot = 0; islot < nslot; islot++) {
716 Gaudi::Hive::setCurrentContext( EventContext() );
723 if(maxevt==0)
return StatusCode::SUCCESS;
727 std::unique_ptr<yampl::ISocket>{yampl::SocketFactory().createClientSocket(
736 info() <<
"Starting loop on events" <<
endmsg;
738 StatusCode
sc(StatusCode::SUCCESS);
741 auto start_time = tbb::tick_count::now();
742 auto secsFromStart = [&start_time]()->
double{
743 return (tbb::tick_count::now()-start_time).seconds();
746 std::unique_ptr<RangeStruct> range;
752 bool loop_ended = range->eventRangeID.empty();
756 m_incidentSvc->fireIncident(FileIncident(
name(),
"NextEventRange",range->eventRangeID));
759 bool no_more_events =
false;
763 debug() <<
" -> createdEvts: " << createdEvts <<
endmsg;
773 if ( !ctx.valid() ) {
774 sc = StatusCode::FAILURE;
780 if (
sc.isFailure()) {
781 error() <<
"Terminating event processing loop due to errors" <<
endmsg;
793 if(range->eventRangeID.empty()) {
794 no_more_events =
true;
799 m_incidentSvc->fireIncident(FileIncident(
name(),
"NextEventRange",range->eventRangeID));
815 sc = StatusCode::FAILURE;
822 sc = StatusCode::SUCCESS;
831 info() <<
"---> Loop Finished (seconds): " << secsFromStart() <<
endmsg;
845 error() <<
"Seek failed; unsupported by event selector"
847 return StatusCode::FAILURE;
852 fatal() <<
"Can not create the event selector Context."
854 return StatusCode::FAILURE;
860 if (
sc.isSuccess()) {
885 error() <<
"Collection size unsupported by event selector"
892 fatal() <<
"Can not create the event selector Context."
907 if(inc.type()!=
"BeforeFork")
911 warning() <<
"Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" <<
endmsg;
917 error() <<
"Failed to initialize Algorithms" <<
endmsg;
923 IOpaqueAddress* addr = 0;
925 if(!
sc.isSuccess()) {
926 info() <<
"No more events in event selection " <<
endmsg;
930 if (
sc.isFailure()) {
931 error() <<
"Could not create an IOpaqueAddress" <<
endmsg;
937 if(!
sc.isSuccess()) {
938 error() <<
"Error declaring Event object" <<
endmsg;
944 warning() <<
"Error loading Event proxies" <<
endmsg;
950 if(!
sc.isSuccess()) {
951 error() <<
"Unable to retrieve Event root object" <<
endmsg;
962 if(!
sc.isSuccess()) {
963 error() <<
"Clear of Event data store failed" <<
endmsg;
974 if ( !
sc.isSuccess() ) {
979 if( !
sc.isSuccess() ) {
981 if (
sc.isSuccess() ) {
983 if ( !
sc.isSuccess() ) {
984 warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
1000 StatusCode
sc(StatusCode::SUCCESS);
1005 std::unique_ptr<const EventInfo> pEvent;
1013 IOpaqueAddress* addr = 0;
1017 error() <<
"Seek failed; unsupported by event selector" <<
endmsg;
1022 if(
sc.isFailure()) {
1029 if ( !
sc.isSuccess() ) {
1031 info() <<
"No more events in event selection " <<
endmsg;
1036 error() <<
"Could not create an IOpaqueAddress" <<
endmsg;
1045 if( !
sc.isSuccess() ) {
1047 warning() <<
"Error declaring Event object" <<
endmsg;
1051 error() <<
"Error loading Event proxies" <<
endmsg;
1054 bool consume_modifier_stream =
false;
1057 if ( pAttrList !=
nullptr && pAttrList->size() > 6 ) {
1059 unsigned int runNumber = (*pAttrList)[
"RunNumber"].data<
unsigned int>();
1060 unsigned long long eventNumber = (*pAttrList)[
"EventNumber"].data<
unsigned long long>();
1061 unsigned int eventTime = (*pAttrList)[
"EventTime"].data<
unsigned int>();
1062 unsigned int eventTimeNS = (*pAttrList)[
"EventTimeNanoSec"].data<
unsigned int>();
1063 unsigned int lumiBlock = (*pAttrList)[
"LumiBlockN"].data<
unsigned int>();
1064 unsigned int bunchId = (*pAttrList)[
"BunchId"].data<
unsigned int>();
1066 consume_modifier_stream =
true;
1069 unsigned long long eventNumberSecondary{};
1070 if ( !(pAttrList->exists(
"hasSecondaryInput") && (*pAttrList)[
"hasSecondaryInput"].data<
bool>()) ) {
1071 fatal() <<
"Secondary EventNumber requested, but secondary input does not exist!" <<
endmsg;
1074 if ( pAttrList->exists(
"EventNumber_secondary") ) {
1075 eventNumberSecondary = (*pAttrList)[
"EventNumber_secondary"].data<
unsigned long long>();
1081 if (pEventSecondary) {
1082 eventNumberSecondary = pEventSecondary->
event_ID()->event_number();
1085 fatal() <<
"Secondary EventNumber requested, but it does not exist!" <<
endmsg;
1089 if (eventNumberSecondary != 0) {
1091 info() <<
" ===>>> using secondary event #" << eventNumberSecondary <<
" instead of #" << eventNumber <<
"<<<===" <<
endmsg;
1093 eventNumber = eventNumberSecondary;
1098 pEvent = std::make_unique<EventInfo>(
1099 std::make_unique<EventID>(runNumber, eventNumber, eventTime,
1100 eventTimeNS, lumiBlock, bunchId),
1105 fatal() <<
"Valid input attribute list required but not present!";
1109 const EventInfo* pEventObserver{pEvent.get()};
1110 if (!pEventObserver) {
1114 if( !pEventObserver ) {
1119 if( !
sc.isSuccess() ) {
1120 error() <<
"Unable to retrieve Event root object" <<
endmsg;
1123 consume_modifier_stream =
true;
1125 pEvent = std::make_unique<EventInfo>(
1128 pEventObserver = pEvent.get();
1130 if( !
sc.isSuccess() ) {
1131 error() <<
"Error declaring event data object" <<
endmsg;
1135 consume_modifier_stream =
false;
1143 consume_modifier_stream);
1149 unsigned int runNmb{1}, evtNmb{
m_nevt + 1};
1156 auto eid = std::make_unique<EventID> (runNmb,evtNmb,
m_timeStamp);
1158 eid->set_lumi_block( runNmb );
1162 pEvent = std::make_unique<EventInfo>(std::move(eid),
1163 std::make_unique<EventType>());
1167 debug() <<
"selecting store: " << ctx.slot() <<
endmsg;
1171 debug() <<
"recording EventInfo " << *pEvent->event_ID() <<
" in "
1174 if( !
sc.isSuccess() ) {
1175 error() <<
"Error declaring event data object" <<
endmsg;
1185 bool consume_modifier_stream) {
1191 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1192 if (msgLevel(MSG::DEBUG)) {
1193 unsigned int oldrunnr = eID.run_number();
1194 unsigned int oldLB = eID.lumi_block();
1195 unsigned int oldTS = eID.time_stamp();
1196 unsigned int oldTSno = eID.time_stamp_ns_offset();
1197 debug() <<
"modifyEventContext: use evtIdModSvc runnr=" << oldrunnr
1198 <<
" -> " << new_eID.run_number() <<
endmsg;
1199 debug() <<
"modifyEventContext: use evtIdModSvc LB=" << oldLB <<
" -> "
1200 << new_eID.lumi_block() <<
endmsg;
1201 debug() <<
"modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS
1202 <<
" -> " << new_eID.time_stamp() <<
endmsg;
1203 debug() <<
"modifyEventContext: use evtIdModSvc TimeStamp ns Offset="
1204 << oldTSno <<
" -> " << new_eID.time_stamp_ns_offset() <<
endmsg;
1206 ctx.setEventID(new_eID);
1208 ctx.eventID().run_number());
1212 ctx.setEventID(eID);
1221 if (
sc.isFailure()) {
1222 fatal() <<
"Slot " << ctx.slot()
1223 <<
" could not be selected for the WhiteBoard" <<
endmsg;
1224 return EventContext{};
1228 debug() <<
"created EventContext, num: " << ctx.evt() <<
" in slot: "
1237 Gaudi::setAppReturnCode(
m_appMgrProperty, Gaudi::ReturnCode::Success,
true).ignore();
1252 StatusCode
sc(StatusCode::SUCCESS);
1255 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1257 EventContext* finishedEvtContext{
nullptr};
1260 debug() <<
"drainScheduler: [" << finishedEvts <<
"] Waiting for a context" <<
endmsg;
1264 if (
sc.isSuccess()){
1265 debug() <<
"drainScheduler: scheduler not empty: Context "
1266 << finishedEvtContext <<
endmsg;
1267 finishedEvtContexts.emplace_back(finishedEvtContext);
1270 debug() <<
"drainScheduler: scheduler empty" <<
endmsg;
1275 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1276 finishedEvtContexts.emplace_back(finishedEvtContext);
1281 for (
auto& thisFinishedEvtContext : finishedEvtContexts){
1282 if (!thisFinishedEvtContext) {
1283 fatal() <<
"Detected nullptr ctxt while clearing WB!"<<
endmsg;
1288 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1289 fatal() <<
"Failed event detected on " << thisFinishedEvtContext
1290 <<
" w/ fail mode: "
1291 <<
m_aess->eventStatus(*thisFinishedEvtContext) <<
endmsg;
1297 EventID::event_number_t n_evt(0);
1299 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1300 n_run = thisFinishedEvtContext->eventID().run_number();
1301 n_evt = thisFinishedEvtContext->eventID().event_number();
1303 error() <<
"DrainSched: unable to select store "
1304 << thisFinishedEvtContext->slot() <<
endmsg;
1313 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1314 info() <<
"Firing EndProcessing" <<
endmsg;
1315 m_incidentSvc->fireIncident(Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1321 std::string outputFileReport = rangeReport->second + std::string(
",ID:")
1322 + rangeReport->first + std::string(
",CPU:N/A,WALL:N/A");
1326 memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
1327 m_socket->send(message2pilot,outputFileReport.size());
1329 info() <<
"Reported the output " << outputFileReport <<
endmsg;
1333 debug() <<
"Clearing slot " << thisFinishedEvtContext->slot()
1334 <<
" (event " << thisFinishedEvtContext->evt()
1335 <<
") of the whiteboard" <<
endmsg;
1337 StatusCode
sc =
clearWBSlot(thisFinishedEvtContext->slot());
1338 if (!
sc.isSuccess()) {
1339 error() <<
"Whiteboard slot " << thisFinishedEvtContext->slot()
1340 <<
" could not be properly cleared";
1352 info() <<
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1353 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1354 <<
m_proc <<
" events processed so far <<<===" <<
endmsg;
1356 info() <<
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1357 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1359 <<
" events processed so far <<<===" <<
endmsg;
1360 std::ofstream outfile(
"eventLoopHeartBeat.txt");
1362 error() <<
" unable to open: eventLoopHeartBeat.txt" <<
endmsg;
1366 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
1367 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
1372 debug() <<
"drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext
1376 return ( fail ? -1 : 1 );
1384 if( !
sc.isSuccess() ) {
1385 warning() <<
"Clear of Event data store failed" <<
endmsg;
1393 static const std::string strReady(
"Ready for events");
1394 static const std::string strStopProcessing(
"No more events");
1398 static std::atomic<size_t> line_n = 0;
1399 info() <<
"in TEST MODE, Range #" << line_n+1 <<
endmsg;
1403 void* ready_message = malloc(strReady.size());
1404 if (!ready_message) std::abort();
1405 memcpy(ready_message,strReady.data(),strReady.size());
1406 socket->send(ready_message,strReady.size());
1407 void* eventRangeMessage;
1408 std::string strPeerId;
1409 ssize_t eventRangeSize = socket->recv(eventRangeMessage,strPeerId);
1410 range = std::string((
const char*)eventRangeMessage,eventRangeSize);
1411 leftString(range,
'\n');
1414 std::unique_ptr<RangeStruct> result = std::make_unique<RangeStruct>();
1415 if(range.compare(strStopProcessing)==0) {
1416 info() <<
"No more events from the server" <<
endmsg;
1419 info() <<
"Got Event Range from the pilot: " << range <<
endmsg;
1424 if(range.starts_with(
"[{")) range=range.substr(2);
1425 if(range.ends_with(
"}]")){
1426 const int truncate = range.size()-2;
1427 leftString(range, truncate);
1430 std::map<std::string,std::string> eventRangeMap;
1432 size_t endpos = range.find(
',');
1433 while(endpos!=std::string::npos) {
1435 std::string keyValue(range.substr(startpos,endpos-startpos));
1436 size_t colonPos = keyValue.find(
':');
1437 std::string strKey = keyValue.substr(0,colonPos);
1438 std::string strVal = keyValue.substr(colonPos+1);
1441 eventRangeMap[strKey]=std::move(strVal);
1444 startpos = endpos+1;
1445 endpos = range.find(
',',startpos);
1449 std::string keyValue(range.substr(startpos));
1450 size_t colonPos = keyValue.find(
':');
1451 std::string strKey = keyValue.substr(0,colonPos);
1452 std::string strVal = keyValue.substr(colonPos+1);
1455 eventRangeMap[strKey]=std::move(strVal);
1463 std::string errorStr{
""};
1465 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
1466 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
1467 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
1468 || eventRangeMap.find(
"PFN")==eventRangeMap.end()) {
1470 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Wrong format";
1473 if(errorStr.empty()) {
1474 result->startEvent = std::atoi(eventRangeMap[
"startEvent"].c_str());
1475 result->lastEvent = std::atoi(eventRangeMap[
"lastEvent"].c_str());
1477 if(eventRangeMap[
"eventRangeID"].
empty()
1478 || eventRangeMap[
"PFN"].
empty()
1479 || result->lastEvent < result->startEvent) {
1481 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Wrong values of range fields";
1485 if(
m_pfn != eventRangeMap[
"PFN"]) {
1486 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
1487 if(!propertyServer) {
1488 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Unable to dyn-cast the event selector to IProperty";
1491 std::string strInpuCol(
"InputCollections");
1492 std::vector<std::string> vectInpCol{eventRangeMap[
"PFN"],};
1493 StringArrayProperty inputFileList(std::move(strInpuCol), vectInpCol);
1494 if(propertyServer->setProperty(inputFileList).isFailure()) {
1495 errorStr =
"ERR_ATHENAMP_PARSE \"" + range +
"\": Unable to set input file name property to the Event Selector";
1498 m_pfn = eventRangeMap[
"PFN"];
1505 if(errorStr.empty()) {
1507 debug() <<
"*** Decoded Event Range ***" <<
endmsg;
1508 for (
const auto& fieldvalue : eventRangeMap) {
1509 debug() << fieldvalue.first <<
":" << fieldvalue.second <<
endmsg;
1512 result->eventRangeID = eventRangeMap[
"eventRangeID"];
1513 result->pfn = eventRangeMap[
"PFN"];
1518 warning() << errorStr <<
endmsg;
1519 info() <<
"Ignoring this event range" <<
endmsg;
1522 memcpy(errorMessage,errorStr.data(),errorStr.size());
1523 socket->send(errorMessage,errorStr.size());
1535 while(i<
str.size() &&
str[i]==
' ') i++;
1536 if(i)
str =
str.substr(i);
1538 if(
str.empty())
return;
1542 while(
str[i]==
' ') i--;
1543 if(i)
str.resize(i+1);
1550 if(
str.starts_with(
"u\'")) {
1552 if(
str.rfind(
'\'')==
str.size()-1) {
1556 else if(
str.starts_with(
"\"")) {
1558 if(
str.rfind(
'\"')==
str.size()-1) {
#define ATH_CHECK
Evaluate an expression and check for errors.
ClearStorePolicy::Type clearStorePolicy(const std::string &policyName, MsgStream &msg)
returns the enum-version of the policy (by name)
Helpers for checking error return status codes and reporting errors.
#define CHECK(...)
Evaluate an expression and check for errors.
Assign a CLID to EventContext.
This class provides a unique identification for each event, in terms of run/event number and/or a time stamp.
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
EventType eventTypeFromxAOD(const xAOD::EventInfo *xaod)
Create EventType object from xAOD::EventInfo.
This class provides general information about an event.
Extension to IEvtSelector to allow for seeking.
void setProperty(columnar::PythonToolHandle &self, const std::string &key, nb::object value)
This file contains the class definition for the OutputStreamSequencerSvc class.
An AttributeList represents a logical row of attributes in a metadata table.
static const Attributes_t empty
An AttributeList represents a logical row of attributes in a metadata table.
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
bool m_requireInputAttributeList
require input attribute list
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
virtual EventContext createEventContext() override
implementation of IEventProcessor::createEventContext()
virtual void resetAppReturnCode() override
Reset the application return code.
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(EventContext&& ctx)
tool_store::const_iterator tool_iterator
virtual int curEvent() const override
Return the current event count.
StringProperty m_histPersName
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
void setClearStorePolicy(Gaudi::Details::PropertyBase &clearStorePolicy)
property update handler:set the clear-store policy value and check its value.
IntegerProperty m_failureMode
unsigned int m_timeStampInt
virtual StatusCode executeAlgorithms()
Run the algorithms for the current event.
std::unique_ptr< yampl::ISocket > m_socket
ServiceHandle< Athena::IConditionsCleanerSvc > m_conditionsCleaner
std::unique_ptr< RangeStruct > getNextRange(yampl::ISocket *socket)
virtual bool terminateLoop() override
tool_stats m_toolReject
tool returns StatusCode::FAILURE counter
UnsignedIntegerProperty m_eventPrintoutInterval
tool_stats m_toolAccept
tool returns StatusCode::SUCCESS counter
virtual StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun(int maxevt)
number_type m_currentRun
current run number
AthenaMtesEventLoopMgr()=delete
void modifyEventContext(EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
tool_store m_tools
internal tool store
virtual const std::string & name() const override
unsigned int m_nev
events processed
virtual StatusCode initialize() override
implementation of IAppMgrUI::initialize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
IEvtIdModifierSvc_t m_evtIdModSvc
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.
virtual ~AthenaMtesEventLoopMgr()
Standard Destructor.
StatusCode initializeAlgorithms()
Initialize all algorithms and output streams.
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
tool_stats m_toolInvoke
tool called counter
virtual int drainScheduler(int &finishedEvents, bool report) override
Drain the scheduler from all actions that may be queued.
bool m_useSecondaryEventNumber
read event number from secondary input
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
EvtContext * m_evtContext
Gaudi event selector Context (may be used as a cursor by the evt selector)
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
std::string m_whiteboardName
Name of the Whiteboard to be used.
int declareEventRootAddress(EventContext &)
Declare the root address of the event.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode seek(int evt) override
Seek to a given event.
virtual StatusCode stop() override
implementation of IService::stop
std::string m_schedulerName
Name of the scheduler to be used.
StringArrayProperty m_testPilotMessages
IDataManagerSvc_t m_histoDataMgrSvc
Reference to the Histogram Data Service.
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc.
virtual int size() override
Return the size of the collection.
Gaudi::Property< std::string > m_eventRangeChannel
StringProperty m_clearStorePolicy
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
EventContext m_lastEventContext
virtual void setCurrentEventNum(int num) override
ServiceHandle< IConversionSvc > IConversionSvc_t
IConversionSvc_t m_histoPersSvc
virtual StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
void setupPreSelectTools(Gaudi::Details::PropertyBase &)
property update handler:sets up the Pre-selection tools
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
void trimRangeStrings(std::string &str)
UnsignedIntegerProperty m_writeInterval
virtual void handle(const Incident &inc) override
IIncidentListener interface.
IEvtSelector * m_evtSelector
Reference to the Event Selector.
void resetTimeout(Timeout &instance)
Reset timeout.
static Timeout & instance()
Get reference to Timeout singleton.
void setConditionsRun(EventIDBase::number_type conditionsRun)
This class provides a unique identification for each event, in terms of run/event number and/or a time stamp.
EventIDBase::number_type number_type
EventID * event_ID()
the unique identification of the event.
Abstract interface for seeking for an event selector.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int size(IEvtSelector::Context &c) const =0
Return the size of the collection, or -1 if we can't get the size.
std::unique_ptr< RangeReport_t > RangeReport_ptr
int ir
counter of the current depth
void setExtendedEventContext(EventContext &ctx, ExtendedEventContext &&ectx)
Move an extended context into a context object.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
void * xmalloc(size_t size)
Trapping version of malloc.
thread_local event_number_t eventIndex
EventInfo_v1 EventInfo
Definition of the latest event info version.
Trapping version of malloc.