ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorByteStream.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <vector>
8#include <algorithm>
9
14
15#include "GaudiKernel/ClassID.h"
16#include "GaudiKernel/FileIncident.h"
17#include "GaudiKernel/IIncidentSvc.h"
18#include "GaudiKernel/IIoComponentMgr.h"
19
23
24// EventInfoAttributeList includes
27#include "eformat/StreamTag.h"
28
29
30namespace {
31 const std::string stringTypeStr{"string"};
33 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
34 long eventNumber, const void* source,
35 size_t nbytes, unsigned int status) {
36 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
37 return sc;
38 }
39}
40
41
42// Constructor.
44 ISvcLocator *svcloc)
45 : base_class(name, svcloc) {
46 // RunNumber, OldRunNumber and OverrideRunNumberFromInput are used
47 // to override the run number coming in on the input stream
48 m_runNo.verifier().setLower(0);
49 // The following properties are only for compatibility with
50 // McEventSelector and are not really used anywhere
51 // TODO(berghaus): validate if those are even used
52 m_eventsPerRun.verifier().setLower(0);
53 m_firstEventNo.verifier().setLower(1);
54 m_firstLBNo.verifier().setLower(0);
55 m_eventsPerLB.verifier().setLower(0);
56 m_initTimeStamp.verifier().setLower(0);
57
58 m_inputCollectionsProp.declareUpdateHandler(
60}
61
62/******************************************************************************/
63void EventSelectorByteStream::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
65 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
66 this->reinit(lock).ignore();
67 }
68}
69
70
71/******************************************************************************/
74
75
76/******************************************************************************/
81
82
83//________________________________________________________________________________
85
86 m_autoRetrieveTools = false;
87 m_checkToolDeps = false;
88
89 if (m_isSecondary.value()) {
90 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
91 } else {
92 ATH_MSG_DEBUG("Initializing " << name());
93 }
94
95 ATH_CHECK(::AthService::initialize());
96
97 // Check for input setting
98 if (m_filebased && m_inputCollectionsProp.value().empty()) {
99 ATH_MSG_FATAL("Unable to retrieve valid input list");
100 return StatusCode::FAILURE;
101 }
102 m_skipEventSequence = m_skipEventSequenceProp.value();
103 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
104
105 // Check ByteStreamCnvSvc
106 m_eventSource = serviceLocator()->service(m_eventSourceName.value());
107 if (!m_eventSource) {
108 ATH_MSG_FATAL("Cannot get ByteStreamInputSvc");
109 return StatusCode::FAILURE;
110 }
111
112 // Get CounterTool (if configured)
113 if (!m_counterTool.empty()) {
114 ATH_CHECK(m_counterTool.retrieve());
115 }
116 // Get HelperTools
117 if (!m_helperTools.empty()) {
118 ATH_CHECK(m_helperTools.retrieve());
119 }
120 // Get SharedMemoryTool (if configured)
121 if (!m_eventStreamingTool.empty()) {
122 ATH_CHECK(m_eventStreamingTool.retrieve());
123 }
124
125 // Register this service for 'I/O' events
126 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
127 ATH_CHECK(iomgr.retrieve());
128 ATH_CHECK(iomgr->io_register(this));
129
130 // Register the input files with the iomgr
131 bool allGood = true;
132 for (const std::string& input : m_inputCollectionsProp.value()) {
133 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, input).isSuccess()) {
134 ATH_MSG_FATAL("could not register [" << input << "] for output !");
135 allGood = false;
136 } else {
137 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << input << ") [ok]");
138 }
139 }
140 if (!allGood) {
141 return(StatusCode::FAILURE);
142 }
143
144 // Make sure MetaDataSvc is initialized before the first file is opened
145 ServiceHandle<IAthMetaDataSvc> metaDataSvc("MetaDataSvc", name());
146 ATH_CHECK(metaDataSvc.retrieve());
147
148 // Must happen before trying to open a file
150 StatusCode risc = this->reinit(lock);
151
152 return risc;
153}
154//__________________________________________________________________________
156 ATH_MSG_INFO("reinitialization...");
157 // reset markers
158 if (m_inputCollectionsProp.value().size()>0) {
159 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
160 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
161 }
162 else {
163 m_numEvt.resize(1);
164 m_firstEvt.resize(1);
165 }
166
167 // Initialize InputCollectionsIterator
168 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
169 m_NumEvents = 0;
170 bool retError = false;
171 if (!m_helperTools.empty()) {
172 for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
173 if (!tool->postInitialize().isSuccess()) {
174 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
175 retError = true;
176 }
177 }
178 }
179 if (retError) {
180 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
181 return(StatusCode::FAILURE);
182 }
183
184 // If file based input then fire appropriate incidents
185 if (m_filebased) {
186 if (!m_firstFileFired) {
187 FileIncident firstInputFileIncident(name(), "FirstInputFile", "BSF:" + *m_inputCollectionsIterator);
188 m_incidentSvc->fireIncident(firstInputFileIncident);
189 m_firstFileFired = true;
190 }
191
192 // try to open a file
193 ATH_CHECK(this->openNewRun(lock));
194 }
195
196 return StatusCode::SUCCESS;
197}
198
199//________________________________________________________________________________
201 ATH_MSG_DEBUG("Calling EventSelectorByteStream::start()");
203 // Create the begin and end iterator's for this selector.
205 // Increment to get the new event in.
207
208 return StatusCode::SUCCESS;
209}
210
211//________________________________________________________________________________
213 ATH_MSG_DEBUG("Calling EventSelectorByteStream::stop()");
214 if (m_filebased) {
215 // Fire EndInputFile for any file still open
216 m_inputFileGuard.reset();
217 if (m_eventSource->ready()) {
218 m_eventSource->closeBlockIterator(false);
219 }
220 }
221 return StatusCode::SUCCESS;
222}
223
224//__________________________________________________________________________
226 if (!m_counterTool.empty()) {
227 if (!m_counterTool->preFinalize().isSuccess()) {
228 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
229 }
230 }
231 for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
232 if (!tool->preFinalize().isSuccess()) {
233 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
234 }
235 }
236 delete m_beginIter; m_beginIter = nullptr;
237 delete m_endIter; m_endIter = nullptr;
238 if (m_eventSource) m_eventSource->release();
239 // Finalize the Service base class.
240 return AthService::finalize();
241}
242
244 // EndInputFile is handled by the InputFileIncidentGuard via transition()
245 // when the next file is opened, or via reset() on stop().
246 ++m_inputCollectionsIterator;
247 ++m_fileCount;
248}
249
251 // Should be protected upstream, but this is further protection
252 if (!m_filebased) {
253 ATH_MSG_ERROR("cannot open new run for non-filebased inputs");
254 return StatusCode::FAILURE;
255 }
256 // Check for end of file list
257 if (m_inputCollectionsIterator == m_inputCollectionsProp.value().end()) {
258 ATH_MSG_INFO("End of input file list reached");
259 return StatusCode::FAILURE;
260 }
261 std::string blockname = *m_inputCollectionsIterator;
262 // try to open a file
263 auto nevguid = m_eventSource->getBlockIterator(blockname);
264 long nev = nevguid.first;
265 if (nev == -1) {
266 ATH_MSG_FATAL("Unable to access file " << *m_inputCollectionsIterator << ", stopping here");
268 }
269 // Fire EndInputFile for previous file (if any), then BeginInputFile for new file
270 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(),
271 "BSF:" + *m_inputCollectionsIterator, nevguid.second,
272 /*endFileName=*/{});
273
274 // check if file is empty
275 if (nev == 0) {
276 ATH_MSG_WARNING("no events in file " << blockname << " try next");
277 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
278 this->nextFile(lock);
279 return openNewRun(lock);
280 // check if skipping all events in that file (minus events already skipped)
281 } else if (m_skipEvents.value() - m_NumEvents > nev) {
282 ATH_MSG_WARNING("skipping more events " << m_skipEvents.value() - m_NumEvents << "(" << nev <<") than in file " << *m_inputCollectionsIterator << ", try next");
283 m_NumEvents += nev;
284 m_numEvt[m_fileCount] = nev;
285 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
286 this->nextFile(lock);
287 return openNewRun(lock);
288 }
289
290 ATH_MSG_DEBUG("Opened block/file " << blockname);
291 m_firstEvt[m_fileCount] = m_NumEvents;
292 m_numEvt[m_fileCount] = nev;
293
294 return StatusCode::SUCCESS;
295}
296
297StatusCode EventSelectorByteStream::createContext(IEvtSelector::Context*& it) const {
298 it = new EventContextByteStream(this);
299 return StatusCode::SUCCESS;
300}
301
302StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const {
304 return nextImpl (it, lock);
305}
306StatusCode EventSelectorByteStream::nextImpl(IEvtSelector::Context& it,
307 lock_t& lock) const
308{
309 static std::atomic<int> n_bad_events = 0; // cross loop counter of bad events
310 // Check if this is an athenaMP client process
311 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
312 void* source = nullptr;
313 unsigned int status = 0;
314 ATH_CHECK(m_eventStreamingTool->getLockedEvent(&source, status));
315 m_eventSource->setEvent(static_cast<char*>(source), status);
316 return StatusCode::SUCCESS;
317 }
318 // Call all selector tool preNext before starting loop
319 for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
320 if (!tool->preNext().isSuccess()) {
321 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
322 }
323 }
324 if (!m_counterTool.empty()) {
325 if (!m_counterTool->preNext().isSuccess()) {
326 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
327 }
328 }
329 // Find an event to return
330 for (;;) {
331 bool badEvent{};
333 if (sc.isRecoverable()) {
334 badEvent = true;
335 } else if (sc.isFailure()) {
336 return StatusCode::FAILURE;
337 }
338
339 // increment that an event was found
340 ++m_NumEvents;
341
342 // check bad event flag and handle as configured
343 if (badEvent) {
344 int nbad = ++n_bad_events;
345 ATH_MSG_WARNING("Bad event encountered, current count at " << nbad);
346 bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
347 if (!m_procBadEvent || toomany) {
348 ATH_MSG_ERROR("Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad << " events)");
349 it = *m_endIter;
350 return StatusCode::FAILURE;
351 }
352 ATH_MSG_WARNING("Continue with bad event");
353 }
354
355 // Check whether properties or tools reject this event
356 if ( m_NumEvents > m_skipEvents.value() &&
357 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
358
359 // Build a DH for use by other components
360 StatusCode rec_sg = m_eventSource->generateDataHeader();
361 if (rec_sg != StatusCode::SUCCESS) {
362 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
363 }
364
365 // Build event info attribute list
366 if (recordAttributeListImpl(lock).isFailure()) ATH_MSG_WARNING("Unable to build event info att list");
367
368 StatusCode status(StatusCode::SUCCESS);
369 for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
370 StatusCode toolStatus = tool->postNext();
371 if (toolStatus.isRecoverable()) {
372 ATH_MSG_INFO("Request skipping event from: " << tool->name());
373 status = StatusCode::RECOVERABLE;
374 } else if (toolStatus.isFailure()) {
375 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
376 status = StatusCode::FAILURE;
377 }
378 }
379 if (status.isRecoverable()) {
380 ATH_MSG_INFO("skipping event " << m_NumEvents);
381 } else if (status.isFailure()) {
382 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
383 } else {
384 if (!m_counterTool.empty()) {
385 if (!m_counterTool->postNext().isSuccess()) {
386 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
387 }
388 }
389 // Validate the event
390 try {
391 m_eventSource->validateEvent();
392 }
393 catch (const ByteStreamExceptions::badFragmentData&) {
394 int nbad = ++n_bad_events;
395 ATH_MSG_WARNING("Bad fragment data encountered, current count at " << nbad);
396
397 bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
398 if (!m_procBadEvent || toomany) {
399 ATH_MSG_ERROR("Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad << " events)");
400 it = *m_endIter;
401 return StatusCode::FAILURE;
402 }
403 ATH_MSG_WARNING("Continue with bad event");
404 }
405 break;
406 }
407 } else {
408 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
409 m_skipEventSequence.erase(m_skipEventSequence.begin());
410 }
411 if ( m_NumEvents % 1'000 == 0 ) {
412 ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
413 } else {
414 ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
415 }
416 }
417 } // for loop
418
419 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
420 const RawEvent* pre = m_eventSource->currentEvent();
422 while ( (sc = putEvent_ST(*m_eventStreamingTool,
423 m_NumEvents - 1, pre->start(),
424 pre->fragment_size_word() * sizeof(uint32_t),
425 m_eventSource->currentEventStatus())).isRecoverable() ) {
426 usleep(1000);
427 }
428 ATH_CHECK(sc);
429 }
430 return StatusCode::SUCCESS;
431}
432
433//________________________________________________________________________________
434StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
436 return nextImpl (ctxt, jump, lock);
437}
438//________________________________________________________________________________
440EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
441 int jump,
442 lock_t& lock) const
443{
444 if (jump > 0) {
445 if ( m_NumEvents+jump != m_skipEvents.value()) {
446 // Save initial event count
447 unsigned int cntr = m_NumEvents;
448 // In case NumEvents increments multiple times in a single next call
449 while (m_NumEvents+1 <= cntr + jump) {
450 ATH_CHECK(nextImpl(ctxt, lock));
451 }
452 }
453 else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
454 return StatusCode::SUCCESS;
455 }
456 else {
457 ATH_MSG_WARNING("Called jump next with non-multiple jump");
458 }
459 return StatusCode::SUCCESS;
460}
461
462//________________________________________________________________________________
463StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
464{
466 return nextHandleFileTransitionImpl (ctxt, lock);
467}
468StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
469 lock_t& lock) const
470{
471 const RawEvent* pre{};
472 bool badEvent{};
473 // if event source not ready from init, try next file
474 if (m_filebased && !m_eventSource->ready()) {
475 // next file
476 this->nextFile(lock);
477 if (this->openNewRun(lock).isFailure()) {
478 ATH_MSG_DEBUG("Event source found no more valid files left in input list");
479 m_NumEvents = -1;
480 return StatusCode::FAILURE;
481 }
482 }
483 try {
484 pre = m_eventSource->nextEvent();
485 }
486 catch (const ByteStreamExceptions::readError&) {
487 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
488 return StatusCode::FAILURE;
489 }
490 catch (const ByteStreamExceptions::badFragment&) {
491 ATH_MSG_ERROR("badFragment encountered");
492 badEvent = true;
493 }
495 ATH_MSG_ERROR("badFragment data encountered");
496 badEvent = true;
497 }
498 // Check whether a RawEvent has actually been provided
499 if (pre == nullptr) {
500 ctxt = *m_endIter;
501 return StatusCode::FAILURE;
502 }
503
504 // If not secondary just return the status code based on if the event is bas
505 if (!m_isSecondary.value()) {
506 // check bad event flag and handle as configured
507 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
508 }
509
510 // Build a DH for use by other components
511 StatusCode rec_sg = m_eventSource->generateDataHeader();
512 if (rec_sg != StatusCode::SUCCESS) {
513 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
514 }
515
516 return StatusCode::SUCCESS;
517}
518//________________________________________________________________________________
519StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
520{
522 return nextWithSkipImpl (ctxt, lock);
523}
524StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
525 lock_t& lock) const {
526 ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
527
528 for (;;) {
529 // Check if we're at the end of file
530 StatusCode sc = nextHandleFileTransitionImpl(ctxt, lock);
531 if (sc.isRecoverable()) {
532 continue; // handles empty files
533 }
534 if (sc.isFailure()) {
535 return StatusCode::FAILURE;
536 }
537
538 // Increase event count
539 ++m_NumEvents;
540
541 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
542 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
543 }
544 if ( m_NumEvents > m_skipEvents.value() &&
545 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
546 return StatusCode::SUCCESS;
547 } else {
548 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
549 m_skipEventSequence.erase(m_skipEventSequence.begin());
550 }
551 if (m_isSecondary.value()) {
552 ATH_MSG_INFO("skipping secondary event " << m_NumEvents);
553 } else {
554 ATH_MSG_INFO("skipping event " << m_NumEvents);
555 }
556 }
557 }
558
559 return StatusCode::SUCCESS;
560}
561//________________________________________________________________________________
562StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt) const
563{
565 return previousImpl (ctxt, lock);
566}
567StatusCode EventSelectorByteStream::previousImpl(IEvtSelector::Context& /*ctxt*/,
568 lock_t& /*lock*/) const {
569 ATH_MSG_DEBUG(" ... previous");
570 const RawEvent* pre = nullptr;
571 bool badEvent(false);
572 // if event source not ready from init, try next file
573 if (m_eventSource->ready()) {
574 try {
575 pre = m_eventSource->previousEvent();
576 }
577 catch (const ByteStreamExceptions::readError&) {
578 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
579 return StatusCode::FAILURE;
580 }
581 catch (const ByteStreamExceptions::badFragment&) {
582 ATH_MSG_ERROR("badFragment encountered");
583 badEvent = true;
584 }
585 catch (const ByteStreamExceptions::badFragmentData&) {
586 ATH_MSG_ERROR("badFragment data encountered");
587 badEvent = true;
588 }
589 // Check whether a RawEvent has actually been provided
590 if (pre == 0) {
591 ATH_MSG_ERROR("No event built");
592 return StatusCode::FAILURE;
593 }
594 }
595 else {
596 ATH_MSG_FATAL("Attempt to read previous data on invalid reader");
597 return StatusCode::FAILURE;
598 }
599 // increment that an event was found
600
601 // check bad event flag and handle as configured
602 if (badEvent) {
603 ATH_MSG_ERROR("Called previous for bad event");
604 if (!m_procBadEvent) {
605 // End of file
606 return StatusCode::FAILURE;
607 }
608 ATH_MSG_WARNING("Continue with bad event");
609 }
610
611 // Build a DH for use by other components
612 StatusCode rec_sg = m_eventSource->generateDataHeader();
613 if (rec_sg != StatusCode::SUCCESS) {
614 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
615 }
616
617 return StatusCode::SUCCESS;
618}
619//________________________________________________________________________________
620StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt, int jump) const {
622 return previousImpl (ctxt, jump, lock);
623}
624//________________________________________________________________________________
626EventSelectorByteStream::previousImpl(IEvtSelector::Context& ctxt,
627 int jump,
628 lock_t& lock) const
629{
630 if (jump > 0) {
631 for (int i = 0; i < jump; i++) {
633 }
634 return StatusCode::SUCCESS;
635 }
636 return StatusCode::FAILURE;
637}
638//________________________________________________________________________________
639StatusCode EventSelectorByteStream::last(IEvtSelector::Context& it)const {
640 if (it.identifier() == m_endIter->identifier()) {
641 ATH_MSG_DEBUG("last(): Last event in InputStream.");
642 return StatusCode::SUCCESS;
643 }
644 return StatusCode::FAILURE;
645}
646//________________________________________________________________________________
647StatusCode EventSelectorByteStream::rewind(IEvtSelector::Context& /*it*/) const {
648 ATH_MSG_ERROR("rewind() not implemented");
649 return StatusCode::FAILURE;
650}
651
652//________________________________________________________________________________
653StatusCode EventSelectorByteStream::resetCriteria(const std::string& /*criteria*/, IEvtSelector::Context& /*ctxt*/) const {
654 return StatusCode::SUCCESS;
655}
656
657//__________________________________________________________________________
658StatusCode EventSelectorByteStream::seek(Context& /* it */, int evtNum) const {
660 // Check that input is seekable
661 if (!m_filebased) {
662 ATH_MSG_ERROR("Input not seekable, choose different input svc");
663 return StatusCode::FAILURE;
664 }
665 // find the file index with that event
666 long fileNum = findEvent(evtNum, lock);
667 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
668 fileNum = m_fileCount;
669 }
670 // if unable to locate file, exit
671 if (fileNum == -1) {
672 ATH_MSG_INFO("seek: Reached end of Input.");
673 return StatusCode::RECOVERABLE;
674 }
675 // check if it is the current file
676 if (fileNum != m_fileCount) { // event in different file
677 // Close input file if open
678 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
679 ATH_MSG_DEBUG("Seek to item: \"" << m_inputCollectionsProp.value()[fileNum] << "\" from the explicit file list.");
680 std::string fileName = m_inputCollectionsProp.value()[fileNum];
681 m_fileCount = fileNum;
682 // Open the correct file
683 auto nevguid = m_eventSource->getBlockIterator(fileName);
684 long nev = nevguid.first;
685 if (nev == -1) {
686 ATH_MSG_FATAL("Unable to open file with seeked event " << evtNum << " file " << fileName);
687 return StatusCode::FAILURE;
688 }
689 int delta = evtNum - m_firstEvt[m_fileCount];
690 if (delta > 0) {
692 ATH_CHECK(nextImpl(*beginIter,delta, lock));
693 }
694 }
695 // event in current file
696 {
697 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) - m_eventSource->positionInBlock();
698 ATH_MSG_DEBUG("Seeking event " << evtNum << " in current file with delta " << delta);
699 if ( delta == 0 ) { // current event
700 // nothing to do
701 }
702 else if ( delta > 0 ) { // forward
704 ATH_CHECK(this->nextImpl(*beginIter, delta, lock));
705 }
706 else if ( delta < 0 ) { // backward
708 ATH_CHECK(this->previousImpl(*beginIter, -1*delta, lock));
709 }
710 }
711 return StatusCode::SUCCESS;
712}
713
720{
721 std::string listName("EventInfoAtts");
722
723 if (eventStore()->contains<AthenaAttributeList>(listName)) {
724 const AthenaAttributeList* oldAttrList = nullptr;
725 ATH_CHECK(eventStore()->retrieve(oldAttrList, listName));
726 ATH_CHECK(eventStore()->removeDataAndProxy(oldAttrList));
727 }
728
729 // build the new attr list
730 auto attrList = std::make_unique<AthenaAttributeList>();
731
732 // fill the attr list
733 ATH_CHECK(fillAttributeListImpl(attrList.get(), "", false, lock));
734
735 // put result in event store
736 ATH_CHECK(eventStore()->record(std::move(attrList), listName));
737
738 return StatusCode::SUCCESS;
739}
740
741StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
742{
744 return fillAttributeListImpl (attrList, suffix, copySource, lock);
745}
746StatusCode EventSelectorByteStream::fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool /* copySource */,
747 lock_t& /*lock*/) const
748{
749 attrList->extend("RunNumber" + suffix, "unsigned int");
750 attrList->extend("EventNumber" + suffix, "unsigned long long");
751 attrList->extend("LumiBlockN" + suffix, "unsigned int");
752 attrList->extend("BunchId" + suffix, "unsigned int");
753 attrList->extend("EventTime" + suffix, "unsigned int");
754 attrList->extend("EventTimeNanoSec" + suffix, "unsigned int");
755
756 // fill attribute list
757 const RawEvent* event = m_eventSource->currentEvent();
758
759 (*attrList)["RunNumber" + suffix].data<unsigned int>() = event->run_no();
760 if (event->version() < 0x03010000) {
761 (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->lvl1_id();
762 } else {
763 (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->global_id();
764 }
765 (*attrList)["LumiBlockN" + suffix].data<unsigned int>() = event->lumi_block();
766 (*attrList)["BunchId" + suffix].data<unsigned int>() = event->bc_id();
767
768 unsigned int bc_time_sec = event->bc_time_seconds();
769 unsigned int bc_time_ns = event->bc_time_nanoseconds();
770 // bc_time_ns should be lt 1e9.
771 if (bc_time_ns > 1000000000) {
772 // round it off to 1e9
773 ATH_MSG_WARNING(" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns << ", reset it to 1 sec");
774 bc_time_ns = 1000000000;
775 }
776 (*attrList)["EventTime" + suffix].data<unsigned int>() = bc_time_sec;
777 (*attrList)["EventTimeNanoSec" + suffix].data<unsigned int>() = bc_time_ns;
778
780
781 event->status(buffer);
782 attrList->extend("TriggerStatus" + suffix, "unsigned int");
783 (*attrList)["TriggerStatus" + suffix].data<unsigned int>() = *buffer;
784
785 attrList->extend("ExtendedL1ID" + suffix, "unsigned int");
786 attrList->extend("L1TriggerType" + suffix, "unsigned int");
787 (*attrList)["ExtendedL1ID" + suffix].data<unsigned int>() = event->lvl1_id();
788 (*attrList)["L1TriggerType" + suffix].data<unsigned int>() = event->lvl1_trigger_type();
789
790 // Grab L1 words
791 event->lvl1_trigger_info(buffer);
792 for (uint32_t iT1 = 0; iT1 < event->nlvl1_trigger_info(); ++iT1) {
793 std::stringstream name;
794 name << "L1TriggerInfo_" << iT1;
795 attrList->extend(name.str() + suffix, "unsigned int");
796 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
797 ++buffer;
798 }
799
800 // Grab L2 words
801 event->lvl2_trigger_info(buffer);
802 for (uint32_t iT1 = 0; iT1 < event->nlvl2_trigger_info(); ++iT1) {
803 if (*buffer != 0) {
804 std::stringstream name;
805 name << "L2TriggerInfo_" << iT1;
806 attrList->extend(name.str() + suffix, "unsigned int");
807 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
808 }
809 ++buffer;
810 }
811
812 // Grab EF words
813 event->event_filter_info(buffer);
814 for (uint32_t iT1 = 0; iT1 < event->nevent_filter_info(); ++iT1) {
815 if (*buffer != 0) {
816 std::stringstream name;
817 name << "EFTriggerInfo_" << iT1;
818 attrList->extend(name.str() + suffix, "unsigned int");
819 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
820 }
821 ++buffer;
822 }
823
824 // Grab stream tags
825 event->stream_tag(buffer);
826 std::vector<eformat::helper::StreamTag> onl_streamTags;
827 eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
828 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
829 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
830 attrList->extend(itS->name + suffix, stringTypeStr);
831 (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
832 }
833
834 return StatusCode::SUCCESS;
835}
836
837//__________________________________________________________________________
838int EventSelectorByteStream::findEvent(int evtNum, lock_t& /*lock*/) const {
839 // Loop over file event counts
840 for (size_t i = 0; i < m_inputCollectionsProp.value().size(); i++) {
841 if (m_inputCollectionsProp.value().size() != m_numEvt.size()) {
842 ATH_MSG_ERROR("vector size incompatibility");
843 break;
844 }
845 // if file not opened yet, check it
846 if (m_numEvt[i] == -1) {
847 std::string fileName = m_inputCollectionsProp.value()[i];
848 auto nevguid = m_eventSource->getBlockIterator(fileName);
849 long nev = nevguid.first;
850 // if failure on file open, exit
851 if (nev==-1) {
852 break;
853 }
854 // set initial event counter for that file
855 if (i > 0) {
856 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
857 } else {
858 m_firstEvt[i] = 0;
859 }
860 // log number of events in that file
861 m_numEvt[i] = nev;
862 }
863 // if sought event is in this file, then return the index of that file
864 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
865 ATH_MSG_INFO("found " << evtNum << " in file " << i);
866 return i;
867 }
868 }
869 ATH_MSG_INFO("did not find ev " << evtNum);
870 // return file not found marker
871 return -1;
872}
873
874//__________________________________________________________________________
875int EventSelectorByteStream::curEvent (const Context& /*it*/) const {
876 // event counter in IEvtSelectorSeek interface
878 return int(m_NumEvents);
879}
880
881//__________________________________________________________________________
882int EventSelectorByteStream::size (Context& /*it*/) const {
883 return -1;
884}
885
886//________________________________________________________________________________
887StatusCode EventSelectorByteStream::makeServer(int /*num*/) {
889 if (m_eventStreamingTool.empty()) {
890 return StatusCode::FAILURE;
891 }
892 return m_eventStreamingTool->makeServer(1, "");
893}
894
895//________________________________________________________________________________
896StatusCode EventSelectorByteStream::makeClient(int /*num*/) {
898 if (m_eventStreamingTool.empty()) {
899 return StatusCode::FAILURE;
900 }
901 std::string dummyStr;
902 return m_eventStreamingTool->makeClient(0, dummyStr);
903}
904
905//________________________________________________________________________________
906StatusCode EventSelectorByteStream::share(int evtNum) {
908 if (m_eventStreamingTool.empty()) {
909 return StatusCode::FAILURE;
910 }
911 if (m_eventStreamingTool->isClient()) {
912 StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
913 while (sc.isRecoverable()) {
914 usleep(1000);
915 sc = m_eventStreamingTool->lockEvent(evtNum);
916 }
917 return sc;
918 }
919 return StatusCode::FAILURE;
920}
921
922//________________________________________________________________________________
925 if (m_eventStreamingTool.empty()) {
926 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
927 return StatusCode::FAILURE;
928 }
929 ATH_MSG_VERBOSE("Called read Event " << maxevt);
930 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
931 const RawEvent* pre = nullptr;
932 if (this->nextImpl(*m_beginIter, lock).isSuccess()) {
933 pre = m_eventSource->currentEvent();
934 } else {
935 if (m_NumEvents == -1) {
936 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
937 break;
938 }
939 ATH_MSG_ERROR("Unable to retrieve next event for " << i << "/" << maxevt);
940 return StatusCode::FAILURE;
941 }
942 if (m_eventStreamingTool->isServer()) {
943 StatusCode sc;
944 while ( (sc = putEvent_ST(*m_eventStreamingTool,
945 m_NumEvents - 1,
946 pre->start(),
947 pre->fragment_size_word() * sizeof(uint32_t),
948 m_eventSource->currentEventStatus())).isRecoverable() ) {
949 usleep(1000);
950 }
951 ATH_CHECK(sc);
952 }
953 }
954 // End of file, wait for last event to be taken
955 StatusCode sc;
956 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
957 usleep(1000);
958 }
959 ATH_CHECK(sc);
960 return StatusCode::SUCCESS;
961}
962
963//________________________________________________________________________________
964StatusCode EventSelectorByteStream::createAddress(const IEvtSelector::Context& /*it*/,
965 IOpaqueAddress*& iop) const {
966 SG::DataProxy* proxy = eventStore()->proxy(ClassID_traits<DataHeader>::ID(),"ByteStreamDataHeader");
967 if (proxy !=0) {
968 iop = proxy->address();
969 return StatusCode::SUCCESS;
970 } else {
971 iop = 0;
972 return StatusCode::FAILURE;
973 }
974}
975
976//________________________________________________________________________________
977StatusCode
978EventSelectorByteStream::releaseContext(IEvtSelector::Context*& /*it*/) const {
979 return StatusCode::SUCCESS;
980}
981
982//________________________________________________________________________________
985 ATH_MSG_INFO("I/O reinitialization...");
986 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
987 ATH_CHECK(iomgr.retrieve());
988 if (!iomgr->io_hasitem(this)) {
989 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
990 return StatusCode::FAILURE;
991 }
992 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
993 for (std::size_t i = 0, imax = inputCollections.size(); i != imax; ++i) {
994 ATH_MSG_INFO("I/O reinitialization, file = " << inputCollections[i]);
995 std::string &fname = inputCollections[i];
996 if (!iomgr->io_contains(this, fname)) {
997 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
998 return StatusCode::FAILURE;
999 }
1000 ATH_CHECK(iomgr->io_retrieve(this, fname));
1001 }
1002 // all good... copy over.
1003 m_inputFileGuard.reset();
1004
1005 // Set m_inputCollectionsProp. But we _dont_ want to run the update
1006 // handler --- that calls reinit(), which will deadlock since
1007 // we're holding the lock. Instead, we'll call reinit() ourselves.
1008 auto old_cb = m_inputCollectionsProp.updateCallBack();
1009 m_inputCollectionsProp.declareUpdateHandler(
1010 [] (Gaudi::Details::PropertyBase&) {}
1011 );
1012 m_inputCollectionsProp = inputCollections;
1013 m_inputCollectionsProp.declareUpdateHandler (std::move(old_cb));
1014
1015 return this->reinit(lock);
1016}
1017
1018//__________________________________________________________________________
1020{
1021 return true;
1022}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextByteStream class.
This file contains the class definition for the IAthMetaDataSvc class.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
RAII guard that guarantees a matching end-incident for every begin-incident.
static Double_t sc
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
size_t size() const
Number of registered mappings.
int imax(int i, int j)
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the Context for EventSelectorByteStream.
virtual StatusCode initialize() override
Implementation of Service base class methods.
virtual StatusCode stop() override
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual int size(Context &it) const override
Always returns -1.
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Gaudi::Property< std::string > m_eventSourceName
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Gaudi::Property< bool > m_filebased
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set a selection criteria.
std::lock_guard< mutex_t > lock_t
SmartIF< IByteStreamInputSvc > m_eventSource
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
EventContextByteStream * m_endIter
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
virtual StatusCode releaseContext(Context *&it) const override
virtual StatusCode makeServer(int num) override
Make this a server.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
virtual StatusCode finalize() override
StatusCode previousImpl(Context &it, lock_t &lock) const
virtual StatusCode last(Context &it) const override
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Gaudi::CheckedProperty< uint32_t > m_runNo
virtual StatusCode createContext(Context *&it) const override
create context
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
virtual StatusCode makeClient(int num) override
Make this a client.
virtual int curEvent(const Context &it) const override
Return the current event number.
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode nextImpl(Context &it, lock_t &lock) const
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
void nextFile(lock_t &lock) const
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
virtual StatusCode rewind(Context &it) const override
StatusCode reinit(lock_t &lock)
Reinitialize the service when a fork() occurred/was-issued.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
virtual StatusCode next(Context &it) const override
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
StatusCode openNewRun(lock_t &lock) const
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual ~EventSelectorByteStream()
Standard Destructor.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
StoreGateSvc * eventStore() const
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode start() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
StatusCode nextWithSkipImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
StatusCode recordAttributeListImpl(lock_t &lock) const
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
EventContextByteStream * m_beginIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Gaudi::Property< long > m_skipEvents
virtual StatusCode previous(Context &it) const override
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
Definition hcg.cxx:116
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.