16 #include "GaudiKernel/IAlgorithm.h"
17 #include "GaudiKernel/SmartIF.h"
18 #include "GaudiKernel/Incident.h"
19 #include "GaudiKernel/DataObject.h"
20 #include "GaudiKernel/IIncidentSvc.h"
21 #include "GaudiKernel/IDataManagerSvc.h"
22 #include "GaudiKernel/IDataProviderSvc.h"
23 #include "GaudiKernel/IConversionSvc.h"
24 #include "GaudiKernel/GaudiException.h"
25 #include "GaudiKernel/AppReturnCode.h"
26 #include "GaudiKernel/MsgStream.h"
27 #include "GaudiKernel/EventIDBase.h"
28 #include "GaudiKernel/ThreadLocalContext.h"
29 #include "GaudiKernel/FileIncident.h"
33 #include "EventInfo/EventInfo.h"
39 #include "tbb/tick_count.h"
40 #include <yampl/ISocket.h>
41 #include "yampl/SocketFactory.h"
54 leftString(std::string &
s,
char sc){
55 bool truncated{
false};
57 if (
n!=std::string::npos){
65 leftString(std::string &
s,
int n){
66 bool truncated{
false};
67 if (
static_cast<size_t>(
n) <
s.size()){
76 , ISvcLocator* svcLoc)
77 : base_class(nam, svcLoc)
78 , m_incidentSvc (
"IncidentSvc", nam )
79 , m_eventStore(
"StoreGateSvc", nam )
80 , m_evtSelector{
nullptr}
81 , m_evtContext{
nullptr}
82 , m_histoDataMgrSvc(
"HistogramDataSvc", nam )
83 , m_histoPersSvc (
"HistogramPersistencySvc", nam )
84 , m_evtIdModSvc(
"", nam)
93 , m_doEvtHeartbeat(
false)
94 , m_conditionsCleaner(
"Athena::ConditionsCleanerSvc", nam )
95 , m_outSeqSvc(
"OutputStreamSequencerSvc", nam)
97 declareProperty(
"EvtSel", m_evtsel,
98 "Name of Event Selector to use. If empty string (default) "
99 "take value from ApplicationMgr");
100 declareProperty(
"HistogramPersistency", m_histPersName=
"",
101 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
102 "By default (empty string) get property value from "
104 declareProperty(
"HistWriteInterval", m_writeInterval=0 ,
105 "histogram write/update interval");
106 declareProperty(
"FailureMode", m_failureMode=1 ,
107 "Controls behaviour of event loop depending on return code of"
108 " Algorithms. 0: all non-SUCCESSes terminate job. "
109 "1: RECOVERABLE skips to next event, FAILURE terminates job "
110 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
111 declareProperty(
"EventPrintoutInterval", m_eventPrintoutInterval=1,
112 "Print event heartbeat printouts every m_eventPrintoutInterval events");
113 declareProperty(
"ClearStorePolicy",
114 m_clearStorePolicy =
"EndEvent",
115 "Configure the policy wrt handling of when the "
116 "'clear-the-event-store' event shall happen: at EndEvent "
117 "(default as it is makes things easier for memory management"
118 ") or at BeginEvent (easier e.g. for interactive use)");
119 declareProperty(
"PreSelectTools",m_tools,
"AlgTools for event pre-selection")->
122 declareProperty(
"SchedulerSvc", m_schedulerName=
"ForwardSchedulerSvc",
123 "Name of the scheduler to be used");
125 declareProperty(
"WhiteboardSvc", m_whiteboardName=
"EventDataSvc",
126 "Name of the Whiteboard to be used");
128 declareProperty(
"EventStore", m_eventStore);
130 declareProperty(
"EvtIdModifierSvc", m_evtIdModSvc,
131 "ServiceHandle for EvtIdModifierSvc");
133 declareProperty(
"FakeLumiBlockInterval", m_flmbi = 0,
134 "Event interval at which to increment lumiBlock# when "
135 "creating events without an EventSelector. Zero means "
136 "don't increment it");
137 declareProperty(
"FakeTimestampInterval", m_timeStampInt = 1,
138 "timestamp interval between events when creating Events "
139 "without an EventSelector");
140 declareProperty(
"RequireInputAttributeList", m_requireInputAttributeList =
false,
141 "Require valid input attribute list to be present");
142 declareProperty(
"UseSecondaryEventNumber", m_useSecondaryEventNumber =
false,
143 "In case of DoubleEventSelector use event number from secondary input");
145 declareProperty(
"ESTestPilotMessages", m_testPilotMessages,
"List of messages from fake pilot for test mode");
147 m_scheduledStop =
false;
160 if(!
sc.isSuccess()) {
161 error() <<
"Failed to initialize base class MinimalEventLoopMgr" <<
endmsg;
172 return StatusCode::FAILURE;
177 fatal() <<
"Error retrieving SchedulerSvc interface ISchedulerSvc." <<
endmsg;
178 return StatusCode::FAILURE;
183 fatal() <<
"Error retrieving AlgResourcePool" <<
endmsg;
184 return StatusCode::FAILURE;
187 m_aess = serviceLocator()->service(
"AlgExecStateSvc");
189 fatal() <<
"Error retrieving AlgExecStateSvc" <<
endmsg;
190 return StatusCode::FAILURE;
199 SmartIF<IProperty> prpMgr(serviceLocator());
200 if(!prpMgr.isValid()) {
201 fatal() <<
"IProperty interface not found in ApplicationMgr." <<
endmsg;
202 return StatusCode::FAILURE;
208 if(histPersName.empty()) {
209 ATH_CHECK(setProperty(prpMgr->getProperty(
"HistogramPersistency")));
212 if(histPersName !=
"NONE") {
217 SmartIF<IProperty>
histSvc{serviceLocator()->service(
"RootHistSvc")};
220 error() <<
"could not locate actual Histogram persistency service" <<
endmsg;
223 const Gaudi::Details::PropertyBase &prop =
histSvc->getProperty(
"OutputFile");
226 const StringProperty &sprop =
dynamic_cast<const StringProperty&
>(prop);
230 verbose() <<
"could not dcast OutputFile property to a StringProperty."
231 <<
" Need to fix Gaudi."
234 val = prop.toString();
238 &&
val !=
"UndefinedROOTOutputFileName") {
245 debug() <<
"Histograms saving not required." <<
endmsg;
253 debug() <<
"EventID modifier Service not set. No run number, ... overrides "
258 debug() <<
"Could not find EventID modifier Service. No run number, ... "
270 sc = setProperty(prpMgr->getProperty(
"EvtSel"));
272 if (
sc.isFailure()) {
273 warning() <<
"Unable to set EvtSel property" <<
endmsg;
278 SmartIF<IEvtSelector> theEvtSel{serviceLocator()->service(
selName)};
285 fatal() <<
"Can not create the event selector Context." <<
endmsg;
286 return StatusCode::FAILURE;
288 if (msgLevel(MSG::INFO)) {
289 SmartIF<INamedInterface> named{theEvtSel};
291 info() <<
"Setup EventSelector service " << named->name( ) <<
endmsg;
295 else if (
sc.isFailure()) {
296 fatal() <<
"No valid event selector called " <<
selName <<
endmsg;
297 return StatusCode::FAILURE;
307 return StatusCode::FAILURE;
318 info() <<
"Using secondary event number." <<
endmsg;
322 info() <<
"runnung in standalone TEST MODE" <<
endmsg;
345 if ( policyName !=
"BeginEvent" &&
346 policyName !=
"EndEvent" ) {
348 fatal() <<
"Unknown policy [" << policyName
349 <<
"] for the 'ClearStore-policy !\n"
350 <<
" Valid values are: BeginEvent, EndEvent"
352 throw GaudiException(
"Can not setup 'ClearStore'-policy",
354 StatusCode::FAILURE);
376 unsigned int toolCtr = 0;
377 for ( ; firstTool != lastTool; ++firstTool )
400 error() <<
"Error in Service base class Finalize"
407 error() <<
"Error in writing Histograms"
433 unsigned int toolCtr = 0;
434 info() <<
"Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" <<
endmsg;
435 info() <<
"-----------------------------------------------------" <<
endmsg;
437 for ( ; firstTool != lastTool; ++firstTool ) {
438 info() << std::setw(2) << std::setiosflags(std::ios_base::right)
439 << toolCtr+1 <<
".) " << std::resetiosflags(std::ios_base::right)
440 << std::setw(48) << std::setfill(
'.')
441 << std::setiosflags(std::ios_base::left)
442 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
445 << std::setw(6) << std::setiosflags(std::ios_base::right)
457 return (
sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
470 std::vector<DataObject*>
objects;
472 DataObject*
obj = reg->object();
473 if ( !
obj ||
obj->clID() == CLID_StatisticsFile )
return false;
478 if ( !
sc.isSuccess() ) {
479 error() <<
"Error while traversing Histogram data store" <<
endmsg;
487 (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
491 IOpaqueAddress* pAddr =
nullptr;
493 if ( iret.isFailure() )
return iret;
494 i->registry()->setAddress( pAddr );
498 IRegistry* reg =
i->registry();
500 return iret.isFailure() ? iret : isc;
502 if ( !
sc.isSuccess() ) {
503 error() <<
"Error while saving Histograms." <<
endmsg;
507 if (
force || (writeInterval != 0 &&
m_nevt%writeInterval == 0) ) {
523 return StatusCode::SUCCESS;
531 return StatusCode::SUCCESS;
543 always() <<
"A stopRun was requested by an incidentListener. "
544 <<
"Do not process this event."
547 return (StatusCode::SUCCESS);
553 Gaudi::Hive::setCurrentContext ( ctx );
556 if (declEvtRootSc == 0 ) {
558 return StatusCode::SUCCESS;
559 }
else if ( declEvtRootSc == -1) {
560 error() <<
"declareEventRootAddress for context " << ctx <<
" failed"
562 return StatusCode::FAILURE;
566 unsigned int conditionsRun = ctx.eventID().run_number();
569 if (
eventStore()->contains<AthenaAttributeList>(
"Input") &&
571 if (attr->exists(
"ConditionsRun")) {
572 conditionsRun = (*attr)[
"ConditionsRun"].data<
unsigned int>();
577 Gaudi::Hive::setCurrentContext ( ctx );
580 if (
eventStore()->record(std::make_unique<EventContext> (ctx),
581 "EventContext").isFailure())
583 error() <<
"Error recording event context object" <<
endmsg;
584 return (StatusCode::FAILURE);
605 bool toolsPassed=
true;
610 unsigned int toolCtr=0;
614 while(toolsPassed && theTool!=lastTool )
616 toolsPassed = (*theTool)->passEvent(ctx.eventID());
628 info() <<
" ===>>> start processing event #" << evtNumber <<
", run #" <<
m_currentRun
629 <<
" on slot " << ctx.slot() <<
", " <<
m_proc
630 <<
" events processed so far <<<===" <<
endmsg;
632 info() <<
" ===>>> start processing event #" << evtNumber <<
", run #" <<
m_currentRun
633 <<
" on slot " << ctx.slot() <<
", "
635 <<
" events processed so far <<<===" <<
endmsg;
648 debug() <<
"Adding event " << ctx.evt()
649 <<
", slot " << ctx.slot()
650 <<
" to the scheduler" <<
endmsg;
657 if (!addEventStatus.isSuccess()){
658 fatal() <<
"An event processing slot should be now free in the scheduler, but it appears not to be the case." <<
endmsg;
668 Gaudi::Hive::setCurrentContext( EventContext() );
670 return StatusCode::SUCCESS;
680 if (
sc.isSuccess()) {
691 SmartIF<IProperty> appmgr(serviceLocator());
692 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop).isFailure()) {
693 error() <<
"Could not set return code of the application ("
694 << Gaudi::ReturnCode::ScheduledStop <<
")" <<
endmsg;
697 return StatusCode::SUCCESS;
718 for (
size_t islot = 0; islot < nslot; islot++) {
722 Gaudi::Hive::setCurrentContext( EventContext() );
729 if(maxevt==0)
return StatusCode::SUCCESS;
733 std::unique_ptr<yampl::ISocket>{yampl::SocketFactory().createClientSocket(
742 info() <<
"Starting loop on events" <<
endmsg;
748 auto secsFromStart = [&start_time]()->
double{
752 std::unique_ptr<RangeStruct>
range;
758 bool loop_ended =
range->eventRangeID.empty();
765 bool no_more_events =
false;
769 debug() <<
" -> createdEvts: " << createdEvts <<
endmsg;
779 if ( !ctx.valid() ) {
780 sc = StatusCode::FAILURE;
786 if (
sc.isFailure()) {
787 error() <<
"Terminating event processing loop due to errors" <<
endmsg;
799 if(
range->eventRangeID.empty()) {
800 no_more_events =
true;
821 sc = StatusCode::FAILURE;
828 sc = StatusCode::SUCCESS;
837 info() <<
"---> Loop Finished (seconds): " << secsFromStart() <<
endmsg;
851 error() <<
"Seek failed; unsupported by event selector"
853 return StatusCode::FAILURE;
858 fatal() <<
"Can not create the event selector Context."
860 return StatusCode::FAILURE;
866 if (
sc.isSuccess()) {
891 error() <<
"Collection size unsupported by event selector"
898 fatal() <<
"Can not create the event selector Context."
913 if(inc.type()!=
"BeforeFork")
917 warning() <<
"Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" <<
endmsg;
923 error() <<
"Failed to initialize Algorithms" <<
endmsg;
929 IOpaqueAddress* addr = 0;
931 if(!
sc.isSuccess()) {
932 info() <<
"No more events in event selection " <<
endmsg;
936 if (
sc.isFailure()) {
937 error() <<
"Could not create an IOpaqueAddress" <<
endmsg;
943 if(!
sc.isSuccess()) {
944 error() <<
"Error declaring Event object" <<
endmsg;
949 if(
eventStore()->loadEventProxies().isFailure()) {
950 warning() <<
"Error loading Event proxies" <<
endmsg;
956 if(!
sc.isSuccess()) {
957 error() <<
"Unable to retrieve Event root object" <<
endmsg;
968 if(!
sc.isSuccess()) {
969 error() <<
"Clear of Event data store failed" <<
endmsg;
980 if ( !
sc.isSuccess() ) {
985 if( !
sc.isSuccess() ) {
987 if (
sc.isSuccess() ) {
989 if ( !
sc.isSuccess() ) {
990 warning() <<
"Error creating IOpaqueAddress." <<
endmsg;
1011 std::unique_ptr<const EventInfo> pEvent;
1019 IOpaqueAddress* addr = 0;
1023 error() <<
"Seek failed; unsupported by event selector" <<
endmsg;
1028 if(
sc.isFailure()) {
1035 if ( !
sc.isSuccess() ) {
1037 info() <<
"No more events in event selection " <<
endmsg;
1042 error() <<
"Could not create an IOpaqueAddress" <<
endmsg;
1051 if( !
sc.isSuccess() ) {
1053 warning() <<
"Error declaring Event object" <<
endmsg;
1056 }
if ((
sc=
eventStore()->loadEventProxies()).isFailure()) {
1057 error() <<
"Error loading Event proxies" <<
endmsg;
1060 bool consume_modifier_stream =
false;
1063 if ( pAttrList !=
nullptr && pAttrList->size() > 6 ) {
1065 unsigned int runNumber = (*pAttrList)[
"RunNumber"].data<
unsigned int>();
1066 unsigned long long eventNumber = (*pAttrList)[
"EventNumber"].data<
unsigned long long>();
1067 unsigned int eventTime = (*pAttrList)[
"EventTime"].data<
unsigned int>();
1068 unsigned int eventTimeNS = (*pAttrList)[
"EventTimeNanoSec"].data<
unsigned int>();
1069 unsigned int lumiBlock = (*pAttrList)[
"LumiBlockN"].data<
unsigned int>();
1070 unsigned int bunchId = (*pAttrList)[
"BunchId"].data<
unsigned int>();
1072 consume_modifier_stream =
true;
1075 unsigned long long eventNumberSecondary{};
1076 if ( !(pAttrList->exists(
"hasSecondaryInput") && (*pAttrList)[
"hasSecondaryInput"].data<
bool>()) ) {
1077 fatal() <<
"Secondary EventNumber requested, but secondary input does not exist!" <<
endmsg;
1080 if ( pAttrList->exists(
"EventNumber_secondary") ) {
1081 eventNumberSecondary = (*pAttrList)[
"EventNumber_secondary"].data<
unsigned long long>();
1087 if (pEventSecondary) {
1088 eventNumberSecondary = pEventSecondary->
event_ID()->event_number();
1091 fatal() <<
"Secondary EventNumber requested, but it does not exist!" <<
endmsg;
1095 if (eventNumberSecondary != 0) {
1097 info() <<
" ===>>> using secondary event #" << eventNumberSecondary <<
" instead of #" <<
eventNumber <<
"<<<===" <<
endmsg;
1104 pEvent = std::make_unique<EventInfo>(
1111 fatal() <<
"Valid input attribute list required but not present!";
1115 const EventInfo* pEventObserver{pEvent.get()};
1116 if (!pEventObserver) {
1120 if( !pEventObserver ) {
1125 if( !
sc.isSuccess() ) {
1126 error() <<
"Unable to retrieve Event root object" <<
endmsg;
1129 consume_modifier_stream =
true;
1131 pEvent = std::make_unique<EventInfo>(
1134 pEventObserver = pEvent.get();
1136 if( !
sc.isSuccess() ) {
1137 error() <<
"Error declaring event data object" <<
endmsg;
1141 consume_modifier_stream =
false;
1149 consume_modifier_stream);
1155 unsigned int runNmb{1}, evtNmb{
m_nevt + 1};
1162 auto eid = std::make_unique<EventID> (runNmb,evtNmb,
m_timeStamp);
1164 eid->set_lumi_block( runNmb );
1168 pEvent = std::make_unique<EventInfo>(std::move(eid),
1169 std::make_unique<EventType>());
1173 debug() <<
"selecting store: " << ctx.slot() <<
endmsg;
1177 debug() <<
"recording EventInfo " << *pEvent->
event_ID() <<
" in "
1180 if( !
sc.isSuccess() ) {
1181 error() <<
"Error declaring event data object" <<
endmsg;
1191 bool consume_modifier_stream) {
1197 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1199 unsigned int oldrunnr = eID.run_number();
1200 unsigned int oldLB = eID.lumi_block();
1201 unsigned int oldTS = eID.time_stamp();
1202 unsigned int oldTSno = eID.time_stamp_ns_offset();
1203 debug() <<
"modifyEventContext: use evtIdModSvc runnr=" << oldrunnr
1204 <<
" -> " << new_eID.run_number() <<
endmsg;
1205 debug() <<
"modifyEventContext: use evtIdModSvc LB=" << oldLB <<
" -> "
1206 << new_eID.lumi_block() <<
endmsg;
1207 debug() <<
"modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS
1208 <<
" -> " << new_eID.time_stamp() <<
endmsg;
1209 debug() <<
"modifyEventContext: use evtIdModSvc TimeStamp ns Offset="
1210 << oldTSno <<
" -> " << new_eID.time_stamp_ns_offset() <<
endmsg;
1212 ctx.setEventID(new_eID);
1214 ctx.eventID().run_number());
1218 ctx.setEventID(eID);
1227 if (
sc.isFailure()) {
1228 fatal() <<
"Slot " << ctx.slot()
1229 <<
" could not be selected for the WhiteBoard" <<
endmsg;
1230 return EventContext{};
1234 debug() <<
"created EventContext, num: " << ctx.evt() <<
" in slot: "
1243 Gaudi::setAppReturnCode(
m_appMgrProperty, Gaudi::ReturnCode::Success,
true).ignore();
1261 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1263 EventContext* finishedEvtContext{
nullptr};
1266 debug() <<
"drainScheduler: [" << finishedEvts <<
"] Waiting for a context" <<
endmsg;
1270 if (
sc.isSuccess()){
1271 debug() <<
"drainScheduler: scheduler not empty: Context "
1272 << finishedEvtContext <<
endmsg;
1273 finishedEvtContexts.emplace_back(finishedEvtContext);
1276 debug() <<
"drainScheduler: scheduler empty" <<
endmsg;
1281 while (
m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1282 finishedEvtContexts.emplace_back(finishedEvtContext);
1287 for (
auto& thisFinishedEvtContext : finishedEvtContexts){
1288 if (!thisFinishedEvtContext) {
1289 fatal() <<
"Detected nullptr ctxt while clearing WB!"<<
endmsg;
1294 if (
m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1295 fatal() <<
"Failed event detected on " << thisFinishedEvtContext
1296 <<
" w/ fail mode: "
1297 <<
m_aess->eventStatus(*thisFinishedEvtContext) <<
endmsg;
1305 if (
m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1306 n_run = thisFinishedEvtContext->eventID().run_number();
1307 n_evt = thisFinishedEvtContext->eventID().event_number();
1309 error() <<
"DrainSched: unable to select store "
1310 << thisFinishedEvtContext->slot() <<
endmsg;
1319 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1321 m_incidentSvc->fireIncident(Incident(
name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1327 std::string outputFileReport = rangeReport->second + std::string(
",ID:")
1328 + rangeReport->first + std::string(
",CPU:N/A,WALL:N/A");
1331 void* message2pilot =
malloc(outputFileReport.size());
1332 memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
1333 m_socket->send(message2pilot,outputFileReport.size());
1335 info() <<
"Reported the output " << outputFileReport <<
endmsg;
1339 debug() <<
"Clearing slot " << thisFinishedEvtContext->slot()
1340 <<
" (event " << thisFinishedEvtContext->evt()
1341 <<
") of the whiteboard" <<
endmsg;
1344 if (!
sc.isSuccess()) {
1345 error() <<
"Whiteboard slot " << thisFinishedEvtContext->slot()
1346 <<
" could not be properly cleared";
1358 info() <<
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1359 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1360 <<
m_proc <<
" events processed so far <<<===" <<
endmsg;
1362 info() <<
" ===>>> done processing event #" << n_evt <<
", run #" << n_run
1363 <<
" on slot " << thisFinishedEvtContext->slot() <<
", "
1365 <<
" events processed so far <<<===" <<
endmsg;
1366 std::ofstream
outfile(
"eventLoopHeartBeat.txt");
1368 error() <<
" unable to open: eventLoopHeartBeat.txt" <<
endmsg;
1372 outfile <<
" done processing event #" << n_evt <<
", run #" << n_run
1373 <<
" " <<
m_nev <<
" events read so far <<<===" << std::endl;
1378 debug() <<
"drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext
1382 return (
fail ? -1 : 1 );
1390 if( !
sc.isSuccess() ) {
1391 warning() <<
"Clear of Event data store failed" <<
endmsg;
1399 static const std::string strReady(
"Ready for events");
1400 static const std::string strStopProcessing(
"No more events");
1404 static std::atomic<size_t> line_n = 0;
1405 info() <<
"in TEST MODE, Range #" << line_n+1 <<
endmsg;
1409 void* ready_message =
malloc(strReady.size());
1410 memcpy(ready_message,strReady.data(),strReady.size());
1411 socket->send(ready_message,strReady.size());
1412 void* eventRangeMessage;
1413 std::string strPeerId;
1414 ssize_t eventRangeSize = socket->recv(eventRangeMessage,strPeerId);
1415 range = std::string((
const char*)eventRangeMessage,eventRangeSize);
1416 leftString(
range,
'\n');
1419 std::unique_ptr<RangeStruct>
result = std::make_unique<RangeStruct>();
1420 if(
range.compare(strStopProcessing)==0) {
1421 info() <<
"No more events from the server" <<
endmsg;
1430 if(
range.ends_with(
"}]")){
1435 std::map<std::string,std::string> eventRangeMap;
1437 size_t endpos =
range.find(
',');
1438 while(endpos!=std::string::npos) {
1440 std::string
keyValue(
range.substr(startpos,endpos-startpos));
1441 size_t colonPos =
keyValue.find(
':');
1442 std::string strKey =
keyValue.substr(0,colonPos);
1443 std::string strVal =
keyValue.substr(colonPos+1);
1446 eventRangeMap[strKey]=strVal;
1449 startpos = endpos+1;
1450 endpos =
range.find(
',',startpos);
1455 size_t colonPos =
keyValue.find(
':');
1456 std::string strKey =
keyValue.substr(0,colonPos);
1457 std::string strVal =
keyValue.substr(colonPos+1);
1460 eventRangeMap[strKey]=strVal;
1470 if(eventRangeMap.find(
"eventRangeID")==eventRangeMap.end()
1471 || eventRangeMap.find(
"startEvent")==eventRangeMap.end()
1472 || eventRangeMap.find(
"lastEvent")==eventRangeMap.end()
1473 || eventRangeMap.find(
"PFN")==eventRangeMap.end()) {
1475 errorStr =
"ERR_ATHENAMP_PARSE \"" +
range +
"\": Wrong format";
1482 if(eventRangeMap[
"eventRangeID"].
empty()
1483 || eventRangeMap[
"PFN"].
empty()
1486 errorStr =
"ERR_ATHENAMP_PARSE \"" +
range +
"\": Wrong values of range fields";
1490 if(
m_pfn != eventRangeMap[
"PFN"]) {
1491 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_evtSelector);
1492 if(!propertyServer) {
1493 errorStr =
"ERR_ATHENAMP_PARSE \"" +
range +
"\": Unable to dyn-cast the event selector to IProperty";
1496 std::string strInpuCol(
"InputCollections");
1497 std::vector<std::string> vectInpCol{eventRangeMap[
"PFN"],};
1498 StringArrayProperty inputFileList(strInpuCol, vectInpCol);
1499 if(propertyServer->setProperty(inputFileList).isFailure()) {
1500 errorStr =
"ERR_ATHENAMP_PARSE \"" +
range +
"\": Unable to set input file name property to the Event Selector";
1503 m_pfn = eventRangeMap[
"PFN"];
1512 debug() <<
"*** Decoded Event Range ***" <<
endmsg;
1513 for (
const auto& fieldvalue : eventRangeMap) {
1514 debug() << fieldvalue.first <<
":" << fieldvalue.second <<
endmsg;
1517 result->eventRangeID = eventRangeMap[
"eventRangeID"];
1518 result->pfn = eventRangeMap[
"PFN"];
1524 info() <<
"Ignoring this event range" <<
endmsg;
1528 socket->send(errorMessage,
errorStr.size());
1543 if(
str.empty())
return;
1547 while(
str[
i]==
' ')
i--;
1548 if(
i)
str.resize(
i+1);
1555 if(
str.starts_with(
"u\'")) {
1557 if(
str.rfind(
'\'')==
str.size()-1) {
1561 else if(
str.starts_with(
"\"")) {
1563 if(
str.rfind(
'\"')==
str.size()-1) {