ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaMtesEventLoopMgr.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
6#include "ClearStorePolicy.h"
8
15#include "CxxUtils/xmalloc.h"
16
17#include "GaudiKernel/IAlgorithm.h"
18#include "GaudiKernel/SmartIF.h"
19#include "GaudiKernel/Incident.h"
20#include "GaudiKernel/DataObject.h"
21#include "GaudiKernel/IIncidentSvc.h"
22#include "GaudiKernel/IDataManagerSvc.h"
23#include "GaudiKernel/IDataProviderSvc.h"
24#include "GaudiKernel/IConversionSvc.h"
25#include "GaudiKernel/GaudiException.h"
26#include "GaudiKernel/AppReturnCode.h"
27#include "GaudiKernel/MsgStream.h"
28#include "GaudiKernel/EventIDBase.h"
29#include "GaudiKernel/ThreadLocalContext.h"
30#include "GaudiKernel/FileIncident.h"
31
33
34#include "EventInfo/EventInfo.h"
35#include "EventInfo/EventID.h"
36#include "EventInfo/EventType.h"
39
40#include "tbb/tick_count.h"
41#include <yampl/ISocket.h>
42#include "yampl/SocketFactory.h"
43
44#include <cassert>
45#include <ios>
46#include <iostream>
47#include <fstream>
48#include <iomanip>
49#include <cstdlib>
50#include <unistd.h>
51
namespace {
  /// Truncate @a s just before the first occurrence of the character @a sc.
  /// @return true if @a sc was found and the string was shortened,
  ///         false if @a s was left untouched.
  bool
  leftString(std::string & s, char sc){
    const std::string::size_type cut = s.find(sc);
    if (cut == std::string::npos) {
      return false;
    }
    s.resize(cut);
    return true;
  }

