ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaMtesEventLoopMgr.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6#include "ClearStorePolicy.h"
8
15#include "CxxUtils/xmalloc.h"
16
17#include "GaudiKernel/IAlgorithm.h"
18#include "GaudiKernel/SmartIF.h"
19#include "GaudiKernel/Incident.h"
20#include "GaudiKernel/DataObject.h"
21#include "GaudiKernel/IIncidentSvc.h"
22#include "GaudiKernel/IDataManagerSvc.h"
23#include "GaudiKernel/IDataProviderSvc.h"
24#include "GaudiKernel/IConversionSvc.h"
25#include "GaudiKernel/GaudiException.h"
26#include "GaudiKernel/AppReturnCode.h"
27#include "GaudiKernel/MsgStream.h"
28#include "GaudiKernel/EventIDBase.h"
29#include "GaudiKernel/ThreadLocalContext.h"
30#include "GaudiKernel/FileIncident.h"
31
33
34#include "EventInfo/EventInfo.h"
35#include "EventInfo/EventID.h"
36#include "EventInfo/EventType.h"
39
40#include "tbb/tick_count.h"
41#include <yampl/ISocket.h>
42#include "yampl/SocketFactory.h"
43
44#include <cassert>
45#include <ios>
46#include <iostream>
47#include <fstream>
48#include <iomanip>
49#include <cstdlib>
50#include <unistd.h>
51
52namespace {
53 bool
54 leftString(std::string & s, char sc){
55 bool truncated{false};
56 auto n = s.find(sc);
57 if (n!=std::string::npos){
58 s.resize(n);
59 truncated=true;
60 }
61 return truncated;
62}
63
64 bool
65 leftString(std::string & s, int n){
66 bool truncated{false};
67 if (static_cast<size_t>(n) < s.size()){
68 s.resize(n);
69 truncated=true;
70 }
71 return truncated;
72 }
73}
74
76 , ISvcLocator* svcLoc)
77 : base_class(nam, svcLoc)
78 , m_incidentSvc ( "IncidentSvc", nam )
79 , m_eventStore( "StoreGateSvc", nam )
80 , m_evtSelector{nullptr}
81 , m_evtContext{nullptr}
82 , m_histoDataMgrSvc( "HistogramDataSvc", nam )
83 , m_histoPersSvc ( "HistogramPersistencySvc", nam )
84 , m_evtIdModSvc("", nam)
85 , m_currentRun(0)
86 , m_firstRun(true)
87 , m_tools(this)
88 , m_nevt(0)
89 , m_writeHists(false)
90 , m_nev(0)
91 , m_proc(0)
92 , m_useTools(false)
93 , m_doEvtHeartbeat(false)
94 , m_conditionsCleaner( "Athena::ConditionsCleanerSvc", nam )
95 , m_outSeqSvc("OutputStreamSequencerSvc", nam)
96{
97 declareProperty("EvtSel", m_evtsel,
98 "Name of Event Selector to use. If empty string (default) "
99 "take value from ApplicationMgr");
100 declareProperty("HistogramPersistency", m_histPersName="",
101 "Histogram persistency technology to use: ROOT, HBOOK, NONE. "
102 "By default (empty string) get property value from "
103 "ApplicationMgr");
104 declareProperty("HistWriteInterval", m_writeInterval=0 ,
105 "histogram write/update interval");
106 declareProperty("FailureMode", m_failureMode=1 ,
107 "Controls behaviour of event loop depending on return code of"
108 " Algorithms. 0: all non-SUCCESSes terminate job. "
109 "1: RECOVERABLE skips to next event, FAILURE terminates job "
110 "(DEFAULT). 2: RECOVERABLE and FAILURE skip to next events");
111 declareProperty("EventPrintoutInterval", m_eventPrintoutInterval=1,
112 "Print event heartbeat printouts every m_eventPrintoutInterval events");
113 declareProperty("ClearStorePolicy",
114 m_clearStorePolicy = "EndEvent",
115 "Configure the policy wrt handling of when the "
116 "'clear-the-event-store' event shall happen: at EndEvent "
117 "(default as it is makes things easier for memory management"
118 ") or at BeginEvent (easier e.g. for interactive use)");
119 declareProperty("PreSelectTools",m_tools,"AlgTools for event pre-selection")->
120 declareUpdateHandler( &AthenaMtesEventLoopMgr::setupPreSelectTools, this );
121
122 declareProperty("SchedulerSvc", m_schedulerName="ForwardSchedulerSvc",
123 "Name of the scheduler to be used");
124
125 declareProperty("WhiteboardSvc", m_whiteboardName="EventDataSvc",
126 "Name of the Whiteboard to be used");
127
128 declareProperty("EventStore", m_eventStore);
129
130 declareProperty("EvtIdModifierSvc", m_evtIdModSvc,
131 "ServiceHandle for EvtIdModifierSvc");
132
133 declareProperty("FakeLumiBlockInterval", m_flmbi = 0,
134 "Event interval at which to increment lumiBlock# when "
135 "creating events without an EventSelector. Zero means "
136 "don't increment it");
137 declareProperty("FakeTimestampInterval", m_timeStampInt = 1,
138 "timestamp interval between events when creating Events "
139 "without an EventSelector");
140 declareProperty("RequireInputAttributeList", m_requireInputAttributeList = false,
141 "Require valid input attribute list to be present");
142 declareProperty("UseSecondaryEventNumber", m_useSecondaryEventNumber = false,
143 "In case of DoubleEventSelector use event number from secondary input");
144
145 declareProperty("ESTestPilotMessages", m_testPilotMessages, "List of messages from fake pilot for test mode");
146
147 m_scheduledStop = false;
148
149}
150
154
156{
157 info() << "Initializing " << name() << endmsg;
158
159 StatusCode sc = MinimalEventLoopMgr::initialize();
160 if(!sc.isSuccess()) {
161 error() << "Failed to initialize base class MinimalEventLoopMgr" << endmsg;
162 return sc;
163 }
164
165//-------------------------------------------------------------------------
166// Setup stuff for hive
167//-------------------------------------------------------------------------
168
169 m_whiteboard = serviceLocator()->service(m_whiteboardName);
170 if(!m_whiteboard.isValid()) {
171 fatal() << "Error retrieving " << m_whiteboardName << " interface IHiveWhiteBoard." << endmsg;
172 return StatusCode::FAILURE;
173 }
174
175 m_schedulerSvc = serviceLocator()->service(m_schedulerName);
176 if(!m_schedulerSvc.isValid()) {
177 fatal() << "Error retrieving SchedulerSvc interface ISchedulerSvc." << endmsg;
178 return StatusCode::FAILURE;
179 }
180
181 m_algResourcePool = serviceLocator()->service("AlgResourcePool");
182 if(!m_algResourcePool.isValid()) {
183 fatal() << "Error retrieving AlgResourcePool" << endmsg;
184 return StatusCode::FAILURE;
185 }
186
187 m_aess = serviceLocator()->service("AlgExecStateSvc");
188 if(!m_aess.isValid()) {
189 fatal() << "Error retrieving AlgExecStateSvc" << endmsg;
190 return StatusCode::FAILURE;
191 }
192
193 ATH_CHECK(m_eventStore.retrieve());
194 ATH_CHECK(m_incidentSvc.retrieve());
195
196//--------------------------------------------------------------------------
197// Access Property Manager interface:
198//--------------------------------------------------------------------------
199 SmartIF<IProperty> prpMgr(serviceLocator());
200 if(!prpMgr.isValid()) {
201 fatal() << "IProperty interface not found in ApplicationMgr." << endmsg;
202 return StatusCode::FAILURE;
203 }
204
205 ATH_CHECK(m_histoDataMgrSvc.retrieve());
206
207 const std::string& histPersName(m_histPersName.value());
208 if(histPersName.empty()) {
209 ATH_CHECK(setProperty(prpMgr->getProperty("HistogramPersistency")));
210 }
211
212 if(histPersName != "NONE") {
213
214 m_histoPersSvc = IConversionSvc_t( "HistogramPersistencySvc",
215 this->name() );
216
217 SmartIF<IProperty> histSvc{serviceLocator()->service("RootHistSvc")};
218
219 if (!histSvc) {
220 error() << "could not locate actual Histogram persistency service" << endmsg;
221 }
222 else {
223 const Gaudi::Details::PropertyBase &prop = histSvc->getProperty("OutputFile");
224 std::string val;
225 try {
226 const StringProperty &sprop = dynamic_cast<const StringProperty&>(prop);
227 val = sprop.value();
228 }
229 catch (...) {
230 verbose() << "could not dcast OutputFile property to a StringProperty."
231 << " Need to fix Gaudi."
232 << endmsg;
233
234 val = prop.toString();
235 }
236
237 if (val != ""
238 && val != "UndefinedROOTOutputFileName") {
239 m_writeHists = true;
240 }
241 }
242 }
243 else {
244 if (msgLevel(MSG::DEBUG)) {
245 debug() << "Histograms saving not required." << endmsg;
246 }
247 }
248
249 //--------------------------------------------------------------------------
250 // Set up the EventID modifier Service
251 //--------------------------------------------------------------------------
252 if (m_evtIdModSvc.empty()) {
253 debug() << "EventID modifier Service not set. No run number, ... overrides "
254 "will "
255 "be applied."
256 << endmsg;
257 } else if (!m_evtIdModSvc.retrieve().isSuccess()) {
258 debug() << "Could not find EventID modifier Service. No run number, ... "
259 "overrides "
260 "will be applied."
261 << endmsg;
262 }
263
264 //-------------------------------------------------------------------------
265 // Setup EventSelector service
266 //-------------------------------------------------------------------------
267 const std::string& selName(m_evtsel.value());
268 // the evt sel is usually specified as a property of ApplicationMgr
269 if (selName.empty()) {
270 sc = setProperty(prpMgr->getProperty("EvtSel"));
271 }
272 if (sc.isFailure()) {
273 warning() << "Unable to set EvtSel property" << endmsg;
274 }
275
276 // We do not expect a Event Selector necessarily being declared
277 if( !selName.empty() && selName != "NONE") {
278 SmartIF<IEvtSelector> theEvtSel{serviceLocator()->service(selName)};
279 if(theEvtSel && (theEvtSel != m_evtSelector)) {
280 // Event Selector changed (or setup for the first time)
281 m_evtSelector = theEvtSel;
282
283 // reset iterator
284 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
285 fatal() << "Can not create the event selector Context." << endmsg;
286 return StatusCode::FAILURE;
287 }
288 if (msgLevel(MSG::INFO)) {
289 SmartIF<INamedInterface> named{theEvtSel};
290 if (named) {
291 info() << "Setup EventSelector service " << named->name( ) << endmsg;
292 }
293 }
294 }
295 else if (sc.isFailure()) {
296 fatal() << "No valid event selector called " << selName << endmsg;
297 return StatusCode::FAILURE;
298 }
299 }
300
301//-------------------------------------------------------------------------
302// Setup 'Clear-Store' policy
303//-------------------------------------------------------------------------
304 try {
306 } catch(...) {
307 return StatusCode::FAILURE;
308 }
309
310 // Listen to the BeforeFork incident
311 m_incidentSvc->addListener(this,"BeforeFork",0);
312
313 CHECK( m_conditionsCleaner.retrieve() );
314 CHECK( m_outSeqSvc.retrieve() );
315
316 // Print if we override the event number using the one from secondary event
318 info() << "Using secondary event number." << endmsg;
319 }
320
321 if( m_testPilotMessages.value().size() > 0 ) {
322 info() << "runnung in standalone TEST MODE" << endmsg;
323 info() << " test contains " << m_testPilotMessages.value().size() << " event ranges" << endmsg;
324 for( const std::string& range: m_testPilotMessages.value() ) {
325 debug() << " " << range << endmsg;
326 }
327 m_inTestMode = true;
328 }
329 return sc;
330}
331
332//=========================================================================
333// property handlers
334//=========================================================================
335void
336AthenaMtesEventLoopMgr::setClearStorePolicy(Gaudi::Details::PropertyBase&) {
337 const std::string& policyName = m_clearStorePolicy.value();
338
339 if ( policyName != "BeginEvent" &&
340 policyName != "EndEvent" ) {
341
342 fatal() << "Unknown policy [" << policyName
343 << "] for the 'ClearStore-policy !\n"
344 << " Valid values are: BeginEvent, EndEvent"
345 << endmsg;
346 throw GaudiException("Can not setup 'ClearStore'-policy",
347 name(),
348 StatusCode::FAILURE);
349 }
350
351 return;
352}
353
354void
355AthenaMtesEventLoopMgr::setupPreSelectTools(Gaudi::Details::PropertyBase&) {
356
357 m_toolInvoke.clear();
358 m_toolReject.clear();
359 m_toolAccept.clear();
360
361 m_tools.retrieve().ignore();
362 if(m_tools.size() > 0) {
363 m_useTools=true;
364 m_toolInvoke.resize(m_tools.size());
365 m_toolReject.resize(m_tools.size());
366 m_toolAccept.resize(m_tools.size());
367
368 tool_iterator firstTool = m_tools.begin();
369 tool_iterator lastTool = m_tools.end();
370 unsigned int toolCtr = 0;
371 for ( ; firstTool != lastTool; ++firstTool )
372 {
373 // reset statistics
374 m_toolInvoke[toolCtr] = 0;
375 m_toolReject[toolCtr] = 0;
376 m_toolAccept[toolCtr] = 0;
377 toolCtr++;
378 }
379 }
380
381 return;
382
383}
384
385//=========================================================================
386// implementation of IAppMgrUI::finalize
387//=========================================================================
389{
390
391 StatusCode sc = MinimalEventLoopMgr::finalize();
392 if (sc.isFailure())
393 {
394 error() << "Error in Service base class Finalize"
395 << endmsg;
396 }
397
398 StatusCode sc2 = writeHistograms(true);
399 if (sc2.isFailure())
400 {
401 error() << "Error in writing Histograms"
402 << endmsg;
403 }
404
405 // Release all interfaces (ignore StatusCodes)
406 m_histoDataMgrSvc.release().ignore();
407 m_histoPersSvc.release().ignore();
408
409 m_whiteboard = 0;
411 m_schedulerSvc = 0;
412 // m_evtDataSvc = 0;
413
414 m_incidentSvc.release().ignore();
415
416 // Release event selector context
417 if ( m_evtSelector && m_evtContext ) {
418 m_evtSelector->releaseContext(m_evtContext).ignore();
419 // m_evtSelector = releaseInterface(m_evtSelector);
420 delete m_evtContext; m_evtContext = 0;
421 }
422
423
424 if(m_useTools) {
425 tool_iterator firstTool = m_tools.begin();
426 tool_iterator lastTool = m_tools.end();
427 unsigned int toolCtr = 0;
428 info() << "Summary of AthenaEvtLoopPreSelectTool invocation: (invoked/success/failure)" << endmsg;
429 info() << "-----------------------------------------------------" << endmsg;
430
431 for ( ; firstTool != lastTool; ++firstTool ) {
432 info() << std::setw(2) << std::setiosflags(std::ios_base::right)
433 << toolCtr+1 << ".) " << std::resetiosflags(std::ios_base::right)
434 << std::setw(48) << std::setfill('.')
435 << std::setiosflags(std::ios_base::left)
436 << (*firstTool)->name() << std::resetiosflags(std::ios_base::left)
437 << std::setfill(' ')
438 << " ("
439 << std::setw(6) << std::setiosflags(std::ios_base::right)
440 << m_toolInvoke[toolCtr]
441 << "/"
442 << m_toolAccept[toolCtr]
443 << "/"
444 << m_toolReject[toolCtr]
445 << ")"
446 << endmsg;
447 toolCtr++;
448 }
449 }
450
451 return ( sc.isFailure() || sc2.isFailure() ) ? StatusCode::FAILURE :
452 StatusCode::SUCCESS;
453
454}
455
456//=========================================================================
457// write out the histograms
458//=========================================================================
460
461 StatusCode sc (StatusCode::SUCCESS);
462
463 if ( 0 != m_histoPersSvc && m_writeHists ) {
464 std::vector<DataObject*> objects;
465 sc = m_histoDataMgrSvc->traverseTree( [&objects]( IRegistry* reg, int ) {
466 DataObject* obj = reg->object();
467 if ( !obj || obj->clID() == CLID_StatisticsFile ) return false;
468 objects.push_back( obj );
469 return true;
470 } );
471
472 if ( !sc.isSuccess() ) {
473 error() << "Error while traversing Histogram data store" << endmsg;
474 return sc;
475 }
476
477 if ( objects.size() > 0) {
478 int writeInterval(m_writeInterval.value());
479
480 if ( m_nevt == 1 || force ||
481 (writeInterval != 0 && m_nevt%writeInterval == 0) ) {
482
483 // skip /stat entry!
484 sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
485 IOpaqueAddress* pAddr = nullptr;
486 StatusCode iret = m_histoPersSvc->createRep( i, pAddr );
487 if ( iret.isFailure() ) return iret;
488 i->registry()->setAddress( pAddr );
489 return isc;
490 } );
491 sc = std::accumulate( begin( objects ), end( objects ), sc, [&]( StatusCode isc, auto& i ) {
492 IRegistry* reg = i->registry();
493 StatusCode iret = m_histoPersSvc->fillRepRefs( reg->address(), i );
494 return iret.isFailure() ? iret : isc;
495 } );
496 if ( ! sc.isSuccess() ) {
497 error() << "Error while saving Histograms." << endmsg;
498 }
499 }
500
501 if (force || (writeInterval != 0 && m_nevt%writeInterval == 0) ) {
502 if (msgLevel(MSG::DEBUG)) { debug() << "committing Histograms" << endmsg; }
503 m_histoPersSvc->conversionSvc()->commitOutput("",true).ignore();
504 }
505 }
506
507 }
508
509 return sc;
510}
511
512//=========================================================================
513// Call sysInitialize() on all algorithms and output streams
514//=========================================================================
516
517 return StatusCode::SUCCESS;
518}
519
520//=========================================================================
521// Run the algorithms for the current event
522//=========================================================================
524
525 return StatusCode::SUCCESS;
526}
527
528
529//=========================================================================
530// executeEvent( EventContext &&ctx )
531//=========================================================================
532StatusCode AthenaMtesEventLoopMgr::executeEvent( EventContext &&ctx )
533{
534
535 // An incident may schedule a stop, in which case is better to exit before the actual execution.
536 if ( m_scheduledStop ) {
537 always() << "A stopRun was requested by an incidentListener. "
538 << "Do not process this event."
539 << endmsg;
540 m_terminateLoop = true;
541 return (StatusCode::SUCCESS);
542 }
543
544 m_aess->reset( ctx );
545
546 // Make sure context with slot is set before calling es->next().
547 Gaudi::Hive::setCurrentContext ( ctx );
548
549 int declEvtRootSc = declareEventRootAddress(ctx);
550 if (declEvtRootSc == 0 ) { // We ran out of events!
551 m_terminateLoop = true; // we have finished!
552 return StatusCode::SUCCESS;
553 } else if ( declEvtRootSc == -1) {
554 error() << "declareEventRootAddress for context " << ctx << " failed"
555 << endmsg;
556 return StatusCode::FAILURE;
557 }
558
559 EventID::event_number_t evtNumber = ctx.eventID().event_number();
560 unsigned int conditionsRun = ctx.eventID().run_number();
561 if (!m_evtIdModSvc.isSet()) {
562 const AthenaAttributeList* attr = nullptr;
563 if (m_eventStore->contains<AthenaAttributeList>("Input") &&
564 m_eventStore->retrieve(attr, "Input").isSuccess()) {
565 if (attr->exists("ConditionsRun")) {
566 conditionsRun = (*attr)["ConditionsRun"].data<unsigned int>();
567 }
568 }
569 }
571 Gaudi::Hive::setCurrentContext ( ctx );
572
573 // Record EventContext in current whiteboard
574 if (m_eventStore->record(std::make_unique<EventContext> (ctx),
575 "EventContext").isFailure())
576 {
577 error() << "Error recording event context object" << endmsg;
578 return (StatusCode::FAILURE);
579 }
580
582 if (m_firstRun || (m_currentRun != ctx.eventID().run_number()) ) {
583 // Fire EndRun incident unless this is the first run
584 if (!m_firstRun) {
585 // FIXME!!!
586 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndRun));
587 }
588 m_firstRun=false;
589 m_currentRun = ctx.eventID().run_number();
590
591 info() << " ===>>> start of run " << m_currentRun << " <<<==="
592 << endmsg;
593
594 // FIXME!!! Fire BeginRun "Incident"
595 m_incidentSvc->fireIncident(Incident(name(),IncidentType::BeginRun,ctx));
596
597 }
598
599 bool toolsPassed=true;
600 // CGL: FIXME
601 // bool eventFailed = false;
602
603 // Call any attached tools to reject events early
604 unsigned int toolCtr=0;
605 if(m_useTools) {
606 tool_store::iterator theTool = m_tools.begin();
607 tool_store::iterator lastTool = m_tools.end();
608 while(toolsPassed && theTool!=lastTool )
609 {
610 toolsPassed = (*theTool)->passEvent(ctx.eventID());
611 m_toolInvoke[toolCtr]++;
612 {toolsPassed ? m_toolAccept[toolCtr]++ : m_toolReject[toolCtr]++;}
613 ++toolCtr;
614 ++theTool;
615 }
616 }
617
619 0 == (m_nev % m_eventPrintoutInterval.value()));
620 if (m_doEvtHeartbeat) {
621 if(!m_useTools)
622 info() << " ===>>> start processing event #" << evtNumber << ", run #" << m_currentRun
623 << " on slot " << ctx.slot() << ", " << m_proc
624 << " events processed so far <<<===" << endmsg;
625 else
626 info() << " ===>>> start processing event #" << evtNumber << ", run #" << m_currentRun
627 << " on slot " << ctx.slot() << ", "
628 << m_nev << " events read and " << m_proc
629 << " events processed so far <<<===" << endmsg;
630 }
631
632 // Reset the timeout singleton
634 if(toolsPassed) {
635
636 CHECK( m_conditionsCleaner->event (ctx, true) );
637
638 // Remember the last event context for after event processing finishes.
639 m_lastEventContext = ctx;
640
641 // Now add event to the scheduler
642 debug() << "Adding event " << ctx.evt()
643 << ", slot " << ctx.slot()
644 << " to the scheduler" << endmsg;
645
646 m_incidentSvc->fireIncident(Incident(name(), IncidentType::BeginProcessing,
647 ctx));
648 StatusCode addEventStatus = m_schedulerSvc->pushNewEvent( new EventContext{ std::move(ctx) } );
649
650 // If this fails, we need to wait for something to complete
651 if (!addEventStatus.isSuccess()){
652 fatal() << "An event processing slot should be now free in the scheduler, but it appears not to be the case." << endmsg;
653 }
654
655 } // end of toolsPassed test
656
657 ++m_nev;
658
659 ++m_nevt;
660
661 // invalidate thread local context once outside of event execute loop
662 Gaudi::Hive::setCurrentContext( EventContext() );
663
664 return StatusCode::SUCCESS;
665
666}
667
668//=========================================================================
669// implementation of IEventProcessor::executeRun
670//=========================================================================
672{
673 StatusCode sc = nextEvent(maxevt);
674 if (sc.isSuccess()) {
675 m_incidentSvc->fireIncident(Incident(name(),"EndEvtLoop"));
676 }
677 return sc;
678}
679
680//-----------------------------------------------------------------------------
681// Implementation of IEventProcessor::stopRun()
682//-----------------------------------------------------------------------------
684 // Set the application return code
685 SmartIF<IProperty> appmgr(serviceLocator());
686 if(Gaudi::setAppReturnCode(appmgr, Gaudi::ReturnCode::ScheduledStop, true).isFailure()) {
687 error() << "Could not set return code of the application ("
688 << Gaudi::ReturnCode::ScheduledStop << ")" << endmsg;
689 }
690 m_scheduledStop = true;
691 return StatusCode::SUCCESS;
692}
693
694
695//-----------------------------------------------------------------------------
696// Implementation of IService::stop()
697//-----------------------------------------------------------------------------
699{
700 // To enable conditions access during stop we set an invalid EventContext
701 // (no event/slot number) but with valid EventID (and extended EventContext).
702 m_lastEventContext.setValid(false);
703 Gaudi::Hive::setCurrentContext( m_lastEventContext );
704
705 StatusCode sc = MinimalEventLoopMgr::stop();
706
707 // If we exit the event loop early due to an error, some event stores
708 // may not have been cleared. This can lead to segfaults later,
709 // as DetectorStore will usually get finalized before HiveSvcMgr.
710 // So make sure that all stores have been cleared at this point.
711 size_t nslot = m_whiteboard->getNumberOfStores();
712 for (size_t islot = 0; islot < nslot; islot++) {
713 sc &= clearWBSlot (islot);
714 }
715
716 Gaudi::Hive::setCurrentContext( EventContext() );
717 return sc;
718}
719
720
722{
723 if(maxevt==0) return StatusCode::SUCCESS;
724
725 // Create a socket to communicate with the Pilot
726 m_socket =
727 std::unique_ptr<yampl::ISocket>{yampl::SocketFactory().createClientSocket(
728 yampl::Channel{m_eventRangeChannel.value(), yampl::LOCAL},
729 yampl::MOVE_DATA)};
730
731 // Reset the application return code.
733
734 int finishedEvts =0;
735 int createdEvts =0;
736 info() << "Starting loop on events" << endmsg;
737
738 StatusCode sc(StatusCode::SUCCESS);
739
740 // Calculate runtime
741 auto start_time = tbb::tick_count::now();
742 auto secsFromStart = [&start_time]()->double{
743 return (tbb::tick_count::now()-start_time).seconds();
744 };
745
746 std::unique_ptr<RangeStruct> range;
747 while(!range) {
748 range = getNextRange(m_socket.get());
749 usleep(1000);
750 }
751
752 bool loop_ended = range->eventRangeID.empty();
753 if(!loop_ended) {
754 m_currentEvntNum = range->startEvent;
755 // Fire NextRange incident
756 m_incidentSvc->fireIncident(FileIncident(name(), "NextEventRange",range->eventRangeID));
757 }
758
759 bool no_more_events = false;
760
761 while(!loop_ended) {
762
763 debug() << " -> createdEvts: " << createdEvts << endmsg;
764
765 if(!m_terminateLoop // No scheduled loop termination
766 && !no_more_events // We are not yet done getting events
767 && m_schedulerSvc->freeSlots()>0) { // There are still free slots in the scheduler
768
769 debug() << "createdEvts: " << createdEvts << ", freeslots: " << m_schedulerSvc->freeSlots() << endmsg;
770
771 auto ctx = createEventContext();
772
773 if ( !ctx.valid() ) {
774 sc = StatusCode::FAILURE;
775 }
776 else {
777 sc = executeEvent( std::move(ctx) );
778 }
779
780 if (sc.isFailure()) {
781 error() << "Terminating event processing loop due to errors" << endmsg;
782 loop_ended = true;
783 }
784 else {
785 ++createdEvts;
786 if(++m_currentEvntNum > range->lastEvent) {
787 // Fetch next event range
788 range.reset();
789 while(!range) {
790 range = getNextRange(m_socket.get());
791 usleep(1000);
792 }
793 if(range->eventRangeID.empty()) {
794 no_more_events = true;
795 }
796 else {
797 m_currentEvntNum = range->startEvent;
798 // Fire NextRange incident
799 m_incidentSvc->fireIncident(FileIncident(name(), "NextEventRange",range->eventRangeID));
800 }
801 }
802 }
803 }
804 else {
805 // all the events were created but not all finished or the slots were
806 // all busy: the scheduler should finish its job
807
808 debug() << "Draining the scheduler" << endmsg;
809
810 // Pull out of the scheduler the finished events
811 int ir = drainScheduler(finishedEvts,true);
812 if(ir < 0) {
813 // some sort of error draining scheduler;
814 loop_ended = true;
815 sc = StatusCode::FAILURE;
816 }
817 else if(ir == 0) {
818 // no more events in scheduler
819 if(no_more_events) {
820 // We are done
821 loop_ended = true;
822 sc = StatusCode::SUCCESS;
823 }
824 }
825 else {
826 // keep going!
827 }
828 }
829 } // end main loop on finished events
830
831 info() << "---> Loop Finished (seconds): " << secsFromStart() <<endmsg;
832
833 return sc;
834}
835
836
837//=========================================================================
838// Seek to a given event.
839// The event selector must support the IEventSeek interface for this to work.
840//=========================================================================
842{
843 IEvtSelectorSeek* is = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
844 if (is == 0) {
845 error() << "Seek failed; unsupported by event selector"
846 << endmsg;
847 return StatusCode::FAILURE;
848 }
849 //cppcheck-suppress nullPointerRedundantCheck
850 if (!m_evtContext) {
851 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
852 fatal() << "Can not create the event selector Context."
853 << endmsg;
854 return StatusCode::FAILURE;
855 }
856 }
857 //m_evtContext cannot be null if createContext succeeded
858 //cppcheck-suppress nullPointerRedundantCheck
859 StatusCode sc = is->seek (*m_evtContext, evt);
860 if (sc.isSuccess()) {
861 m_nevt = evt;
862 }
863 else {
864 error() << "Seek failed." << endmsg;
865 }
866 return sc;
867}
868
869
870//=========================================================================
871// Return the current event count.
872//=========================================================================
874{
875 return m_nevt;
876}
877
878//=========================================================================
879// Return the collection size
880//=========================================================================
882{
883 IEvtSelectorSeek* cs = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
884 if (cs == 0) {
885 error() << "Collection size unsupported by event selector"
886 << endmsg;
887 return -1;
888 }
889 //cppcheck-suppress nullPointerRedundantCheck
890 if (!m_evtContext) {
891 if (m_evtSelector->createContext(m_evtContext).isFailure()) {
892 fatal() << "Can not create the event selector Context."
893 << endmsg;
894 return -1;
895 }
896 }
897 //cppcheck-suppress nullPointerRedundantCheck
898 return cs->size (*m_evtContext);
899}
900
901//=========================================================================
902// Handle Incidents
903//=========================================================================
904void AthenaMtesEventLoopMgr::handle(const Incident& inc)
905{
906
907 if(inc.type()!="BeforeFork")
908 return;
909
910 if(!m_evtContext || !m_firstRun) {
911 warning() << "Skipping BeforeFork handler. Either no event selector is provided or begin run has already passed" << endmsg;
912 }
913
914 // Initialize Algorithms and Output Streams
915 StatusCode sc = initializeAlgorithms();
916 if(sc.isFailure()) {
917 error() << "Failed to initialize Algorithms" << endmsg;
918 return;
919 }
920
921 // Construct EventInfo
922 const EventInfo* pEvent(0);
923 IOpaqueAddress* addr = 0;
924 sc = m_evtSelector->next(*m_evtContext);
925 if(!sc.isSuccess()) {
926 info() << "No more events in event selection " << endmsg;
927 return;
928 }
929 sc = m_evtSelector->createAddress(*m_evtContext, addr);
930 if (sc.isFailure()) {
931 error() << "Could not create an IOpaqueAddress" << endmsg;
932 return;
933 }
934 if (0 != addr) {
935 //create its proxy
936 sc = m_eventStore->recordAddress(addr);
937 if(!sc.isSuccess()) {
938 error() << "Error declaring Event object" << endmsg;
939 return;
940 }
941 }
942
943 if(m_eventStore->loadEventProxies().isFailure()) {
944 warning() << "Error loading Event proxies" << endmsg;
945 return;
946 }
947
948 // Retrieve the Event object
949 sc = m_eventStore->retrieve(pEvent);
950 if(!sc.isSuccess()) {
951 error() << "Unable to retrieve Event root object" << endmsg;
952 return;
953 }
954
955 m_firstRun=false;
956 m_currentRun = pEvent->event_ID()->run_number();
957
958 // Clear Store
959 const ClearStorePolicy::Type s_clearStore = clearStorePolicy( m_clearStorePolicy.value(), msgStream() );
960 if(s_clearStore==ClearStorePolicy::EndEvent) {
961 sc = m_eventStore->clearStore();
962 if(!sc.isSuccess()) {
963 error() << "Clear of Event data store failed" << endmsg;
964 }
965 }
966}
967
968//---------------------------------------------------------------------------
969
971StatusCode AthenaMtesEventLoopMgr::getEventRoot(IOpaqueAddress*& refpAddr) {
972 refpAddr = 0;
973 StatusCode sc = m_evtSelector->next(*m_evtContext);
974 if ( !sc.isSuccess() ) {
975 return sc;
976 }
977 // Create root address and assign address to data service
978 sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
979 if( !sc.isSuccess() ) {
980 sc = m_evtSelector->next(*m_evtContext);
981 if ( sc.isSuccess() ) {
982 sc = m_evtSelector->createAddress(*m_evtContext,refpAddr);
983 if ( !sc.isSuccess() ) {
984 warning() << "Error creating IOpaqueAddress." << endmsg;
985 }
986 }
987 }
988 return sc;
989}
990
991//---------------------------------------------------------------------------
992
994
995 // return codes:
996 // -1 : error
997 // 0 : no more events in selection
998 // 1 : ok
999
1000 StatusCode sc(StatusCode::SUCCESS);
1001
1002 //-----------------------------------------------------------------------
1003 // we need an EventInfo Object to fire the incidents.
1004 //-----------------------------------------------------------------------
1005 std::unique_ptr<const EventInfo> pEvent;
1006 if ( m_evtContext ) {
1007 // Deal with the case when an EventSelector is provided
1008
1009 //
1010 // FIXME: flow control if no more events in selector, etc.
1011 //
1012
1013 IOpaqueAddress* addr = 0;
1014
1015 IEvtSelectorSeek* is = dynamic_cast<IEvtSelectorSeek*> (m_evtSelector);
1016 if (is == 0) {
1017 error() << "Seek failed; unsupported by event selector" << endmsg;
1018 return 0;
1019 }
1020
1022 if(sc.isFailure()) {
1023 error() << "Seek failed to Evt=" << m_currentEvntNum-1 << endmsg;
1024 return 0;
1025 }
1026
1027 sc = m_evtSelector->next(*m_evtContext);
1028
1029 if ( !sc.isSuccess() ) {
1030 // This is the end of the loop. No more events in the selection
1031 info() << "No more events in event selection " << endmsg;
1032 return 0;
1033 }
1034
1035 if (m_evtSelector->createAddress(*m_evtContext, addr).isFailure()) {
1036 error() << "Could not create an IOpaqueAddress" << endmsg;
1037 return -1;
1038 }
1039
1040
1041 // Most iterators provide the IOA of an event header (EventInfo, DataHeader)
1042 if (0 != addr) {
1043 //create its proxy
1044 sc = m_eventStore->recordAddress(addr);
1045 if( !sc.isSuccess() ) {
1047 warning() << "Error declaring Event object" << endmsg;
1048 return 0;
1049 }
1050 } if ((sc=m_eventStore->loadEventProxies()).isFailure()) {
1051 error() << "Error loading Event proxies" << endmsg;
1052 return -1;
1053 }
1054 bool consume_modifier_stream = false;
1055 // Read the attribute list
1056 const AthenaAttributeList* pAttrList = m_eventStore->tryConstRetrieve<AthenaAttributeList>("Input");
1057 if ( pAttrList != nullptr && pAttrList->size() > 6 ) { // Try making EventID-only EventInfo object from in-file TAG
1058 try {
1059 unsigned int runNumber = (*pAttrList)["RunNumber"].data<unsigned int>();
1060 unsigned long long eventNumber = (*pAttrList)["EventNumber"].data<unsigned long long>();
1061 unsigned int eventTime = (*pAttrList)["EventTime"].data<unsigned int>();
1062 unsigned int eventTimeNS = (*pAttrList)["EventTimeNanoSec"].data<unsigned int>();
1063 unsigned int lumiBlock = (*pAttrList)["LumiBlockN"].data<unsigned int>();
1064 unsigned int bunchId = (*pAttrList)["BunchId"].data<unsigned int>();
1065
1066 consume_modifier_stream = true;
1067 // an option to override primary eventNumber with the secondary one in case of DoubleEventSelector
1069 unsigned long long eventNumberSecondary{};
1070 if ( !(pAttrList->exists("hasSecondaryInput") && (*pAttrList)["hasSecondaryInput"].data<bool>()) ) {
1071 fatal() << "Secondary EventNumber requested, but secondary input does not exist!" << endmsg;
1072 return -1;
1073 }
1074 if ( pAttrList->exists("EventNumber_secondary") ) {
1075 eventNumberSecondary = (*pAttrList)["EventNumber_secondary"].data<unsigned long long>();
1076 }
1077 else {
1078 // try legacy EventInfo if secondary input did not have attribute list
1079 // primary input should not have this EventInfo type
1080 const EventInfo* pEventSecondary = m_eventStore->tryConstRetrieve<EventInfo>();
1081 if (pEventSecondary) {
1082 eventNumberSecondary = pEventSecondary->event_ID()->event_number();
1083 }
1084 else {
1085 fatal() << "Secondary EventNumber requested, but it does not exist!" << endmsg;
1086 return -1;
1087 }
1088 }
1089 if (eventNumberSecondary != 0) {
1090 if (m_doEvtHeartbeat) {
1091 info() << " ===>>> using secondary event #" << eventNumberSecondary << " instead of #" << eventNumber << "<<<===" << endmsg;
1092 }
1093 eventNumber = eventNumberSecondary;
1094 }
1095 }
1096
1097 // never recorded in the eventStore
1098 pEvent = std::make_unique<EventInfo>(
1099 std::make_unique<EventID>(runNumber, eventNumber, eventTime,
1100 eventTimeNS, lumiBlock, bunchId),
1101 nullptr);
1102 } catch (...) {
1103 }
1104 } else if (m_requireInputAttributeList) {
1105 fatal() << "Valid input attribute list required but not present!";
1106 return -1;
1107 }
1108
1109 const EventInfo* pEventObserver{pEvent.get()};
1110 if (!pEventObserver) {
1111 // Retrieve the Event object
1113 pEventObserver = m_eventStore->tryConstRetrieve<EventInfo>();
1114 if( !pEventObserver ) {
1115
1116 // Try to get the xAOD::EventInfo
1117 const xAOD::EventInfo* pXEvent{nullptr};
1118 sc = m_eventStore->retrieve(pXEvent);
1119 if( !sc.isSuccess() ) {
1120 error() << "Unable to retrieve Event root object" << endmsg;
1121 return -1;
1122 }
1123 consume_modifier_stream = true;
1124 // Build the old-style Event Info object for those clients that still need it
1125 pEvent = std::make_unique<EventInfo>(
1126 std::make_unique<EventID>(eventIDFromxAOD(pXEvent)),
1127 std::make_unique<EventType>(eventTypeFromxAOD(pXEvent)));
1128 pEventObserver = pEvent.get();
1129 sc = m_eventStore->record(std::move(pEvent), "");
1130 if( !sc.isSuccess() ) {
1131 error() << "Error declaring event data object" << endmsg;
1132 return -1;
1133 }
1134 } else {
1135 consume_modifier_stream = false;
1136 }
1137 }
1138
1139 // the pEvent was moved to the eventStore, the object is still 'alive'.
1140 // so the raw pEventObserver pointer is also still valid
1141 // cppcheck-suppress invalidLifetime
1142 modifyEventContext(ctx, *(pEventObserver->event_ID()),
1143 consume_modifier_stream);
1144
1145 } else {
1146
1147 // with no iterator it's up to us to create an EventInfo
1148 // first event # == 1
1149 unsigned int runNmb{1}, evtNmb{m_nevt + 1};
1150
1151 // increment the run/lumiBlock number if desired
1152 if (m_flmbi != 0) {
1153 runNmb = m_nevt / m_flmbi + 1;
1154 evtNmb = m_nevt % m_flmbi + 1;
1155 }
1156 auto eid = std::make_unique<EventID> (runNmb,evtNmb, m_timeStamp);
1157 // Change lumiBlock# to match runNumber
1158 eid->set_lumi_block( runNmb );
1159
1161
1162 pEvent = std::make_unique<EventInfo>(std::move(eid),
1163 std::make_unique<EventType>());
1164
1165 modifyEventContext(ctx,*(pEvent->event_ID()), true);
1166
1167 debug() << "selecting store: " << ctx.slot() << endmsg;
1168
1169 m_whiteboard->selectStore( ctx.slot() ).ignore();
1170
1171 debug() << "recording EventInfo " << *pEvent->event_ID() << " in "
1172 << m_eventStore->name() << endmsg;
1173 sc = m_eventStore->record(std::move(pEvent), "McEventInfo");
1174 if( !sc.isSuccess() ) {
1175 error() << "Error declaring event data object" << endmsg;
1176 return -1;
1177 }
1178 }
1179 return 1;
1180}
1181
1182//---------------------------------------------------------------------------
1184 const EventID& eID,
1185 bool consume_modifier_stream) {
1186
1187 if (m_evtIdModSvc.isSet()) {
1188 EventID new_eID(eID);
1189 // In Mtes EventLoopMgr ctx.evt() gets set to m_nevt and *then* m_nevt is
1190 // incremented later so it's zero-indexed and we don't need to subtract one
1191 m_evtIdModSvc->modify_evtid(new_eID, ctx.evt(), consume_modifier_stream);
1192 if (msgLevel(MSG::DEBUG)) {
1193 unsigned int oldrunnr = eID.run_number();
1194 unsigned int oldLB = eID.lumi_block();
1195 unsigned int oldTS = eID.time_stamp();
1196 unsigned int oldTSno = eID.time_stamp_ns_offset();
1197 debug() << "modifyEventContext: use evtIdModSvc runnr=" << oldrunnr
1198 << " -> " << new_eID.run_number() << endmsg;
1199 debug() << "modifyEventContext: use evtIdModSvc LB=" << oldLB << " -> "
1200 << new_eID.lumi_block() << endmsg;
1201 debug() << "modifyEventContext: use evtIdModSvc TimeStamp=" << oldTS
1202 << " -> " << new_eID.time_stamp() << endmsg;
1203 debug() << "modifyEventContext: use evtIdModSvc TimeStamp ns Offset="
1204 << oldTSno << " -> " << new_eID.time_stamp_ns_offset() << endmsg;
1205 }
1206 ctx.setEventID(new_eID);
1208 ctx.eventID().run_number());
1209 return;
1210 }
1211
1212 ctx.setEventID(eID);
1213}
1214
1215//---------------------------------------------------------------------------
1217
1218 EventContext ctx{ m_nevt, m_whiteboard->allocateStore( m_nevt ) };
1219
1220 StatusCode sc = m_whiteboard->selectStore( ctx.slot() );
1221 if (sc.isFailure()) {
1222 fatal() << "Slot " << ctx.slot()
1223 << " could not be selected for the WhiteBoard" << endmsg;
1224 return EventContext{}; // invalid EventContext
1225 } else {
1227
1228 debug() << "created EventContext, num: " << ctx.evt() << " in slot: "
1229 << ctx.slot() << endmsg;
1230 }
1231
1232 return ctx;
1233}
1234
1236{
1237 Gaudi::setAppReturnCode(m_appMgrProperty, Gaudi::ReturnCode::Success, true).ignore();
1238}
1239
1243
1247//---------------------------------------------------------------------------
1248
1249int
1250AthenaMtesEventLoopMgr::drainScheduler(int& finishedEvts,bool report){
1251
1252 StatusCode sc(StatusCode::SUCCESS);
1253
1254 // maybe we can do better
1255 std::vector<std::unique_ptr<EventContext>> finishedEvtContexts;
1256
1257 EventContext* finishedEvtContext{nullptr};
1258
1259 // Here we wait not to loose cpu resources
1260 debug() << "drainScheduler: [" << finishedEvts << "] Waiting for a context" << endmsg;
1261 sc = m_schedulerSvc->popFinishedEvent(finishedEvtContext);
1262
1263 // We got past it: cache the pointer
1264 if (sc.isSuccess()){
1265 debug() << "drainScheduler: scheduler not empty: Context "
1266 << finishedEvtContext << endmsg;
1267 finishedEvtContexts.emplace_back(finishedEvtContext);
1268 } else{
1269 // no more events left in scheduler to be drained
1270 debug() << "drainScheduler: scheduler empty" << endmsg;
1271 return 0;
1272 }
1273
1274 // Let's see if we can pop other event contexts
1275 while (m_schedulerSvc->tryPopFinishedEvent(finishedEvtContext).isSuccess()){
1276 finishedEvtContexts.emplace_back(finishedEvtContext);
1277 }
1278
1279 // Now we flush them
1280 bool fail(false);
1281 for (auto& thisFinishedEvtContext : finishedEvtContexts){
1282 if (!thisFinishedEvtContext) {
1283 fatal() << "Detected nullptr ctxt while clearing WB!"<< endmsg;
1284 fail = true;
1285 continue;
1286 }
1287
1288 if (m_aess->eventStatus(*thisFinishedEvtContext) != EventStatus::Success) {
1289 fatal() << "Failed event detected on " << thisFinishedEvtContext
1290 << " w/ fail mode: "
1291 << m_aess->eventStatus(*thisFinishedEvtContext) << endmsg;
1292 fail = true;
1293 continue;
1294 }
1295
1296 EventID::number_type n_run(0);
1297 EventID::event_number_t n_evt(0);
1298
1299 if (m_whiteboard->selectStore(thisFinishedEvtContext->slot()).isSuccess()) {
1300 n_run = thisFinishedEvtContext->eventID().run_number();
1301 n_evt = thisFinishedEvtContext->eventID().event_number();
1302 } else {
1303 error() << "DrainSched: unable to select store "
1304 << thisFinishedEvtContext->slot() << endmsg;
1305 fail = true;
1306 continue;
1307 }
1308
1309 // m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndEvent,
1310 // *thisFinishedEvtContext ));
1311
1312 // Some code still needs global context in addition to that passed in the incident
1313 Gaudi::Hive::setCurrentContext( *thisFinishedEvtContext );
1314 info() << "Firing EndProcessing" << endmsg;
1315 m_incidentSvc->fireIncident(Incident(name(), IncidentType::EndProcessing, *thisFinishedEvtContext ));
1316
1317 if(report) {
1318 // If we completed an event range, then report it to the pilot
1319 OutputStreamSequencerSvc::RangeReport_ptr rangeReport = m_outSeqSvc->getRangeReport();
1320 if(rangeReport) {
1321 std::string outputFileReport = rangeReport->second + std::string(",ID:")
1322 + rangeReport->first + std::string(",CPU:N/A,WALL:N/A");
1323 if( not m_inTestMode ) {
1324 // In standalone test mode there is no pilot to talk to
1325 void* message2pilot = CxxUtils::xmalloc(outputFileReport.size());
1326 memcpy(message2pilot,outputFileReport.data(),outputFileReport.size());
1327 m_socket->send(message2pilot,outputFileReport.size());
1328 }
1329 info() << "Reported the output " << outputFileReport << endmsg;
1330 }
1331 }
1332
1333 debug() << "Clearing slot " << thisFinishedEvtContext->slot()
1334 << " (event " << thisFinishedEvtContext->evt()
1335 << ") of the whiteboard" << endmsg;
1336
1337 StatusCode sc = clearWBSlot(thisFinishedEvtContext->slot());
1338 if (!sc.isSuccess()) {
1339 error() << "Whiteboard slot " << thisFinishedEvtContext->slot()
1340 << " could not be properly cleared";
1341 fail = true;
1342 continue;
1343 }
1344
1345 finishedEvts++;
1346
1347 writeHistograms().ignore();
1348 ++m_proc;
1349
1350 if (m_doEvtHeartbeat) {
1351 if(!m_useTools)
1352 info() << " ===>>> done processing event #" << n_evt << ", run #" << n_run
1353 << " on slot " << thisFinishedEvtContext->slot() << ", "
1354 << m_proc << " events processed so far <<<===" << endmsg;
1355 else
1356 info() << " ===>>> done processing event #" << n_evt << ", run #" << n_run
1357 << " on slot " << thisFinishedEvtContext->slot() << ", "
1358 << m_nev << " events read and " << m_proc
1359 << " events processed so far <<<===" << endmsg;
1360 std::ofstream outfile( "eventLoopHeartBeat.txt");
1361 if ( !outfile ) {
1362 error() << " unable to open: eventLoopHeartBeat.txt" << endmsg;
1363 fail = true;
1364 continue;
1365 } else {
1366 outfile << " done processing event #" << n_evt << ", run #" << n_run
1367 << " " << m_nev << " events read so far <<<===" << std::endl;
1368 outfile.close();
1369 }
1370 }
1371
1372 debug() << "drainScheduler thisFinishedEvtContext: " << thisFinishedEvtContext
1373 << endmsg;
1374 }
1375
1376 return ( fail ? -1 : 1 );
1377
1378}
1379
1380//---------------------------------------------------------------------------
1381
1383 StatusCode sc = m_whiteboard->clearStore(evtSlot);
1384 if( !sc.isSuccess() ) {
1385 warning() << "Clear of Event data store failed" << endmsg;
1386 }
1387 return m_whiteboard->freeStore(evtSlot);
1388}
1389//---------------------------------------------------------------------------
1390
1391std::unique_ptr<AthenaMtesEventLoopMgr::RangeStruct> AthenaMtesEventLoopMgr::getNextRange(yampl::ISocket* socket)
1392{
1393 static const std::string strReady("Ready for events");
1394 static const std::string strStopProcessing("No more events");
1395
1396 std::string range;
1397 if( m_inTestMode ) {
1398 static std::atomic<size_t> line_n = 0;
1399 info() <<"in TEST MODE, Range #" << line_n+1 << endmsg;
1400 range = (line_n < m_testPilotMessages.value().size()) ? m_testPilotMessages.value()[line_n++] : strStopProcessing;
1401 } else {
1402 // Signal the Pilot that we are ready for event processing
1403 void* ready_message = malloc(strReady.size());
1404 if (!ready_message) std::abort();
1405 memcpy(ready_message,strReady.data(),strReady.size());
1406 socket->send(ready_message,strReady.size());
1407 void* eventRangeMessage;
1408 std::string strPeerId;
1409 ssize_t eventRangeSize = socket->recv(eventRangeMessage,strPeerId);
1410 range = std::string((const char*)eventRangeMessage,eventRangeSize);
1411 leftString(range, '\n');
1412 }
1413
1414 std::unique_ptr<RangeStruct> result = std::make_unique<RangeStruct>();
1415 if(range.compare(strStopProcessing)==0) {
1416 info() << "No more events from the server" << endmsg;
1417 return result;
1418 }
1419 info() << "Got Event Range from the pilot: " << range << endmsg;
1420
1421 // _____________________ Decode range string _____________________________
1422 // Expected the following format: [{KEY:VALUE[,KEY:VALUE]}]
1423 // First get rid of the leading '[{' and the trailing '}]'
1424 if(range.starts_with( "[{")) range=range.substr(2);
1425 if(range.ends_with( "}]")){
1426 const int truncate = range.size()-2;
1427 leftString(range, truncate);
1428 }
1429
1430 std::map<std::string,std::string> eventRangeMap;
1431 size_t startpos(0);
1432 size_t endpos = range.find(',');
1433 while(endpos!=std::string::npos) {
1434 // Get the Key-Value pair
1435 std::string keyValue(range.substr(startpos,endpos-startpos));
1436 size_t colonPos = keyValue.find(':');
1437 std::string strKey = keyValue.substr(0,colonPos);
1438 std::string strVal = keyValue.substr(colonPos+1);
1439 trimRangeStrings(strKey);
1440 trimRangeStrings(strVal);
1441 eventRangeMap[strKey]=std::move(strVal);
1442
1443 // Next iteration
1444 startpos = endpos+1;
1445 endpos = range.find(',',startpos);
1446 }
1447
1448 // Get the final Key-Value pair
1449 std::string keyValue(range.substr(startpos));
1450 size_t colonPos = keyValue.find(':');
1451 std::string strKey = keyValue.substr(0,colonPos);
1452 std::string strVal = keyValue.substr(colonPos+1);
1453 trimRangeStrings(strKey);
1454 trimRangeStrings(strVal);
1455 eventRangeMap[strKey]=std::move(strVal);
1456
1457 // _____________________ Consistency check for range string _____________________________
1458 // Three checks are performed:
1459 // 1. The existence of all required fields
1460 // 2. The consistency of field values
1461 // 3. Protection against having event ranges from different input files
1462 // NB. The last check is hopefully a temporary limitation of MTES
1463 std::string errorStr{""};
1464
1465 if(eventRangeMap.find("eventRangeID")==eventRangeMap.end()
1466 || eventRangeMap.find("startEvent")==eventRangeMap.end()
1467 || eventRangeMap.find("lastEvent")==eventRangeMap.end()
1468 || eventRangeMap.find("PFN")==eventRangeMap.end()) {
1469 // Wrong format
1470 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Wrong format";
1471 }
1472
1473 if(errorStr.empty()) {
1474 result->startEvent = std::atoi(eventRangeMap["startEvent"].c_str());
1475 result->lastEvent = std::atoi(eventRangeMap["lastEvent"].c_str());
1476
1477 if(eventRangeMap["eventRangeID"].empty()
1478 || eventRangeMap["PFN"].empty()
1479 || result->lastEvent < result->startEvent) {
1480 // Wrong values of range fields
1481 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Wrong values of range fields";
1482 }
1483 else {
1484 // Update m_pfn if necessary
1485 if(m_pfn != eventRangeMap["PFN"]) {
1486 IProperty* propertyServer = dynamic_cast<IProperty*>(m_evtSelector);
1487 if(!propertyServer) {
1488 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Unable to dyn-cast the event selector to IProperty";
1489 }
1490 else {
1491 std::string strInpuCol("InputCollections");
1492 std::vector<std::string> vectInpCol{eventRangeMap["PFN"],};
1493 StringArrayProperty inputFileList(std::move(strInpuCol), vectInpCol);
1494 if(propertyServer->setProperty(inputFileList).isFailure()) {
1495 errorStr = "ERR_ATHENAMP_PARSE \"" + range + "\": Unable to set input file name property to the Event Selector";
1496 }
1497 else {
1498 m_pfn = eventRangeMap["PFN"];
1499 }
1500 }
1501 }
1502 }
1503 }
1504
1505 if(errorStr.empty()) {
1506 // Event range parsing was successful
1507 debug() << "*** Decoded Event Range ***" << endmsg;
1508 for (const auto& fieldvalue : eventRangeMap) {
1509 debug() << fieldvalue.first << ":" << fieldvalue.second << endmsg;
1510 }
1511
1512 result->eventRangeID = eventRangeMap["eventRangeID"];
1513 result->pfn = eventRangeMap["PFN"];
1514 }
1515 else {
1516 // We got here because there was an error
1517 // Report the error to the pilot and reset the result, so that the next range can be tried
1518 warning() << errorStr << endmsg;
1519 info() << "Ignoring this event range" << endmsg;
1520 if( not m_inTestMode ) {
1521 void* errorMessage = CxxUtils::xmalloc(errorStr.size());
1522 memcpy(errorMessage,errorStr.data(),errorStr.size());
1523 socket->send(errorMessage,errorStr.size());
1524 }
1525 result.reset();
1526 }
1527
1528 return result;
1529}
1530
1532{
1533 size_t i(0);
1534 // get rid of leading spaces
1535 while(i<str.size() && str[i]==' ') i++;
1536 if(i) str = str.substr(i);
1537
1538 if(str.empty()) return; // Corner case: string consists only of spaces
1539
1540 // get rid of trailing spaces
1541 i=str.size()-1;
1542 while(str[i]==' ') i--;
1543 if(i) str.resize(i+1);
1544
1545 // the string might be enclosed by either
1546 // "u\'" and "\'"
1547 // or
1548 // "\"" and "\""
1549 // Get rid of them!
1550 if(str.starts_with( "u\'")) {
1551 str = str.substr(2);
1552 if(str.rfind('\'')==str.size()-1) {
1553 str.pop_back();
1554 }
1555 }
1556 else if(str.starts_with("\"")) {
1557 str = str.substr(1);
1558 if(str.rfind('\"')==str.size()-1) {
1559 str.pop_back();
1560 }
1561 }
1562}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
ClearStorePolicy::Type clearStorePolicy(const std::string &policyName, MsgStream &msg)
returns the enum-version of the policy (by name)
Helpers for checking error return status codes and reporting errors.
#define CHECK(...)
Evaluate an expression and check for errors.
Assign a CLID to EventContext.
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
EventID eventIDFromxAOD(const xAOD::EventInfo *xaod)
Create EventID object from xAOD::EventInfo.
EventType eventTypeFromxAOD(const xAOD::EventInfo *xaod)
Create EventType object from xAOD::EventInfo.
This class provides general information about an event.
Extension to IEvtSelector to allow for seeking.
static Double_t sc
const bool debug
void setProperty(columnar::PythonToolHandle &self, const std::string &key, nb::object value)
This file contains the class definition for the OutputStreamSequencerSvc class.
An AttributeList represents a logical row of attributes in a metadata table.
static const Attributes_t empty
An AttributeList represents a logical row of attributes in a metadata table.
StatusCode getEventRoot(IOpaqueAddress *&refpAddr)
Create event address using event selector.
bool m_requireInputAttributeList
require input attribute list
virtual StatusCode finalize() override
implementation of IAppMgrUI::finalize
virtual EventContext createEventContext() override
implementation of IEventProcessor::createEventContext()
virtual void resetAppReturnCode() override
Reset the application return code.
virtual StatusCode executeEvent(EventContext &&ctx) override
implementation of IEventProcessor::executeEvent(void* par)
tool_store::const_iterator tool_iterator
virtual int curEvent() const override
Return the current event count.
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
SmartIF< IScheduler > m_schedulerSvc
A shortcut for the scheduler.
void setClearStorePolicy(Gaudi::Details::PropertyBase &clearStorePolicy)
property update handler:set the clear-store policy value and check its value.
virtual StatusCode executeAlgorithms()
Run the algorithms for the current event.
std::unique_ptr< yampl::ISocket > m_socket
ServiceHandle< Athena::IConditionsCleanerSvc > m_conditionsCleaner
std::unique_ptr< RangeStruct > getNextRange(yampl::ISocket *socket)
virtual bool terminateLoop() override
tool_stats m_toolReject
tool returns StatusCode::FAILURE counter
UnsignedIntegerProperty m_eventPrintoutInterval
tool_stats m_toolAccept
tool returns StatusCode::SUCCESS counter
virtual StatusCode executeRun(int maxevt) override
implementation of IEventProcessor::executeRun(int maxevt)
number_type m_currentRun
current run number
AthenaMtesEventLoopMgr()=delete
void modifyEventContext(EventContext &ctx, const EventID &eID, bool consume_modifier_stream)
tool_store m_tools
internal tool store
virtual const std::string & name() const override
unsigned int m_nev
events processed
virtual StatusCode initialize() override
implementation of IAppMgrUI::initalize
SmartIF< IAlgResourcePool > m_algResourcePool
Reference to the Algorithm resource pool.
IEvtIdModifierSvc_t m_evtIdModSvc
virtual StatusCode writeHistograms(bool force=false)
Dump out histograms as needed.
SmartIF< IHiveWhiteBoard > m_whiteboard
Reference to the Whiteboard interface.
virtual ~AthenaMtesEventLoopMgr()
Standard Destructor.
StatusCode initializeAlgorithms()
Initialize all algorithms and output streams.
SmartIF< IAlgExecStateSvc > m_aess
Reference to the Algorithm Execution State Svc.
tool_stats m_toolInvoke
tool called counter
virtual int drainScheduler(int &finishedEvents, bool report) override
Drain the scheduler from all actions that may be queued.
bool m_useSecondaryEventNumber
read event number from secondary input
virtual StatusCode nextEvent(int maxevt) override
implementation of IAppMgrUI::nextEvent. maxevt==0 returns immediately
EvtContext * m_evtContext
Gaudi event selector Context (may be used as a cursor by the evt selector)
StatusCode clearWBSlot(int evtSlot)
Clear a slot in the WB.
std::string m_whiteboardName
Name of the Whiteboard to be used.
int declareEventRootAddress(EventContext &)
Declare the root address of the event.
bool m_scheduledStop
Scheduled stop of event processing.
virtual StatusCode seek(int evt) override
Seek to a given event.
virtual StatusCode stop() override
implementation of IService::stop
std::string m_schedulerName
Name of the scheduler to be used.
StringArrayProperty m_testPilotMessages
IDataManagerSvc_t m_histoDataMgrSvc
Reference to the Histogram Data Service.
StoreGateSvc_t m_eventStore
Reference to StoreGateSvc;.
virtual int size() override
Return the size of the collection.
Gaudi::Property< std::string > m_eventRangeChannel
SmartIF< IProperty > m_appMgrProperty
Property interface of ApplicationMgr.
virtual void setCurrentEventNum(int num) override
ServiceHandle< IConversionSvc > IConversionSvc_t
virtual StatusCode stopRun() override
implementation of IEventProcessor::stopRun()
void setupPreSelectTools(Gaudi::Details::PropertyBase &)
property update handler:sets up the Pre-selection tools
IIncidentSvc_t m_incidentSvc
Reference to the incident service.
void trimRangeStrings(std::string &str)
UnsignedIntegerProperty m_writeInterval
virtual void handle(const Incident &inc) override
IIncidentListenet interfaces.
IEvtSelector * m_evtSelector
Reference to the Event Selector.
void resetTimeout(Timeout &instance)
Reset timeout.
Definition Timeout.h:83
static Timeout & instance()
Get reference to Timeout singleton.
Definition Timeout.h:64
void setConditionsRun(EventIDBase::number_type conditionsRun)
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
Definition EventID.h:35
EventIDBase::number_type number_type
Definition EventID.h:37
EventID * event_ID()
the unique identification of the event.
Abstract interface for seeking for an event selector.
virtual StatusCode seek(IEvtSelector::Context &c, int evtnum) const =0
Seek to a given event number.
virtual int size(IEvtSelector::Context &c) const =0
Return the size of the collection, or -1 if we can't get the size.
std::unique_ptr< RangeReport_t > RangeReport_ptr
int ir
counter of the current depth
Definition fastadd.cxx:49
bool verbose
Definition hcg.cxx:73
void setExtendedEventContext(EventContext &ctx, ExtendedEventContext &&ectx)
Move an extended context into a context object.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
void * xmalloc(size_t size)
Trapping version of malloc.
Definition xmalloc.cxx:31
thread_local event_number_t eventIndex
EventInfo_v1 EventInfo
Definition of the latest event info version.
Trapping version of malloc.