  /// Keep at most the first @a n characters of @a s.
  /// The length is compared as an unsigned value, so a negative @a n
  /// converts to a very large number and leaves @a s unchanged.
  /// @return true if the string was shortened, false otherwise.
  bool
  leftString(std::string & s, int n){
    const size_t limit = static_cast<size_t>(n);
    if (limit >= s.size()) {
      return false;
    }
    s.resize(limit);
    return true;
  }
}
74
76 , ISvcLocator* svcLoc)
77 : base_class(nam, svcLoc)
78 , m_incidentSvc ( "IncidentSvc", nam )
79 , m_eventStore( "StoreGateSvc", nam )
80 , m_evtSelector{nullptr}
81 , m_evtContext{nullptr}
82 , m_histoDataMgrSvc( "HistogramDataSvc", nam )
83 , m_histoPersSvc ( "HistogramPersistencySvc", nam )
84 , m_evtIdModSvc("", nam)
85 , m_currentRun(0)
86 , m_firstRun(true)
87 , m_tools(this)
88 , m_nevt(0)
89 , m_writeHists(false)
90 , m_nev(0)
91 , m_proc(0)
92 , m_useTools(false)
93 , m_doEvtHeartbeat(false)
94 , m_conditionsCleaner( "Athena::ConditionsCleanerSvc", nam )
95 , m_outSeqSvc("OutputStreamSequencerSvc", nam)
96{
97 declareProperty("EvtSel", m_evtsel,
98 "Name of Event Selector to use. If empty string (default) "
99 "take value from ApplicationMgr");
100 declareProperty("HistogramPersistency", m_histPersName="",
101 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
102 "By default (empty string) get property value from "
103 "ApplicationMgr");
104 declareProperty("HistWriteInterval", m_writeInterval=0 ,
105 "histogram write/update interval");
106 declareProperty("FailureMode", m_failureMode=1 ,
107 "Controls behaviour of event loop depending on return code of"
108 " Algorithms. 0: all non-SUCCESSes terminate job. "
109 "1: RECOVERABLE skips to next event, FAILURE terminates job "
110 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
111 declareProperty("EventPrintoutInterval", m_eventPrintoutInterval=1,
112 "Print event heartbeat printouts every m_eventPrintoutInterval events");
113 declareProperty("ClearStorePolicy",
114 m_clearStorePolicy = "EndEvent",
115 "Configure the policy wrt handling of when the "
116 "'clear-the-event-store' event shall happen: at EndEvent "
117 "(default as it is makes things easier for memory management"
118 ") or at BeginEvent (easier e.g. for interactive use)");
119 declareProperty("PreSelectTools",m_tools,"AlgTools for event pre-selection")->
120 declareUpdateHandler( &AthenaMtesEventLoopMgr::setupPreSelectTools, this );
121
122 declareProperty("SchedulerSvc", m_schedulerName="ForwardSchedulerSvc",
123 "Name of the scheduler to be used");
124
125 declareProperty("WhiteboardSvc", m_whiteboardName="EventDataSvc",
126 "Name of the Whiteboard to be used");
127
128 declareProperty("EventStore", m_eventStore);
129
130 declareProperty("EvtIdModifierSvc", m_evtIdModSvc,
131 "ServiceHandle for EvtIdModifierSvc");
132
133 declareProperty("FakeLumiBlockInterval", m_flmbi = 0,
134 "Event interval at which to increment lumiBlock# when "
135 "creating events without an EventSelector. Zero means "
136 "don't increment it");
137 declareProperty("FakeTimestampInterval", m_timeStampInt = 1,
138 "timestamp interval between events when creating Events "
139 "without an EventSelector");
140 declareProperty("RequireInputAttributeList", m_requireInputAttributeList = false,
141 "Require valid input attribute list to be present");
142 declareProperty("UseSecondaryEventNumber", m_useSecondaryEventNumber = false,
143 "In case of DoubleEventSelector use event number from secondary input");
144
145 declareProperty("ESTestPilotMessages", m_testPilotMessages, "List of messages from fake pilot for test mode");
146
147 m_scheduledStop = false;
148
149}
150
154
156{
157 info() << "Initializing " << name() << endmsg;
158
159 StatusCode sc = MinimalEventLoopMgr::initialize();
160 if(!sc.isSuccess()) {
161 error() << "Failed to initialize base class MinimalEventLoopMgr" << endmsg;
162 return sc;
163 }
164
165//-------------------------------------------------------------------------
166// Setup stuff for hive
167//-------------------------------------------------------------------------
168
169 m_whiteboard = serviceLocator()->service(m_whiteboardName);
170 if(!m_whiteboard.isValid()) {
171 fatal() << "Error retrieving " << m_whiteboardName << " interface IHiveWhiteBoard." << endmsg;
172 return StatusCode::FAILURE;
173 }
174
175 m_schedulerSvc = serviceLocator()->service(m_schedulerName);
176 if(!m_schedulerSvc.isValid()) {
177 fatal() << "Error retrieving SchedulerSvc interface ISchedulerSvc." << endmsg;
178 return StatusCode::FAILURE;
179 }
180
181 m_algResourcePool = serviceLocator()->service("AlgResourcePool");
182 if(!m_algResourcePool.isValid()) {
183 fatal() << "Error retrieving AlgResourcePool" << endmsg;
184 return StatusCode::FAILURE;
185 }
186
187 m_aess = serviceLocator()->service("AlgExecStateSvc");
188 if(!m_aess.isValid()) {
189 fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
190 return StatusCode::FAILURE;
191 }
192
193 ATH_CHECK(m_eventStore.retrieve());
194 ATH_CHECK(m_incidentSvc.retrieve());
195
196//--------------------------------------------------------------------------
197// Access Property Manager interface:
198//--------------------------------------------------------------------------
199 SmartIF<IProperty> prpMgr(serviceLocator());
200 if(!prpMgr.isValid()) {
201 fatal() << "IProperty interface not found in ApplicationMgr." << endmsg;
202 return StatusCode::FAILURE;
203 }
204
205 ATH_CHECK(m_histoDataMgrSvc.retrieve());
206
207 const std::string& histPersName(m_histPersName.value());
208 if(histPersName.empty()) {
209 ATH_CHECK(setProperty(prpMgr->getProperty("HistogramPersistency")));
210 }
211
212 if(histPersName != "NONE") {
213
214 m_histoPersSvc = IConversionSvc_t( "HistogramPersistencySvc",
215 this->name() );
216
217 SmartIF<IProperty> histSvc{serviceLocator()->service("RootHistSvc")};
218
219 if (!histSvc) {
220 error() << "could not locate actual Histogram persistency service" << endmsg;
221 }
222 else {
223 const Gaudi::Details::PropertyBase &prop = histSvc->getProperty("OutputFile");
224 std::string val;
225 try {
226 const StringProperty &sprop = dynamic_cast<const StringProperty&>(prop);
227 val = sprop.value();
228 }
229 catch (...) {
230 verbose() << "could not dcast OutputFile property to a StringProperty."
231 << " Need to fix Gaudi."
232 << endmsg;
233
234 val = prop.toString();
235 }
236
237 if (val != ""
238 && val != "UndefinedROOTOutputFileName") {
239 m_writeHists = true;
240 }
241 }
242 }
243 else {
244 if (msgLevel(MSG::DEBUG)) {
245 debug() << "Histograms saving not required." << endmsg;
246 }
247 }
248
249 //--------------------------------------------------------------------------
250 // Set up the EventID modifier Service
251 //--------------------------------------------------------------------------
252 if (m_evtIdModSvc.empty()) {
253 debug() << "EventID modifier Service not set. No run number, ... overrides "
254 "will "
255 "be applied."
256 << endmsg;
257 } else if (!m_evtIdModSvc.retrieve().isSuccess()) {
258 debug() << "Could not find EventID modifier Service. No run number, ... "
259 "overrides "
260 "will be applied."
261 << endmsg;
262 }
263
264 //-------------------------------------------------------------------------
265 // Setup EventSelector service
266 //-------------------------------------------------------------------------
267 const std::string& selName(m_evtsel.value());
268 // the evt sel is usually specified as a property of ApplicationMgr
269 if (selName.empty()) {
270 sc = setProperty(prpMgr->getProperty("EvtSel"));
271 }
272 if (sc.isFailure()) {
273 warning() << "Unable to set EvtSel property" << endmsg;
274 }
275
276 // We do not expect a Event Selector necessarily being declared
277 if( !selName.empty() && selName != "NONE") {
278 SmartIF<IEvtSelector> theEvtSel{serviceLocator()->service(selName)};
279 if(theEvtSel && (theEvtSel != m_evtSelector)) {
280 // Event Selector changed (or setup for the first time)
281 m_evtSelector = theEvtSel;
282
283 // reset iterator
284 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
285 fatal() << "Can not create the event selector Context." << endmsg;
286 return StatusCode::FAILURE;
287 }
288 if (msgLevel(MSG::INFO)) {
289 SmartIF<INamedInterface> named{theEvtSel};
290 if (named) {
291 info() << "Setup EventSelector service " << named->name( ) << endmsg;
292 }
293 }
294 }
295 else if (sc.isFailure()) {
296 fatal() << "No valid event selector called " << selName << endmsg;
297 return StatusCode::FAILURE;
298 }
299 }
300
301//-------------------------------------------------------------------------
302// Setup 'Clear-Store' policy
303//-------------------------------------------------------------------------
304 try {
306 } catch(...) {
307 return StatusCode::FAILURE;
308 }
309
310 // Listen to the BeforeFork incident
311 m_incidentSvc->addListener(this,"BeforeFork",0);
312
313 CHECK( m_conditionsCleaner.retrieve() );
314 CHECK( m_outSeqSvc.retrieve() );
315
316 // Print if we override the event number using the one from secondary event
318 info() << "Using secondary event number." << endmsg;
319 }
320
321 if( m_testPilotMessages.value().size() > 0 ) {
322 info() << "runnung in standalone TEST MODE" << endmsg;
323 info() << " test contains " << m_testPilotMessages.value().size() << " event ranges" << endmsg;
324 for( const std::string& range: m_testPilotMessages.value() ) {
325 debug() << " " << range << endmsg;
326 }
327 m_inTestMode = true;
328 }
329 return sc;
330}
331
332inline
335 return m_eventStore.get();
336}
337
338//=========================================================================
339// property handlers
340//=========================================================================
341void
342AthenaMtesEventLoopMgr::setClearStorePolicy(Gaudi::Details::PropertyBase&) {
343 const std::string& policyName = m_clearStorePolicy.value();
344
345 if ( policyName != "BeginEvent" &&
346 policyName != "EndEvent" ) {
347
348 fatal() << "Unknown policy [" << policyName
349 << "] for the 'ClearStore-policy !\n"
350 << " Valid values are: BeginEvent, EndEvent"
351 << endmsg;
352 throw GaudiException("Can not setup 'ClearStore'-policy",
353 name(),
354 StatusCode::FAILURE);
355 }
356
357 return;
358}
359
360void
361AthenaMtesEventLoopMgr::setupPreSelectTools(Gaudi::Details::PropertyBase&) {
362
363 m_toolInvoke.clear();
364 m_toolReject.clear();
365 m_toolAccept.clear();
366
367 m_tools.retrieve().ignore();
368 if(m_tools.size() > 0) {
369 m_useTools=true;
370 m_toolInvoke.resize(m_tools.size());
371 m_toolReject.resize(m_tools.size());
372 m_toolAccept.resize(m_tools.size());
373
374 tool_iterator firstTool = m_tools.begin();
375 tool_iterator lastTool = m_tools.end();
376 unsigned int toolCtr = 0;
377 for ( ; firstTool != lastTool; ++firstTool )
378 {
379 // reset statistics
380 m_toolInvoke[toolCtr] = 0;
381 m_toolReject[toolCtr] = 0;
382 m_toolAccept[toolCtr] = 0;
383 toolCtr++;
384 }
385 }
386
387 return;
388
389}
390
391//=========================================================================
392// implementation of IAppMgrUI::finalize
393//=========================================================================
395{
396
397 StatusCode sc = MinimalEventLoopMgr::finalize();
398 if (sc.isFailure())
399 {
400 error() << "Error in Service base class Finalize"
401 << endmsg;
402 }
403
404 StatusCode sc2 = writeHistograms(true);
405 if (sc2.isFailure())
406 {
407 error() << "Error in writing Histograms"
408 << endmsg;
409 }
410
411 // Release all interfaces (ignore StatusCodes)
412 m_histoDataMgrSvc.release().ignore();
413 m_histoPersSvc.release().ignore();
414
415 m_whiteboard = 0;
417 m_schedulerSvc = 0;
418 // m_evtDataSvc = 0;
419
420 m_incidentSvc.release().ignore();
421
422 // Release event selector context
423 if ( m_evtSelector && m_evtContext ) {
424 m_evtSelector->releaseContext(m_evtContext).ignore();
425 // m_evtSelector = releaseInterface(m_evtSelector);
426 delete m_evtContext; m_evtContext = 0;
427 }
428
429
430 if(m_useTools) {
431 tool_iterator firstTool = m_tools.begin();
432 tool_iterator lastTool = m_tools.end();
433 unsigned int toolCtr = 0;
434 info() << "Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" << endmsg;
435 info() << "-----------------------------------------------------" << endmsg;
436
437 for ( ; firstTool != lastTool; ++firstTool ) {
438 info() << std::setw(2) << std::setiosflags(std::ios_base::right)
439 << toolCtr+1 << ".) " << std::resetiosflags(std::ios_base::right)
440 << std::setw(48) << std::setfill('.')
441 << std::setiosflags(std::ios_base::left)
442 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
443 << std::setfill(' ')
444 << " ("
445 << std::setw(6) << std::setiosflags(std::ios_base::right)
446 << m_toolInvoke[toolCtr]
447 << "/"
448 << m_toolAccept[toolCtr]
449 << "/"
450 << m_toolReject[toolCtr]
451 << ")"
452 << endmsg;
453 toolCtr++;
454 }
455 }
456
457 return ( sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
458 StatusCode::SUCCESS;
459
460}
461
462//=========================================================================
463// write out the histograms
464//=========================================================================
466
467 StatusCode sc (StatusCode::SUCCESS);
468
469 if ( 0 != m_histoPersSvc && m_writeHists ) {
470 std::vector<DataObject*> objects;
471 sc = m_histoDataMgrSvc->traverseTree( [&objects]( IRegistry* reg, int ) {
472 DataObject* obj = reg->object();
473 if ( !obj || obj->clID() == CLID_StatisticsFile ) return false;
474 objects.push_back( obj );
475 return true;
476 } );
477
478 if ( !sc.isSuccess() ) {
479 error() << "Error while traversing Histogram data store" << endmsg;
480 return sc;
481 }
482
483 if ( objects.size() > 0) {
484 int writeInterval(m_writeInterval.value());
485
486 if ( m_nevt == 1 || force ||
487 (writeInterval != 0 && m_nevt%writeInterval == 0) ) {
488
489 // skip /stat entry!
490 sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
491 IOpaqueAddress* pAddr = nullptr;
492 StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
493 if ( iret.isFailure() ) return iret;
494 i->registry()->setAddress( pAddr );
495 return isc;
496 } );
497 sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
498 IRegistry* reg = i->registry();
499 StatusCode iret = m_histoPersSvc->fillRepRefs( reg->address(), i );
500 return iret.isFailure() ? iret : isc;
501 } );
502 if ( ! sc.isSuccess() ) {
503 error() << "Error while saving Histograms." << endmsg;
504 }
505 }
506
507 if (force || (writeInterval != 0 && m_nevt%writeInterval == 0) ) {
508 if (msgLevel(MSG::DEBUG)) { debug() << "committing Histograms" << endmsg; }
509 m_histoPersSvc->conversionSvc()->commitOutput("",true).ignore();
510 }
511 }
512
513 }
514
515 return sc;
516}
517
518//=========================================================================
519// Call sysInitialize() on all algorithms and output streams
520//=========================================================================
522
523 return StatusCode::SUCCESS;
524}
525
526//=========================================================================
527// Run the algorithms for the current event
528//=========================================================================
530
531 return StatusCode::SUCCESS;
532}
533
534
535//=========================================================================
536// executeEvent( EventContext &&ctx )
537//=========================================================================
// Prepare one event for the given context and hand it to the scheduler.
//
// Visible flow:
//  - if a stop was scheduled, set m_terminateLoop and return SUCCESS at once;
//  - reset the algorithm execution state for ctx and install ctx as the
//    current thread-local context;
//  - call declareEventRootAddress(ctx): 0 means no more events (terminate the
//    loop, SUCCESS), -1 is an error (FAILURE);
//  - record a copy of ctx in the event store under the key "EventContext";
//  - on the first event or on a run-number change, fire EndRun (not for the
//    first run) and BeginRun incidents and update m_currentRun;
//  - run the pre-selection tools, if any, stopping at the first rejection;
//  - if the event passed, notify the conditions cleaner, remember ctx in
//    m_lastEventContext, fire BeginProcessing and push the event to the
//    scheduler;
//  - bump m_nev and m_nevt and reset the thread-local context.
//
// Returns SUCCESS on the normal path, FAILURE if the root address could not
// be declared or the EventContext could not be recorded.
// NOTE(review): CHECK presumably returns early when the conditions cleaner
// reports a failure -- confirm against the macro definition.
538StatusCode AthenaMtesEventLoopMgr::executeEvent( EventContext &&ctx )
539{
540
541 // An incident may schedule a stop, in which case it is better to exit before the actual execution.
542 if ( m_scheduledStop ) {
543 always() << "A stopRun was requested by an incidentListener. "
544 << "Do not process this event."
545 << endmsg;
546 m_terminateLoop = true;
547 return (StatusCode::SUCCESS);
548 }
549
550 m_aess->reset( ctx );
551
552 // Make sure context with slot is set before calling es->next().
553 Gaudi::Hive::setCurrentContext ( ctx );
554
// declareEventRootAddress returns 0 (no more events), -1 (error) or another
// value on success; only the first two are handled specially here.
555 int declEvtRootSc = declareEventRootAddress(ctx);
556 if (declEvtRootSc == 0 ) { // We ran out of events!
557 m_terminateLoop = true; // we have finished!
558 return StatusCode::SUCCESS;
559 } else if ( declEvtRootSc == -1) {
560 error() << "declareEventRootAddress for context " << ctx << " failed"
561 << endmsg;
562 return StatusCode::FAILURE;
563 }
564
// conditionsRun starts as the run number of the event ID; when no
// EvtIdModifierSvc is set it may be replaced by the "ConditionsRun" entry of
// the "Input" attribute list.
565 EventID::event_number_t evtNumber = ctx.eventID().event_number();
566 unsigned int conditionsRun = ctx.eventID().run_number();
567 if (!m_evtIdModSvc.isSet()) {
568 const AthenaAttributeList* attr = nullptr;
570 eventStore()->retrieve(attr, "Input").isSuccess()) {
571 if (attr->exists("ConditionsRun")) {
572 conditionsRun = (*attr)["ConditionsRun"].data<unsigned int>();
573 }
574 }
575 }
577 Gaudi::Hive::setCurrentContext ( ctx );
578
579 // Record EventContext in current whiteboard
580 if (eventStore()->record(std::make_unique<EventContext> (ctx),
581 "EventContext").isFailure())
582 {
583 error() << "Error recording event context object" << endmsg;
584 return (StatusCode::FAILURE);
585 }
586
588 if (m_firstRun || (m_currentRun != ctx.eventID().run_number()) ) {
589 // Fire EndRun incident unless this is the first run
590 if (!m_firstRun) {
591 // FIXME!!!
592 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndRun));
593 }
594 m_firstRun=false;
595 m_currentRun = ctx.eventID().run_number();
596
597 info() << " ===>>> start of run " << m_currentRun << " <<<==="
598 << endmsg;
599
600 // FIXME!!! Fire BeginRun "Incident"
601 m_incidentSvc->fireIncident(Incident(name(),IncidentType::BeginRun,ctx));
602
603 }
604
605 bool toolsPassed=true;
606 // CGL: FIXME
607 // bool eventFailed = false;
608
609 // Call any attached tools to reject events early
// The loop stops at the first tool that rejects the event; the invoke and
// accept/reject counters are updated only for the tools actually called.
610 unsigned int toolCtr=0;
611 if(m_useTools) {
612 tool_store::iterator theTool = m_tools.begin();
613 tool_store::iterator lastTool = m_tools.end();
614 while(toolsPassed && theTool!=lastTool )
615 {
616 toolsPassed = (*theTool)->passEvent(ctx.eventID());
617 m_toolInvoke[toolCtr]++;
618 {toolsPassed ? m_toolAccept[toolCtr]++ : m_toolReject[toolCtr]++;}
619 ++toolCtr;
620 ++theTool;
621 }
622 }
623
625 0 == (m_nev % m_eventPrintoutInterval.value()));
// Heartbeat printout; the variant with tools also reports the number of
// events read (m_nev) next to the number processed (m_proc).
626 if (m_doEvtHeartbeat) {
627 if(!m_useTools)
628 info() << " ===>>> start processing event #" << evtNumber << ", run #" << m_currentRun
629 << " on slot " << ctx.slot() << ", " << m_proc
630 << " events processed so far <<<===" << endmsg;
631 else
632 info() << " ===>>> start processing event #" << evtNumber << ", run #" << m_currentRun
633 << " on slot " << ctx.slot() << ", "
634 << m_nev << " events read and " << m_proc
635 << " events processed so far <<<===" << endmsg;
636 }
637
638 // Reset the timeout singleton
640 if(toolsPassed) {
641
642 CHECK( m_conditionsCleaner->event (ctx, true) );
643
644 // Remember the last event context for after event processing finishes.
645 m_lastEventContext = ctx;
646
647 // Now add event to the scheduler
648 debug() << "Adding event " << ctx.evt()
649 << ", slot " << ctx.slot()
650 << " to the scheduler" << endmsg;
651
652 m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginProcessing,
653 ctx));
// ctx is moved into a newly allocated EventContext that is passed to the
// scheduler; ctx itself is not read again below.
654 StatusCode addEventStatus = m_schedulerSvc->pushNewEvent( new EventContext{ std::move(ctx) } );
655
656 // If this fails, we need to wait for something to complete
// NOTE(review): a failed pushNewEvent is only logged at FATAL level; the
// function still falls through and returns SUCCESS -- confirm this is intended.
657 if (!addEventStatus.isSuccess()){
658 fatal() << "An event processing slot should be now free in the scheduler, but it appears not to be the case." << endmsg;
659 }
660
661 } // end of toolsPassed test
662
// Both counters advance whether or not the pre-selection tools accepted
// the event.
663 ++m_nev;
664
665 ++m_nevt;
666
667 // invalidate thread local context once outside of event execute loop
668 Gaudi::Hive::setCurrentContext( EventContext() );
669
670 return StatusCode::SUCCESS;
671
672}
673
674//=========================================================================
675// implementation of IEventProcessor::executeRun
676//=========================================================================
678{
679 StatusCode sc = nextEvent(maxevt);
680 if (sc.isSuccess()) {
681 m_incidentSvc->fireIncident(Incident(name(),"EndEvtLoop"));
682 }
683 return sc;
684}
685
686//-----------------------------------------------------------------------------
687// Implementation of IEventProcessor::stopRun()
688//-----------------------------------------------------------------------------
690 // Set the application return code
691 SmartIF<IProperty> appmgr(serviceLocator());
692 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop, true).isFailure()) {
693 error() << "Could not set return code of the application ("
694 << Gaudi::ReturnCode::ScheduledStop << ")" << endmsg;
695 }
696 m_scheduledStop = true;
697 return StatusCode::SUCCESS;
698}
699
700
701//-----------------------------------------------------------------------------
702// Implementation of IService::stop()
703//-----------------------------------------------------------------------------
705{
706 // To enable conditions access during stop we set an invalid EventContext
707 // (no event/slot number) but with valid EventID (and extended EventContext).
708 m_lastEventContext.setValid(false);
709 Gaudi::Hive::setCurrentContext( m_lastEventContext );
710
711 StatusCode sc = MinimalEventLoopMgr::stop();
712
713 // If we exit the event loop early due to an error, some event stores
714 // may not have been cleared. This can lead to segfaults later,
715 // as DetectorStore will usually get finalized before HiveSvcMgr.
716 // So make sure that all stores have been cleared at this point.
717 size_t nslot = m_whiteboard->getNumberOfStores();
718 for (size_t islot = 0; islot < nslot; islot++) {
719 sc &= clearWBSlot (islot);
720 }
721
722 Gaudi::Hive::setCurrentContext( EventContext() );
723 return sc;
724}
725
726
728{
729 if(maxevt==0) return StatusCode::SUCCESS;
730
731 // Create a socket to communicate with the Pilot
732 m_socket =
733 std::unique_ptr<yampl::ISocket>{yampl::SocketFactory().createClientSocket(
734 yampl::Channel{m_eventRangeChannel.value(), yampl::LOCAL},
735 yampl::MOVE_DATA)};
736
737 // Reset the application return code.
739
740 int finishedEvts =0;
741 int createdEvts =0;
742 info() << "Starting loop on events" << endmsg;
743
744 StatusCode sc(StatusCode::SUCCESS);
745
746 // Calculate runtime
747 auto start_time = tbb::tick_count::now();
748 auto secsFromStart = [&start_time]()->double{
749 return (tbb::tick_count::now()-start_time).seconds();
750 };
751
752 std::unique_ptr<RangeStruct> range;
753 while(!range) {
754 range = getNextRange(m_socket.get());
755 usleep(1000);
756 }
757
758 bool loop_ended = range->eventRangeID.empty();
759 if(!loop_ended) {
760 m_currentEvntNum = range->startEvent;
761 // Fire NextRange incident
762 m_incidentSvc->fireIncident(FileIncident(name(), "NextEventRange",range->eventRangeID));
763 }
764
765 bool no_more_events = false;
766
767 while(!loop_ended) {
768
769 debug() << " -> createdEvts: " << createdEvts << endmsg;
770
771 if(!m_terminateLoop // No scheduled loop termination
772 && !no_more_events // We are not yet done getting events
773 && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
774
775 debug() << "createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots() << endmsg;
776
777 auto ctx = createEventContext();
778
779 if ( !ctx.valid() ) {
780 sc = StatusCode::FAILURE;
781 }
782 else {
783 sc = executeEvent( std::move(ctx) );
784 }
785
786 if (sc.isFailure()) {
787 error() << "Terminating event processing loop due to errors" << endmsg;
788 loop_ended = true;
789 }
790 else {
791 ++createdEvts;
792 if(++m_currentEvntNum > range->lastEvent) {
793 // Fetch next event range
794 range.reset();
795 while(!range) {
796 range = getNextRange(m_socket.get());
797 usleep(1000);
798 }
799 if(range->eventRangeID.empty()) {
800 no_more_events = true;
801 }
802 else {
803 m_currentEvntNum = range->startEvent;
804 // Fire NextRange incident
805 m_incidentSvc->fireIncident(FileIncident(name(), "NextEventRange",range->eventRangeID));
806 }
807 }
808 }
809 }
810 else {
811 // all the events were created but not all finished or the slots were
812 // all busy: the scheduler should finish its job
813
814 debug() << "Draining the scheduler" << endmsg;
815
816 // Pull out of the scheduler the finished events
817 int ir = drainScheduler(finishedEvts,true);
818 if(ir < 0) {
819 // some sort of error draining scheduler;
820 loop_ended = true;
821 sc = StatusCode::FAILURE;
822 }
823 else if(ir == 0) {
824 // no more events in scheduler
825 if(no_more_events) {
826 // We are done
827 loop_ended = true;
828 sc = StatusCode::SUCCESS;
829 }
830 }
831 else {
832 // keep going!
833 }
834 }
835 } // end main loop on finished events
836
837 info() << "---> Loop Finished (seconds): " << secsFromStart() <<endmsg;
838
839 return sc;
840}
841
842
843//=========================================================================
844// Seek to a given event.
845// The event selector must support the IEvtSelectorSeek interface for this to work.
846//=========================================================================
848{
849 IEvtSelectorSeek* is = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
850 if (is == 0) {
851 error() << "Seek failed; unsupported by event selector"
852 << endmsg;
853 return StatusCode::FAILURE;
854 }
855 //cppcheck-suppress nullPointerRedundantCheck
856 if (!m_evtContext) {
857 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
858 fatal() << "Can not create the event selector Context."
859 << endmsg;
860 return StatusCode::FAILURE;
861 }
862 }
863 //m_evtContext cannot be null if createContext succeeded
864 //cppcheck-suppress nullPointerRedundantCheck
865 StatusCode sc = is->seek (*m_evtContext, evt);
866 if (sc.isSuccess()) {
867 m_nevt = evt;
868 }
869 else {
870 error() << "Seek failed." << endmsg;
871 }
872 return sc;
873}
874
875
876//=========================================================================
877// Return the current event count.
878//=========================================================================
880{
881 return m_nevt;
882}
883
884//=========================================================================
885// Return the collection size
886//=========================================================================
888{
889 IEvtSelectorSeek* cs = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
890 if (cs == 0) {
891 error() << "Collection size unsupported by event selector"
892 << endmsg;
893 return -1;
894 }
895 //cppcheck-suppress nullPointerRedundantCheck
896 if (!m_evtContext) {
897 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
898 fatal() << "Can not create the event selector Context."
899 << endmsg;
900 return -1;
901 }
902 }
903 //cppcheck-suppress nullPointerRedundantCheck
904 return cs->size (*m_evtContext);
905}
906
907//=========================================================================
908// Handle Incidents
909//=========================================================================
910void AthenaMtesEventLoopMgr::handle(const Incident& inc)
911{
912
913 if(inc.type()!="BeforeFork")
914 return;
915
916 if(!m_evtContext || !m_firstRun) {
917 warning() << "Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" << endmsg;
918 }
919
920 // Initialize Algorithms and Output Streams
921 StatusCode sc = initializeAlgorithms();
922 if(sc.isFailure()) {
923 error() << "Failed to initialize Algorithms" << endmsg;
924 return;
925 }
926
927 // Construct EventInfo
928 const EventInfo* pEvent(0);
929 IOpaqueAddress* addr = 0;
930 sc = m_evtSelector->next(*m_evtContext);
931 if(!sc.isSuccess()) {
932 info() << "No more events in event selection " << endmsg;
933 return;
934 }
935 sc = m_evtSelector->createAddress(*m_evtContext, addr);
936 if (sc.isFailure()) {
937 error() << "Could not create an IOpaqueAddress" << endmsg;
938 return;
939 }
940 if (0 != addr) {
941 //create its proxy
942 sc = eventStore()->recordAddress(addr);
943 if(!sc.isSuccess()) {
944 error() << "Error declaring Event object" << endmsg;
945 return;
946 }
947 }
948
949 if(eventStore()->loadEventProxies().isFailure()) {
950 warning() << "Error loading Event proxies" << endmsg;
951 return;
952 }
953
954 // Retrieve the Event object
955 sc = eventStore()->retrieve(pEvent);
956 if(!sc.isSuccess()) {
957 error() << "Unable to retrieve Event root object" << endmsg;
958 return;
959 }
960
961 m_firstRun=false;
962 m_currentRun = pEvent->event_ID()->run_number();
963
964 // Clear Store
965 const ClearStorePolicy::Type s_clearStore = clearStorePolicy( m_clearStorePolicy.value(), msgStream() );
966 if(s_clearStore==ClearStorePolicy::EndEvent) {
967 sc = eventStore()->clearStore();
968 if(!sc.isSuccess()) {
969 error() << "Clear of Event data store failed" << endmsg;
970 }
971 }
972}
973
974//---------------------------------------------------------------------------
975
977StatusCode AthenaMtesEventLoopMgr::getEventRoot(IOpaqueAddress*& refpAddr) {
978 refpAddr = 0;
979 StatusCode sc = m_evtSelector->next(*m_evtContext);
980 if ( !sc.isSuccess() ) {
981 return sc;
982 }
983 // Create root address and assign address to data service
984 sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
985 if( !sc.isSuccess() ) {
986 sc = m_evtSelector->next(*m_evtContext);
987 if ( sc.isSuccess() ) {
988 sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
989 if ( !sc.isSuccess() ) {
990 warning() << "Error creating IOpaqueAddress." << endmsg;
991 }
992 }
993 }
994 return sc;
995}
996
997//---------------------------------------------------------------------------
998
1000
// Body of the routine that declares the event root address for the
// EventContext `ctx` (presumably AthenaMtesEventLoopMgr::declareEventRootAddress;
// the signature line is not part of this listing -- TODO confirm).
//
// Two modes, selected by m_evtContext:
//  * event selector context present: the next event is read from the selector,
//    its address is recorded in the event store, and an EventID is obtained
//    from the input attribute list, a legacy EventInfo or an xAOD::EventInfo;
//  * no context: run/event numbers are synthesized from m_nevt and a new
//    EventInfo is recorded under the key "McEventInfo".
// In both modes the resulting EventID is handed to modifyEventContext().
1001 // return codes:
1002 // -1 : error
1003 // 0 : no more events in selection
1004 // 1 : ok
1005
1006 StatusCode sc(StatusCode::SUCCESS);
1007
1008 //-----------------------------------------------------------------------
1009 // we need an EventInfo Object to fire the incidents.
1010 //-----------------------------------------------------------------------
1011 std::unique_ptr<const EventInfo> pEvent;
1012 if ( m_evtContext ) {
1013 // Deal with the case when an EventSelector is provided
1014
1015 //
1016 // FIXME: flow control if no more events in selector, etc.
1017 //
1018
1019 IOpaqueAddress* addr = 0;
1020
 // The selector must support seeking.
 // NOTE(review): the error paths below return 0, which the table above
 // documents as "no more events" rather than "error" -- confirm intent.
1021 IEvtSelectorSeek* is = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
1022 if (is == 0) {
1023 error() << "Seek failed; unsupported by event selector" << endmsg;
1024 return 0;
1025 }
1026
 // NOTE(review): the statement that assigns `sc` before this check (the seek
 // call, judging by the message) is not visible in this listing.
1028 if(sc.isFailure()) {
1029 error() << "Seek failed to Evt=" << m_currentEvntNum-1 << endmsg;
1030 return 0;
1031 }
1032
1033 sc = m_evtSelector->next(*m_evtContext);
1034
1035 if ( !sc.isSuccess() ) {
1036 // This is the end of the loop. No more events in the selection
1037 info() << "No more events in event selection " << endmsg;
1038 return 0;
1039 }
1040
1041 if (m_evtSelector->createAddress(*m_evtContext, addr).isFailure()) {
1042 error() << "Could not create an IOpaqueAddress" << endmsg;
1043 return -1;
1044 }
1045
1046
1047 // Most iterators provide the IOA of an event header (EventInfo, DataHeader)
1048 if (0 != addr) {
1049 //create its proxy
1050 sc = eventStore()->recordAddress(addr);
1051 if( !sc.isSuccess() ) {
1053 warning() << "Error declaring Event object" << endmsg;
1054 return 0;
1055 }
 // Note: the `if` below is not an `else if`; event proxies are loaded whether
 // or not an address was recorded above.
1056 } if ((sc=eventStore()->loadEventProxies()).isFailure()) {
1057 error() << "Error loading Event proxies" << endmsg;
1058 return -1;
1059 }
 // Forwarded to modifyEventContext() as its last argument.
1060 bool consume_modifier_stream = false;
1061 // Read the attribute list
 // NOTE(review): the declaration/retrieval of `pAttrList` is on a line that is
 // not visible in this listing.
1063 if ( pAttrList != nullptr && pAttrList->size() > 6 ) { // Try making EventID-only EventInfo object from in-file TAG
1064 try {
1065 unsigned int runNumber = (*pAttrList)["RunNumber"].data<unsigned int>();
1066 unsigned long long eventNumber = (*pAttrList)["EventNumber"].data<unsigned long long>();
1067 unsigned int eventTime = (*pAttrList)["EventTime"].data<unsigned int>();
1068 unsigned int eventTimeNS = (*pAttrList)["EventTimeNanoSec"].data<unsigned int>();
1069 unsigned int lumiBlock = (*pAttrList)["LumiBlockN"].data<unsigned int>();
1070 unsigned int bunchId = (*pAttrList)["BunchId"].data<unsigned int>();
1071
1072 consume_modifier_stream = true;
1073 // an option to override primary eventNumber with the secondary one in case of DoubleEventSelector
 // NOTE(review): the condition that opens this block is not visible in this
 // listing (presumably a check of m_useSecondaryEventNumber -- TODO confirm).
1075 unsigned long long eventNumberSecondary{};
1076 if ( !(pAttrList->exists("hasSecondaryInput") && (*pAttrList)["hasSecondaryInput"].data<bool>()) ) {
1077 fatal() << "Secondary EventNumber requested, but secondary input does not exist!" << endmsg;
1078 return -1;
1079 }
1080 if ( pAttrList->exists("EventNumber_secondary") ) {
1081 eventNumberSecondary = (*pAttrList)["EventNumber_secondary"].data<unsigned long long>();
1082 }
1083 else {
1084 // try legacy EventInfo if secondary input did not have attribute list
1085 // primary input should not have this EventInfo type
1086 const EventInfo* pEventSecondary = eventStore()->tryConstRetrieve<EventInfo>();
1087 if (pEventSecondary) {
1088 eventNumberSecondary = pEventSecondary->event_ID()->event_number();
1089 }
1090 else {
1091 fatal() << "Secondary EventNumber requested, but it does not exist!" << endmsg;
1092 return -1;
1093 }
1094 }
 // A secondary event number of 0 leaves the primary event number in place.
1095 if (eventNumberSecondary != 0) {
1096 if (m_doEvtHeartbeat) {
1097 info() << " ===>>> using secondary event #" << eventNumberSecondary << " instead of #" << eventNumber << "<<<===" << endmsg;
1098 }
1099 eventNumber = eventNumberSecondary;
1100 }
1101 }
1102
1103 // never recorded in the eventStore
1104 pEvent = std::make_unique<EventInfo>(
1105 std::make_unique<EventID>(runNumber, eventNumber, eventTime,
1106 eventTimeNS, lumiBlock, bunchId),
1107 nullptr);
 // Any exception thrown above is swallowed silently; pEvent then stays null
 // and the store-based lookup further down is used instead.
1108 } catch (...) {
1109 }
1110 } else if (m_requireInputAttributeList) {
 // NOTE(review): unlike the other messages in this function, this one is not
 // terminated with endmsg -- confirm that it is actually emitted.
1111 fatal() << "Valid input attribute list required but not present!";
1112 return -1;
1113 }
1114
 // Non-owning view of the EventInfo that supplies the EventID.
1115 const EventInfo* pEventObserver{pEvent.get()};
1116 if (!pEventObserver) {
1117 // Retrieve the Event object
1119 pEventObserver = eventStore()->tryConstRetrieve<EventInfo>();
1120 if( !pEventObserver ) {
1121
1122 // Try to get the xAOD::EventInfo
1123 const xAOD::EventInfo* pXEvent{nullptr};
1124 sc = eventStore()->retrieve(pXEvent);
1125 if( !sc.isSuccess() ) {
1126 error() << "Unable to retrieve Event root object" << endmsg;
1127 return -1;
1128 }
1129 consume_modifier_stream = true;
1130 // Build the old-style Event Info object for those clients that still need it
1131 pEvent = std::make_unique<EventInfo>(
1132 std::make_unique<EventID>(eventIDFromxAOD(pXEvent)),
1133 std::make_unique<EventType>(eventTypeFromxAOD(pXEvent)));
1134 pEventObserver = pEvent.get();
1135 sc = eventStore()->record(std::move(pEvent), "");
1136 if( !sc.isSuccess() ) {
1137 error() << "Error declaring event data object" << endmsg;
1138 return -1;
1139 }
1140 } else {
1141 consume_modifier_stream = false;
1142 }
1143 }
1144
1145 // the pEvent was moved to the eventStore, the object is still 'alive'.
1146 // so the raw pEventObserver pointer is also still valid
1147 // cppcheck-suppress invalidLifetime
1148 modifyEventContext(ctx, *(pEventObserver->event_ID()),
1149 consume_modifier_stream);
1150
1151 } else {
1152
1153 // with no iterator it's up to us to create an EventInfo
1154 // first event # == 1
1155 unsigned int runNmb{1}, evtNmb{m_nevt + 1};
1156
1157 // increment the run/lumiBlock number if desired
1158 if (m_flmbi != 0) {
1159 runNmb = m_nevt / m_flmbi + 1;
1160 evtNmb = m_nevt % m_flmbi + 1;
1161 }
1162 auto eid = std::make_unique<EventID> (runNmb,evtNmb, m_timeStamp);
1163 // Change lumiBlock# to match runNumber
1164 eid->set_lumi_block( runNmb );
1165
1167
1168 pEvent = std::make_unique<EventInfo>(std::move(eid),
1169 std::make_unique<EventType>());
1170
1171 modifyEventContext(ctx,*(pEvent->event_ID()), true);
1172
1173 debug() << "selecting store: " << ctx.slot() << endmsg;
1174
 // The status of the store selection is deliberately ignored here.
1175 m_whiteboard->selectStore( ctx.slot() ).ignore();
1176
1177 debug() << "recording EventInfo " << *pEvent->event_ID() << " in "
1178 << eventStore()->name() << endmsg;
1179 sc = eventStore()->record(std::move(pEvent), "McEventInfo");
1180 if( !sc.isSuccess() ) {
1181 error() << "Error declaring event data object" << endmsg;
1182 return -1;
1183 }
1184 }
1185 return 1;
1186}
1187
1188//---------------------------------------------------------------------------
1190 const EventID& eID,
1191 bool consume_modifier_stream) {
 // Store an EventID in the event context `ctx`.
 // If the event-ID modifier service (m_evtIdModSvc) is set, a copy of `eID`
 // is passed through modify_evtid() together with ctx.evt() and
 // `consume_modifier_stream`, and the modified copy is stored; otherwise
 // `eID` is stored unchanged.
 // NOTE(review): the first line of the signature is not visible in this
 // listing; `ctx` is presumably an EventContext& -- TODO confirm.
1192
1193 if (m_evtIdModSvc.isSet()) {
1194 EventID new_eID(eID);
1195 // In Mtes EventLoopMgr ctx.evt() gets set to m_nevt and *then* m_nevt is
1196 // incremented later so it's zero-indexed and we don't need to subtract one
1197 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
 // Old -> new values are only gathered and printed at DEBUG level.
1198 if (msgLevel(MSG::DEBUG)) {
1199 unsigned int oldrunnr = eID.run_number();
1200 unsigned int oldLB = eID.lumi_block();
1201 unsigned int oldTS = eID.time_stamp();
1202 unsigned int oldTSno = eID.time_stamp_ns_offset();
1203 debug() << "modifyEventContext: use evtIdModSvc runnr=" << oldrunnr
1204 << " -> " << new_eID.run_number() << endmsg;
1205 debug() << "modifyEventContext: use evtIdModSvc LB=" << oldLB << " -> "
1206 << new_eID.lumi_block() << endmsg;
1207 debug() << "modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS
1208 << " -> " << new_eID.time_stamp() << endmsg;
1209 debug() << "modifyEventContext: use evtIdModSvc TimeStamp ns Offset="
1210 << oldTSno << " -> " << new_eID.time_stamp_ns_offset() << endmsg;
1211 }
1212 ctx.setEventID(new_eID);
 // NOTE(review): the line below is the tail of a call whose first line is
 // not visible in this listing; it passes the (modified) run number on.
1214 ctx.eventID().run_number());
1215 return;
1216 }
1217
 // No modifier service configured: keep the EventID as given.
1218 ctx.setEventID(eID);
1219}
1220
1221//---------------------------------------------------------------------------
1223
 // Build an EventContext for event number m_nevt in a whiteboard slot
 // obtained from allocateStore(m_nevt), then select that slot.
 // Returns a default-constructed (invalid) EventContext if the slot cannot be
 // selected, otherwise the new context.
 // NOTE(review): the signature line of this function is not visible in this
 // listing (presumably createEventContext() -- TODO confirm).
1224 EventContext ctx{ m_nevt, m_whiteboard->allocateStore( m_nevt ) };
1225
1226 StatusCode sc = m_whiteboard->selectStore( ctx.slot() );
1227 if (sc.isFailure()) {
1228 fatal() << "Slot " << ctx.slot()
1229 << " could not be selected for the WhiteBoard" << endmsg;
1230 return EventContext{}; // invalid EventContext
1231 } else {
 // NOTE(review): one statement of this branch is not visible in this listing.
1233
1234 debug() << "created EventContext, num: " << ctx.evt() << " in slot: "
1235 << ctx.slot() << endmsg;
1236 }
1237
1238 return ctx;
1239}
1240
1242{
 // Set the application return code held by m_appMgrProperty to
 // Gaudi::ReturnCode::Success; the resulting status is deliberately ignored.
 // NOTE(review): the signature line is not visible in this listing
 // (presumably resetAppReturnCode() -- TODO confirm).
1243 Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true).ignore();
1244}
1245
1249
1253//---------------------------------------------------------------------------
1254
1255int
1256AthenaMtesEventLoopMgr::drainScheduler(int& finishedEvts,bool report){
1257
1258 StatusCode sc(StatusCode::SUCCESS);
1259
1260 // maybe we can do better
1261 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1262
1263 EventContext* finishedEvtContext{nullptr};
1264
1265 // Here we wait not to loose cpu resources
1266 debug() << "drainScheduler: [" << finishedEvts << "] Waiting for a context" << endmsg;
1267 sc = m_schedulerSvc->popFinishedEvent(finishedEvtContext);
1268
1269 // We got past it: cache the pointer
1270 if (sc.isSuccess()){
1271 debug() << "drainScheduler: scheduler not empty: Context "
1272 << finishedEvtContext << endmsg;
1273 finishedEvtContexts.emplace_back(finishedEvtContext);
1274 } else{
1275 // no more events left in scheduler to be drained
1276 debug() << "drainScheduler: scheduler empty" << endmsg;
1277 return 0;
1278 }
1279
1280 // Let's see if we can pop other event contexts
1281 while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1282 finishedEvtContexts.emplace_back(finishedEvtContext);
1283 }
1284
1285 // Now we flush them
1286 bool fail(false);
1287 for (auto& thisFinishedEvtContext : finishedEvtContexts){
1288 if (!thisFinishedEvtContext) {
1289 fatal() << "Detected nullptr ctxt while clearing WB!"<< endmsg;
1290 fail = true;
1291 continue;
1292 }
1293
1294 if (m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1295 fatal() << "Failed event detected on " << thisFinishedEvtContext
1296 << " w/ fail mode: "
1297 << m_aess->eventStatus(*thisFinishedEvtContext) << endmsg;
1298 fail = true;
1299 continue;
1300 }
1301
1302 EventID::number_type n_run(0);
1303 EventID::event_number_t n_evt(0);
1304
1305 if (m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1306 n_run = thisFinishedEvtContext->eventID().run_number();
1307 n_evt = thisFinishedEvtContext->eventID().event_number();
1308 } else {
1309 error() << "DrainSched: unable to select store "
1310 << thisFinishedEvtContext->slot() << endmsg;
1311 fail = true;
1312 continue;
1313 }
1314
1315 // m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndEvent,
1316 // *thisFinishedEvtContext ));
1317
1318 // Some code still needs global context in addition to that passed in the incident
1319 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1320 info() << "Firing EndProcessing" << endmsg;
1321 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1322
1323 if(report) {
1324 // If we completed an event range, then report it to the pilot
1325 OutputStreamSequencerSvc::RangeReport_ptr rangeReport = m_outSeqSvc->getRangeReport();
1326 if(rangeReport) {
1327 std::string outputFileReport = rangeReport->second + std::string(",ID:")
1328 + rangeReport->first + std::string(",CPU:N/A,WALL:N/A");
1329 if( not m_inTestMode ) {
1330 // In standalone test mode there is no pilot to talk to
1331 void* message2pilot = CxxUtils::xmalloc(outputFileReport.size());
1332 memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
1333 m_socket->send(message2pilot,outputFileReport.size());
1334 }
1335 info() << "Reported the output " << outputFileReport << endmsg;
1336 }
1337 }
1338
1339 debug() << "Clearing slot " << thisFinishedEvtContext->slot()
1340 << " (event " << thisFinishedEvtContext->evt()
1341 << ") of the whiteboard" << endmsg;
1342
1343 StatusCode sc = clearWBSlot(thisFinishedEvtContext->slot());
1344 if (!sc.isSuccess()) {
1345 error() << "Whiteboard slot " << thisFinishedEvtContext->slot()
1346 << " could not be properly cleared";
1347 fail = true;
1348 continue;
1349 }
1350
1351 finishedEvts++;
1352
1353 writeHistograms().ignore();
1354 ++m_proc;
1355
1356 if (m_doEvtHeartbeat) {
1357 if(!m_useTools)
1358 info() << " ===>>> done processing event #" << n_evt << ", run #" << n_run
1359 << " on slot " << thisFinishedEvtContext->slot() << ", "
1360 << m_proc << " events processed so far <<<===" << endmsg;
1361 else
1362 info() << " ===>>> done processing event #" << n_evt << ", run #" << n_run
1363 << " on slot " << thisFinishedEvtContext->slot() << ", "
1364 << m_nev << " events read and " << m_proc
1365 << " events processed so far <<<===" << endmsg;
1366 std::ofstream outfile( "eventLoopHeartBeat.txt");
1367 if ( !outfile ) {
1368 error() << " unable to open: eventLoopHeartBeat.txt" << endmsg;
1369 fail = true;
1370 continue;
1371 } else {
1372 outfile << " done processing event #" << n_evt << ", run #" << n_run
1373 << " " << m_nev << " events read so far <<<===" << std::endl;
1374 outfile.close();
1375 }
1376 }
1377
1378 debug() << "drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext
1379 << endmsg;
1380 }
1381
1382 return ( fail ? -1 : 1 );
1383
1384}
1385
1386//---------------------------------------------------------------------------
1387
 // Clear the whiteboard store of slot `evtSlot`, then free the slot.
 // A failed clear is only reported as a warning; the value returned is the
 // status of freeStore().
 // NOTE(review): the signature line (with the opening brace) is not visible in
 // this listing (presumably clearWBSlot(int evtSlot) -- TODO confirm).
1389 StatusCode sc = m_whiteboard->clearStore(evtSlot);
1390 if( !sc.isSuccess() ) {
1391 warning() << "Clear of Event data store failed" << endmsg;
1392 }
1393 return m_whiteboard->freeStore(evtSlot);
1394}
1395//---------------------------------------------------------------------------
1396
1397std::unique_ptr<AthenaMtesEventLoopMgr::RangeStruct> AthenaMtesEventLoopMgr::getNextRange(yampl::ISocket* socket)
1398{
1399 static const std::string strReady("Ready for events");
1400 static const std::string strStopProcessing("No more events");
1401
1402 std::string range;
1403 if( m_inTestMode ) {
1404 static std::atomic<size_t> line_n = 0;
1405 info() <<"in TEST MODE, Range #" << line_n+1 << endmsg;
1406 range = (line_n < m_testPilotMessages.value().size()) ? m_testPilotMessages.value()[line_n++] : strStopProcessing;
1407 } else {
1408 // Signal the Pilot that we are ready for event processing
1409 void* ready_message = malloc(strReady.size());
1410 if (!ready_message) std::abort();
1411 memcpy(ready_message,strReady.data(),strReady.size());
1412 socket->send(ready_message,strReady.size());
1413 void* eventRangeMessage;
1414 std::string strPeerId;
1415 ssize_t eventRangeSize = socket->recv(eventRangeMessage,strPeerId);
1416 range = std::string((const char*)eventRangeMessage,eventRangeSize);
1417 leftString(range, '\n');
1418 }
1419
1420 std::unique_ptr<RangeStruct> result = std::make_unique<RangeStruct>();
1421 if(range.compare(strStopProcessing)==0) {
1422 info() << "No more events from the server" << endmsg;
1423 return result;
1424 }
1425 info() << "Got Event Range from the pilot: " << range << endmsg;
1426
1427 // _____________________ Decode range string _____________________________
1428 // Expected the following format: [{KEY:VALUE[,KEY:VALUE]}]
1429 // First get rid of the leading '[{' and the trailing '}]'
1430 if(range.starts_with( "[{")) range=range.substr(2);
1431 if(range.ends_with( "}]")){
1432 const int truncate = range.size()-2;
1433 leftString(range, truncate);
1434 }
1435
1436 std::map<std::string,std::string> eventRangeMap;
1437 size_t startpos(0);
1438 size_t endpos = range.find(',');
1439 while(endpos!=std::string::npos) {
1440 // Get the Key-Value pair
1441 std::string keyValue(range.substr(startpos,endpos-startpos));
1442 size_t colonPos = keyValue.find(':');
1443 std::string strKey = keyValue.substr(0,colonPos);
1444 std::string strVal = keyValue.substr(colonPos+1);
1445 trimRangeStrings(strKey);
1446 trimRangeStrings(strVal);
1447 eventRangeMap[strKey]=std::move(strVal);
1448
1449 // Next iteration
1450 startpos = endpos+1;
1451 endpos = range.find(',',startpos);
1452 }
1453
1454 // Get the final Key-Value pair
1455 std::string keyValue(range.substr(startpos));
1456 size_t colonPos = keyValue.find(':');
1457 std::string strKey = keyValue.substr(0,colonPos);
1458 std::string strVal = keyValue.substr(colonPos+1);
1459 trimRangeStrings(strKey);
1460 trimRangeStrings(strVal);
1461 eventRangeMap[strKey]=std::move(strVal);
1462
1463 // _____________________ Consistency check for range string _____________________________
1464 // Three checks are performed:
1465 // 1. The existence of all required fields
1466 // 2. The consistency of field values
1467 // 3. Protection against having event ranges from different input files
1468 // NB. The last check is hopefully a temporary limitation of MTES
1469 std::string errorStr{""};
1470
1471 if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
1472 || eventRangeMap.find("startEvent")==eventRangeMap.end()
1473 || eventRangeMap.find("lastEvent")==eventRangeMap.end()
1474 || eventRangeMap.find("PFN")==eventRangeMap.end()) {
1475 // Wrong format
1476 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Wrong format";
1477 }
1478
1479 if(errorStr.empty()) {
1480 result->startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
1481 result->lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
1482
1483 if(eventRangeMap["eventRangeID"].empty()
1484 || eventRangeMap["PFN"].empty()
1485 || result->lastEvent < result->startEvent) {
1486 // Wrong values of range fields
1487 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Wrong values of range fields";
1488 }
1489 else {
1490 // Update m_pfn if necessary
1491 if(m_pfn != eventRangeMap["PFN"]) {
1492 IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector);
1493 if(!propertyServer) {
1494 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Unable to dyn-cast the event selector to IProperty";
1495 }
1496 else {
1497 std::string strInpuCol("InputCollections");
1498 std::vector<std::string> vectInpCol{eventRangeMap["PFN"],};
1499 StringArrayProperty inputFileList(std::move(strInpuCol), vectInpCol);
1500 if(propertyServer->setProperty(inputFileList).isFailure()) {
1501 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Unable to set input file name property to the Event Selector";
1502 }
1503 else {
1504 m_pfn = eventRangeMap["PFN"];
1505 }
1506 }
1507 }
1508 }
1509 }
1510
1511 if(errorStr.empty()) {
1512 // Event range parsing was successful
1513 debug() << "*** Decoded Event Range ***" << endmsg;
1514 for (const auto& fieldvalue : eventRangeMap) {
1515 debug() << fieldvalue.first << ":" << fieldvalue.second << endmsg;
1516 }
1517
1518 result->eventRangeID = eventRangeMap["eventRangeID"];
1519 result->pfn = eventRangeMap["PFN"];
1520 }
1521 else {
1522 // We got here because there was an error
1523 // Report the error to the pilot and reset the result, so that the next range can be tried
1524 warning() << errorStr << endmsg;
1525 info() << "Ignoring this event range" << endmsg;
1526 if( not m_inTestMode ) {
1527 void* errorMessage = CxxUtils::xmalloc(errorStr.size());
1528 memcpy(errorMessage,errorStr.data(),errorStr.size());
1529 socket->send(errorMessage,errorStr.size());
1530 }
1531 result.reset();
1532 }
1533
1534 return result;
1535}
1536
1538{
1539 size_t i(0);
1540 // get rid of leading spaces
1541 while(i<str.size() && str[i]==' ') i++;
1542 if(i) str = str.substr(i);
1543
1544 if(str.empty()) return; // Corner case: string consists only of spaces
1545
1546 // get rid of trailing spaces
1547 i=str.size()-1;
1548 while(str[i]==' ') i--;
1549 if(i) str.resize(i+1);
1550
1551 // the string might be enclosed by either
1552 // "u\'" and "\'"
1553 // or
1554 // "\"" and "\""
1555 // Get rid of them!
1556 if(str.starts_with( "u\'")) {
1557 str = str.substr(2);
1558 if(str.rfind('\'')==str.size()-1) {
1559 str.pop_back();
1560 }
1561 }
1562 else if(str.starts_with("\"")) {
1563 str = str.substr(1);
1564 if(str.rfind('\"')==str.size()-1) {
1565 str.pop_back();
1566 }
1567 }
1568}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
ClearStorePolicy::Type clearStorePolicy(const std::string &policyName, MsgStream &msg)
returns the enum-version of the policy (by name)
Helpers for checking error return status codes and reporting errors.
#define CHECK(...)
Evaluate an expression and check for errors.
Assign a CLID to EventContext.
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
EventType eventTypeFromxAOD(const xAOD::EventInfo *xaod)
Create EventType object from xAOD::EventInfo.
This class provides general information about an event.
Extension to IEvtSelector to allow for seeking.
static Double_t sc
const bool debug
void setProperty(columnar::PythonToolHandle &self, const std::string &key, nb::object value)
This file contains the class definition for the OutputStreamSequencerSvc class.
An AttributeList represents a logical row of attributes in a metadata table.
static const Attributes_t empty
An AttributeList represents a logical row of attributes in a metadata table.
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
bool m_requireInputAttributeList
require input attribute list
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
virtual EventContext createEventContext() override
implementation of IEventProcessor::createEventContext()
virtual void resetAppReturnCode() override
Reset the application return code.
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(void* par)
tool_store::const_iterator tool_iterator
virtual int curEvent() const override
Return the current event count.
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
void setClearStorePolicy(Gaudi::Details::PropertyBase &clearStorePolicy)
property update handler:set the clear-store policy value and check its value.
virtual StatusCode executeAlgorithms()
Run the algorithms for the current event.
std::unique_ptr< yampl::ISocket > m_socket
ServiceHandle< Athena::IConditionsCleanerSvc > m_conditionsCleaner
std::unique_ptr< RangeStruct > getNextRange(yampl::ISocket *socket)
virtual bool terminateLoop() override
tool_stats m_toolReject
tool returns StatusCode::FAILURE counter
UnsignedIntegerProperty m_eventPrintoutInterval
tool_stats m_toolAccept
tool returns StatusCode::SUCCESS counter
virtual StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun(int maxevt)
number_type m_currentRun
current run number
AthenaMtesEventLoopMgr()=delete
void modifyEventContext(EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
tool_store m_tools
internal tool store
virtual const std::string & name() const override
unsigned int m_nev
events processed
virtual StatusCode initialize() override
implementation of IAppMgrUI::initialize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
IEvtIdModifierSvc_t m_evtIdModSvc
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.
virtual ~AthenaMtesEventLoopMgr()
Standard Destructor.
StatusCode initializeAlgorithms()
Initialize all algorithms and output streams.
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
tool_stats m_toolInvoke
tool called counter
virtual int drainScheduler(int &finishedEvents, bool report) override
Drain the scheduler from all actions that may be queued.
bool m_useSecondaryEventNumber
read event number from secondary input
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
EvtContext * m_evtContext
Gaudi event selector Context (may be used as a cursor by the evt selector)
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
std::string m_whiteboardName
Name of the Whiteboard to be used.
int declareEventRootAddress(EventContext &)
Declare the root address of the event.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode seek(int evt) override
Seek to a given event.
virtual StatusCode stop() override
implementation of IService::stop
std::string m_schedulerName
Name of the scheduler to be used.
StringArrayProperty m_testPilotMessages
IDataManagerSvc_t m_histoDataMgrSvc
Reference to the Histogram Data Service.
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc.
virtual int size() override
Return the size of the collection.
Gaudi::Property< std::string > m_eventRangeChannel
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
virtual void setCurrentEventNum(int num) override
ServiceHandle< IConversionSvc > IConversionSvc_t
virtual StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
void setupPreSelectTools(Gaudi::Details::PropertyBase &)
property update handler:sets up the Pre-selection tools
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
StoreGateSvc * eventStore() const
void trimRangeStrings(std::string &str)
UnsignedIntegerProperty m_writeInterval
virtual void handle(const Incident &inc) override
IIncidentListener interfaces.
IEvtSelector * m_evtSelector
Reference to the Event Selector.
void resetTimeout(Timeout &instance)
Reset timeout.
Definition Timeout.h:83
static Timeout & instance()
Get reference to Timeout singleton.
Definition Timeout.h:64
void setConditionsRun(EventIDBase::number_type conditionsRun)
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
Definition EventID.h:35
EventIDBase::number_type number_type
Definition EventID.h:37
EventID * event_ID()
the unique identification of the event.
Abstract interface for seeking for an event selector.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int size(IEvtSelector::Context &c) const =0
Return the size of the collection, or -1 if we can't get the size.
std::unique_ptr< RangeReport_t > RangeReport_ptr
The Athena Transient Store API.
StatusCode recordAddress(const std::string &skey, CxxUtils::RefCountedPtr< IOpaqueAddress > pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
StatusCode record(T *p2BRegistered, const TKEY &key)
Record an object with a key.
const T * tryConstRetrieve() const
StatusCode retrieve(const T *&ptr) const
Retrieve the default object into a const T*.
virtual StatusCode clearStore(bool forceRemove=false) override final
clear DataStore contents: called by the event loop mgrs
int ir
counter of the current depth
Definition fastadd.cxx:49
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
Definition hcg.cxx:114
bool verbose
Definition hcg.cxx:73
void setExtendedEventContext(EventContext &ctx, ExtendedEventContext &&ectx)
Move an extended context into a context object.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
thread_local event_number_t eventIndex
EventInfo_v1 EventInfo
Definition of the latest event info version.
Trapping version of malloc.