ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorByteStream.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <vector>
8#include <algorithm>
9
14
15#include "GaudiKernel/ClassID.h"
16#include "GaudiKernel/FileIncident.h"
17#include "GaudiKernel/IIncidentSvc.h"
18#include "GaudiKernel/IIoComponentMgr.h"
19
23
24// EventInfoAttributeList includes
27#include "eformat/StreamTag.h"
28
29
30namespace {
32 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
33 long eventNumber, const void* source,
34 size_t nbytes, unsigned int status) {
35 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
36 return sc;
37 }
38}
39
40
41// Constructor.
43 ISvcLocator *svcloc)
44 : base_class(name, svcloc) {
45 // RunNumber, OldRunNumber and OverrideRunNumberFromInput are used
46 // to override the run number coming in on the input stream
47 m_runNo.verifier().setLower(0);
48 // The following properties are only for compatibility with
49 // McEventSelector and are not really used anywhere
50 // TODO(berghaus): validate if those are even used
51 m_eventsPerRun.verifier().setLower(0);
52 m_firstEventNo.verifier().setLower(1);
53 m_firstLBNo.verifier().setLower(0);
54 m_eventsPerLB.verifier().setLower(0);
55 m_initTimeStamp.verifier().setLower(0);
56
57 m_inputCollectionsProp.declareUpdateHandler(
59}
60
61/******************************************************************************/
62void EventSelectorByteStream::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
64 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
65 this->reinit(lock).ignore();
66 }
67}
68
69
70/******************************************************************************/
73
74
75/******************************************************************************/
80
81
82//________________________________________________________________________________
84
85 m_autoRetrieveTools = false;
86 m_checkToolDeps = false;
87
88 if (m_isSecondary.value()) {
89 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
90 } else {
91 ATH_MSG_DEBUG("Initializing " << name());
92 }
93
94 ATH_CHECK(::AthService::initialize());
95
96 // Check for input setting
97 if (m_filebased && m_inputCollectionsProp.value().empty()) {
98 ATH_MSG_FATAL("Unable to retrieve valid input list");
99 return StatusCode::FAILURE;
100 }
101 m_skipEventSequence = m_skipEventSequenceProp.value();
102 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
103
104 // Check ByteStreamCnvSvc
105 m_eventSource = serviceLocator()->service(m_eventSourceName.value());
106 if (!m_eventSource) {
107 ATH_MSG_FATAL("Cannot get ByteStreamInputSvc");
108 return StatusCode::FAILURE;
109 }
110
111 // Get CounterTool (if configured)
112 if (!m_counterTool.empty()) {
113 ATH_CHECK(m_counterTool.retrieve());
114 }
115 // Get HelperTools
116 if (!m_helperTools.empty()) {
117 ATH_CHECK(m_helperTools.retrieve());
118 }
119 // Get SharedMemoryTool (if configured)
120 if (!m_eventStreamingTool.empty()) {
121 ATH_CHECK(m_eventStreamingTool.retrieve());
122 }
123
124 // Register this service for 'I/O' events
125 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
126 ATH_CHECK(iomgr.retrieve());
127 ATH_CHECK(iomgr->io_register(this));
128
129 // Register the input files with the iomgr
130 bool allGood = true;
131 for (const std::string& input : m_inputCollectionsProp.value()) {
132 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, input).isSuccess()) {
133 ATH_MSG_FATAL("could not register [" << input << "] for output !");
134 allGood = false;
135 } else {
136 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << input << ") [ok]");
137 }
138 }
139 if (!allGood) {
140 return(StatusCode::FAILURE);
141 }
142
143 // Make sure MetaDataSvc is initialized before the first file is opened
144 ServiceHandle<IAthMetaDataSvc> metaDataSvc("MetaDataSvc", name());
145 ATH_CHECK(metaDataSvc.retrieve());
146
147 // Must happen before trying to open a file
149 StatusCode risc = this->reinit(lock);
150
151 return risc;
152}
153//__________________________________________________________________________
155 ATH_MSG_INFO("reinitialization...");
156 // reset markers
157 if (m_inputCollectionsProp.value().size()>0) {
158 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
159 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
160 }
161 else {
162 m_numEvt.resize(1);
163 m_firstEvt.resize(1);
164 }
165
166 // Initialize InputCollectionsIterator
167 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
168 m_NumEvents = 0;
169 bool retError = false;
170 if (!m_helperTools.empty()) {
171 for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
172 if (!tool->postInitialize().isSuccess()) {
173 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
174 retError = true;
175 }
176 }
177 }
178 if (retError) {
179 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
180 return(StatusCode::FAILURE);
181 }
182
183 // If file based input then fire appropriate incidents
184 if (m_filebased) {
185 if (!m_firstFileFired) {
186 FileIncident firstInputFileIncident(name(), "FirstInputFile", "BSF:" + *m_inputCollectionsIterator);
187 m_incidentSvc->fireIncident(firstInputFileIncident);
188 m_firstFileFired = true;
189 }
190
191 // try to open a file
192 ATH_CHECK(this->openNewRun(lock));
193 }
194
195 return StatusCode::SUCCESS;
196}
197
198//________________________________________________________________________________
200 ATH_MSG_DEBUG("Calling EventSelectorByteStream::start()");
202 // Create the begin and end iterator's for this selector.
204 // Increment to get the new event in.
206
207 return StatusCode::SUCCESS;
208}
209
210//________________________________________________________________________________
212 ATH_MSG_DEBUG("Calling EventSelectorByteStream::stop()");
213 if (m_filebased) {
214 // Fire EndInputFile for any file still open
215 m_inputFileGuard.reset();
216 if (m_eventSource->ready()) {
217 m_eventSource->closeBlockIterator(false);
218 }
219 }
220 return StatusCode::SUCCESS;
221}
222
223//__________________________________________________________________________
225 if (!m_counterTool.empty()) {
226 if (!m_counterTool->preFinalize().isSuccess()) {
227 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
228 }
229 }
230 for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
231 if (!tool->preFinalize().isSuccess()) {
232 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
233 }
234 }
235 delete m_beginIter; m_beginIter = nullptr;
236 delete m_endIter; m_endIter = nullptr;
237 if (m_eventSource) m_eventSource->release();
238 // Finalize the Service base class.
239 return AthService::finalize();
240}
241
243 // EndInputFile is handled by the InputFileIncidentGuard via transition()
244 // when the next file is opened, or via reset() on stop().
245 ++m_inputCollectionsIterator;
246 ++m_fileCount;
247}
248
250 // Should be protected upstream, but this is further protection
251 if (!m_filebased) {
252 ATH_MSG_ERROR("cannot open new run for non-filebased inputs");
253 return StatusCode::FAILURE;
254 }
255 // Check for end of file list
256 if (m_inputCollectionsIterator == m_inputCollectionsProp.value().end()) {
257 ATH_MSG_INFO("End of input file list reached");
258 return StatusCode::FAILURE;
259 }
260 std::string blockname = *m_inputCollectionsIterator;
261 // try to open a file
262 auto nevguid = m_eventSource->getBlockIterator(blockname);
263 long nev = nevguid.first;
264 if (nev == -1) {
265 ATH_MSG_FATAL("Unable to access file " << *m_inputCollectionsIterator << ", stopping here");
267 }
268 // Fire EndInputFile for previous file (if any), then BeginInputFile for new file
269 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(),
270 "BSF:" + *m_inputCollectionsIterator, nevguid.second,
271 /*endFileName=*/{});
272
273 // check if file is empty
274 if (nev == 0) {
275 ATH_MSG_WARNING("no events in file " << blockname << " try next");
276 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
277 this->nextFile(lock);
278 return openNewRun(lock);
279 // check if skipping all events in that file (minus events already skipped)
280 } else if (m_skipEvents.value() - m_NumEvents > nev) {
281 ATH_MSG_WARNING("skipping more events " << m_skipEvents.value() - m_NumEvents << "(" << nev <<") than in file " << *m_inputCollectionsIterator << ", try next");
282 m_NumEvents += nev;
283 m_numEvt[m_fileCount] = nev;
284 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
285 this->nextFile(lock);
286 return openNewRun(lock);
287 }
288
289 ATH_MSG_DEBUG("Opened block/file " << blockname);
290 m_firstEvt[m_fileCount] = m_NumEvents;
291 m_numEvt[m_fileCount] = nev;
292
293 return StatusCode::SUCCESS;
294}
295
296StatusCode EventSelectorByteStream::createContext(IEvtSelector::Context*& it) const {
297 it = new EventContextByteStream(this);
298 return StatusCode::SUCCESS;
299}
300
301StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const {
303 return nextImpl (it, lock);
304}
305StatusCode EventSelectorByteStream::nextImpl(IEvtSelector::Context& it,
306 lock_t& lock) const
307{
308 static std::atomic<int> n_bad_events = 0; // cross loop counter of bad events
309 // Check if this is an athenaMP client process
310 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
311 void* source = nullptr;
312 unsigned int status = 0;
313 ATH_CHECK(m_eventStreamingTool->getLockedEvent(&source, status));
314 m_eventSource->setEvent(static_cast<char*>(source), status);
315 return StatusCode::SUCCESS;
316 }
317 // Call all selector tool preNext before starting loop
318 for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
319 if (!tool->preNext().isSuccess()) {
320 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
321 }
322 }
323 if (!m_counterTool.empty()) {
324 if (!m_counterTool->preNext().isSuccess()) {
325 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
326 }
327 }
328 // Find an event to return
329 for (;;) {
330 bool badEvent{};
332 if (sc.isRecoverable()) {
333 badEvent = true;
334 } else if (sc.isFailure()) {
335 return StatusCode::FAILURE;
336 }
337
338 // increment that an event was found
339 ++m_NumEvents;
340
341 // check bad event flag and handle as configured
342 if (badEvent) {
343 int nbad = ++n_bad_events;
344 ATH_MSG_WARNING("Bad event encountered, current count at " << nbad);
345 bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
346 if (!m_procBadEvent || toomany) {
347 ATH_MSG_ERROR("Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad << " events)");
348 it = *m_endIter;
349 return StatusCode::FAILURE;
350 }
351 ATH_MSG_WARNING("Continue with bad event");
352 }
353
354 // Check whether properties or tools reject this event
355 if ( m_NumEvents > m_skipEvents.value() &&
356 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
357
358 // Build a DH for use by other components
359 StatusCode rec_sg = m_eventSource->generateDataHeader();
360 if (rec_sg != StatusCode::SUCCESS) {
361 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
362 }
363
364 // Build event info attribute list
365 if (recordAttributeListImpl(lock).isFailure()) ATH_MSG_WARNING("Unable to build event info att list");
366
367 StatusCode status(StatusCode::SUCCESS);
368 for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
369 StatusCode toolStatus = tool->postNext();
370 if (toolStatus.isRecoverable()) {
371 ATH_MSG_INFO("Request skipping event from: " << tool->name());
372 status = StatusCode::RECOVERABLE;
373 } else if (toolStatus.isFailure()) {
374 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
375 status = StatusCode::FAILURE;
376 }
377 }
378 if (status.isRecoverable()) {
379 ATH_MSG_INFO("skipping event " << m_NumEvents);
380 } else if (status.isFailure()) {
381 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
382 } else {
383 if (!m_counterTool.empty()) {
384 if (!m_counterTool->postNext().isSuccess()) {
385 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
386 }
387 }
388 // Validate the event
389 try {
390 m_eventSource->validateEvent();
391 }
392 catch (const ByteStreamExceptions::badFragmentData&) {
393 int nbad = ++n_bad_events;
394 ATH_MSG_WARNING("Bad fragment data encountered, current count at " << nbad);
395
396 bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
397 if (!m_procBadEvent || toomany) {
398 ATH_MSG_ERROR("Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad << " events)");
399 it = *m_endIter;
400 return StatusCode::FAILURE;
401 }
402 ATH_MSG_WARNING("Continue with bad event");
403 }
404 break;
405 }
406 } else {
407 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
408 m_skipEventSequence.erase(m_skipEventSequence.begin());
409 }
410 if ( m_NumEvents % 1'000 == 0 ) {
411 ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
412 } else {
413 ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
414 }
415 }
416 } // for loop
417
418 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
419 const RawEvent* pre = m_eventSource->currentEvent();
421 while ( (sc = putEvent_ST(*m_eventStreamingTool,
422 m_NumEvents - 1, pre->start(),
423 pre->fragment_size_word() * sizeof(uint32_t),
424 m_eventSource->currentEventStatus())).isRecoverable() ) {
425 usleep(1000);
426 }
427 ATH_CHECK(sc);
428 }
429 return StatusCode::SUCCESS;
430}
431
432//________________________________________________________________________________
433StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
435 return nextImpl (ctxt, jump, lock);
436}
437//________________________________________________________________________________
439EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
440 int jump,
441 lock_t& lock) const
442{
443 if (jump > 0) {
444 if ( m_NumEvents+jump != m_skipEvents.value()) {
445 // Save initial event count
446 unsigned int cntr = m_NumEvents;
447 // In case NumEvents increments multiple times in a single next call
448 while (m_NumEvents+1 <= cntr + jump) {
449 ATH_CHECK(nextImpl(ctxt, lock));
450 }
451 }
452 else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
453 return StatusCode::SUCCESS;
454 }
455 else {
456 ATH_MSG_WARNING("Called jump next with non-multiple jump");
457 }
458 return StatusCode::SUCCESS;
459}
460
461//________________________________________________________________________________
462StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
463{
465 return nextHandleFileTransitionImpl (ctxt, lock);
466}
467StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
468 lock_t& lock) const
469{
470 const RawEvent* pre{};
471 bool badEvent{};
472 // if event source not ready from init, try next file
473 if (m_filebased && !m_eventSource->ready()) {
474 // next file
475 this->nextFile(lock);
476 if (this->openNewRun(lock).isFailure()) {
477 ATH_MSG_DEBUG("Event source found no more valid files left in input list");
478 m_NumEvents = -1;
479 return StatusCode::FAILURE;
480 }
481 }
482 try {
483 pre = m_eventSource->nextEvent();
484 }
485 catch (const ByteStreamExceptions::readError&) {
486 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
487 return StatusCode::FAILURE;
488 }
489 catch (const ByteStreamExceptions::badFragment&) {
490 ATH_MSG_ERROR("badFragment encountered");
491 badEvent = true;
492 }
494 ATH_MSG_ERROR("badFragment data encountered");
495 badEvent = true;
496 }
497 // Check whether a RawEvent has actually been provided
498 if (pre == nullptr) {
499 ctxt = *m_endIter;
500 return StatusCode::FAILURE;
501 }
502
503 // If not secondary just return the status code based on if the event is bas
504 if (!m_isSecondary.value()) {
505 // check bad event flag and handle as configured
506 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
507 }
508
509 // Build a DH for use by other components
510 StatusCode rec_sg = m_eventSource->generateDataHeader();
511 if (rec_sg != StatusCode::SUCCESS) {
512 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
513 }
514
515 return StatusCode::SUCCESS;
516}
517//________________________________________________________________________________
518StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
519{
521 return nextWithSkipImpl (ctxt, lock);
522}
523StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
524 lock_t& lock) const {
525 ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
526
527 for (;;) {
528 // Check if we're at the end of file
529 StatusCode sc = nextHandleFileTransitionImpl(ctxt, lock);
530 if (sc.isRecoverable()) {
531 continue; // handles empty files
532 }
533 if (sc.isFailure()) {
534 return StatusCode::FAILURE;
535 }
536
537 // Increase event count
538 ++m_NumEvents;
539
540 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
541 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
542 }
543 if ( m_NumEvents > m_skipEvents.value() &&
544 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
545 return StatusCode::SUCCESS;
546 } else {
547 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
548 m_skipEventSequence.erase(m_skipEventSequence.begin());
549 }
550 if (m_isSecondary.value()) {
551 ATH_MSG_INFO("skipping secondary event " << m_NumEvents);
552 } else {
553 ATH_MSG_INFO("skipping event " << m_NumEvents);
554 }
555 }
556 }
557
558 return StatusCode::SUCCESS;
559}
560//________________________________________________________________________________
561StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt) const
562{
564 return previousImpl (ctxt, lock);
565}
566StatusCode EventSelectorByteStream::previousImpl(IEvtSelector::Context& /*ctxt*/,
567 lock_t& /*lock*/) const {
568 ATH_MSG_DEBUG(" ... previous");
569 const RawEvent* pre = nullptr;
570 bool badEvent(false);
571 // if event source not ready from init, try next file
572 if (m_eventSource->ready()) {
573 try {
574 pre = m_eventSource->previousEvent();
575 }
576 catch (const ByteStreamExceptions::readError&) {
577 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
578 return StatusCode::FAILURE;
579 }
580 catch (const ByteStreamExceptions::badFragment&) {
581 ATH_MSG_ERROR("badFragment encountered");
582 badEvent = true;
583 }
584 catch (const ByteStreamExceptions::badFragmentData&) {
585 ATH_MSG_ERROR("badFragment data encountered");
586 badEvent = true;
587 }
588 // Check whether a RawEvent has actually been provided
589 if (pre == 0) {
590 ATH_MSG_ERROR("No event built");
591 return StatusCode::FAILURE;
592 }
593 }
594 else {
595 ATH_MSG_FATAL("Attempt to read previous data on invalid reader");
596 return StatusCode::FAILURE;
597 }
598 // increment that an event was found
599
600 // check bad event flag and handle as configured
601 if (badEvent) {
602 ATH_MSG_ERROR("Called previous for bad event");
603 if (!m_procBadEvent) {
604 // End of file
605 return StatusCode::FAILURE;
606 }
607 ATH_MSG_WARNING("Continue with bad event");
608 }
609
610 // Build a DH for use by other components
611 StatusCode rec_sg = m_eventSource->generateDataHeader();
612 if (rec_sg != StatusCode::SUCCESS) {
613 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
614 }
615
616 return StatusCode::SUCCESS;
617}
618//________________________________________________________________________________
619StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt, int jump) const {
621 return previousImpl (ctxt, jump, lock);
622}
623//________________________________________________________________________________
625EventSelectorByteStream::previousImpl(IEvtSelector::Context& ctxt,
626 int jump,
627 lock_t& lock) const
628{
629 if (jump > 0) {
630 for (int i = 0; i < jump; i++) {
632 }
633 return StatusCode::SUCCESS;
634 }
635 return StatusCode::FAILURE;
636}
637//________________________________________________________________________________
638StatusCode EventSelectorByteStream::last(IEvtSelector::Context& it)const {
639 if (it.identifier() == m_endIter->identifier()) {
640 ATH_MSG_DEBUG("last(): Last event in InputStream.");
641 return StatusCode::SUCCESS;
642 }
643 return StatusCode::FAILURE;
644}
645//________________________________________________________________________________
646StatusCode EventSelectorByteStream::rewind(IEvtSelector::Context& /*it*/) const {
647 ATH_MSG_ERROR("rewind() not implemented");
648 return StatusCode::FAILURE;
649}
650
651//________________________________________________________________________________
652StatusCode EventSelectorByteStream::resetCriteria(const std::string& /*criteria*/, IEvtSelector::Context& /*ctxt*/) const {
653 return StatusCode::SUCCESS;
654}
655
656//__________________________________________________________________________
657StatusCode EventSelectorByteStream::seek(Context& /* it */, int evtNum) const {
659 // Check that input is seekable
660 if (!m_filebased) {
661 ATH_MSG_ERROR("Input not seekable, choose different input svc");
662 return StatusCode::FAILURE;
663 }
664 // find the file index with that event
665 long fileNum = findEvent(evtNum, lock);
666 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
667 fileNum = m_fileCount;
668 }
669 // if unable to locate file, exit
670 if (fileNum == -1) {
671 ATH_MSG_INFO("seek: Reached end of Input.");
672 return StatusCode::RECOVERABLE;
673 }
674 // check if it is the current file
675 if (fileNum != m_fileCount) { // event in different file
676 // Close input file if open
677 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
678 ATH_MSG_DEBUG("Seek to item: \"" << m_inputCollectionsProp.value()[fileNum] << "\" from the explicit file list.");
679 std::string fileName = m_inputCollectionsProp.value()[fileNum];
680 m_fileCount = fileNum;
681 // Open the correct file
682 auto nevguid = m_eventSource->getBlockIterator(fileName);
683 long nev = nevguid.first;
684 if (nev == -1) {
685 ATH_MSG_FATAL("Unable to open file with seeked event " << evtNum << " file " << fileName);
686 return StatusCode::FAILURE;
687 }
688 int delta = evtNum - m_firstEvt[m_fileCount];
689 if (delta > 0) {
691 ATH_CHECK(nextImpl(*beginIter,delta, lock));
692 }
693 }
694 // event in current file
695 {
696 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) - m_eventSource->positionInBlock();
697 ATH_MSG_DEBUG("Seeking event " << evtNum << " in current file with delta " << delta);
698 if ( delta == 0 ) { // current event
699 // nothing to do
700 }
701 else if ( delta > 0 ) { // forward
703 ATH_CHECK(this->nextImpl(*beginIter, delta, lock));
704 }
705 else if ( delta < 0 ) { // backward
707 ATH_CHECK(this->previousImpl(*beginIter, -1*delta, lock));
708 }
709 }
710 return StatusCode::SUCCESS;
711}
712
719{
720 std::string listName("EventInfoAtts");
721
722 if (eventStore()->contains<AthenaAttributeList>(listName)) {
723 const AthenaAttributeList* oldAttrList = nullptr;
724 ATH_CHECK(eventStore()->retrieve(oldAttrList, listName));
725 ATH_CHECK(eventStore()->removeDataAndProxy(oldAttrList));
726 }
727
728 // build the new attr list
729 auto attrList = std::make_unique<AthenaAttributeList>();
730
731 // fill the attr list
732 ATH_CHECK(fillAttributeListImpl(attrList.get(), "", false, lock));
733
734 // put result in event store
735 ATH_CHECK(eventStore()->record(std::move(attrList), listName));
736
737 return StatusCode::SUCCESS;
738}
739
740StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
741{
743 return fillAttributeListImpl (attrList, suffix, copySource, lock);
744}
745StatusCode EventSelectorByteStream::fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool /* copySource */,
746 lock_t& /*lock*/) const
747{
748 attrList->extend("RunNumber" + suffix, "unsigned int");
749 attrList->extend("EventNumber" + suffix, "unsigned long long");
750 attrList->extend("LumiBlockN" + suffix, "unsigned int");
751 attrList->extend("BunchId" + suffix, "unsigned int");
752 attrList->extend("EventTime" + suffix, "unsigned int");
753 attrList->extend("EventTimeNanoSec" + suffix, "unsigned int");
754
755 // fill attribute list
756 const RawEvent* event = m_eventSource->currentEvent();
757
758 (*attrList)["RunNumber" + suffix].data<unsigned int>() = event->run_no();
759 if (event->version() < 0x03010000) {
760 (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->lvl1_id();
761 } else {
762 (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->global_id();
763 }
764 (*attrList)["LumiBlockN" + suffix].data<unsigned int>() = event->lumi_block();
765 (*attrList)["BunchId" + suffix].data<unsigned int>() = event->bc_id();
766
767 unsigned int bc_time_sec = event->bc_time_seconds();
768 unsigned int bc_time_ns = event->bc_time_nanoseconds();
769 // bc_time_ns should be lt 1e9.
770 if (bc_time_ns > 1000000000) {
771 // round it off to 1e9
772 ATH_MSG_WARNING(" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns << ", reset it to 1 sec");
773 bc_time_ns = 1000000000;
774 }
775 (*attrList)["EventTime" + suffix].data<unsigned int>() = bc_time_sec;
776 (*attrList)["EventTimeNanoSec" + suffix].data<unsigned int>() = bc_time_ns;
777
779
780 event->status(buffer);
781 attrList->extend("TriggerStatus" + suffix, "unsigned int");
782 (*attrList)["TriggerStatus" + suffix].data<unsigned int>() = *buffer;
783
784 attrList->extend("ExtendedL1ID" + suffix, "unsigned int");
785 attrList->extend("L1TriggerType" + suffix, "unsigned int");
786 (*attrList)["ExtendedL1ID" + suffix].data<unsigned int>() = event->lvl1_id();
787 (*attrList)["L1TriggerType" + suffix].data<unsigned int>() = event->lvl1_trigger_type();
788
789 // Grab L1 words
790 event->lvl1_trigger_info(buffer);
791 for (uint32_t iT1 = 0; iT1 < event->nlvl1_trigger_info(); ++iT1) {
792 std::stringstream name;
793 name << "L1TriggerInfo_" << iT1;
794 attrList->extend(name.str() + suffix, "unsigned int");
795 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
796 ++buffer;
797 }
798
799 // Grab L2 words
800 event->lvl2_trigger_info(buffer);
801 for (uint32_t iT1 = 0; iT1 < event->nlvl2_trigger_info(); ++iT1) {
802 if (*buffer != 0) {
803 std::stringstream name;
804 name << "L2TriggerInfo_" << iT1;
805 attrList->extend(name.str() + suffix, "unsigned int");
806 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
807 }
808 ++buffer;
809 }
810
811 // Grab EF words
812 event->event_filter_info(buffer);
813 for (uint32_t iT1 = 0; iT1 < event->nevent_filter_info(); ++iT1) {
814 if (*buffer != 0) {
815 std::stringstream name;
816 name << "EFTriggerInfo_" << iT1;
817 attrList->extend(name.str() + suffix, "unsigned int");
818 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
819 }
820 ++buffer;
821 }
822
823 // Grab stream tags
824 event->stream_tag(buffer);
825 std::vector<eformat::helper::StreamTag> onl_streamTags;
826 eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
827 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
828 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
829 attrList->extend(itS->name + suffix, "string");
830 (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
831 }
832
833 return StatusCode::SUCCESS;
834}
835
836//__________________________________________________________________________
837int EventSelectorByteStream::findEvent(int evtNum, lock_t& /*lock*/) const {
838 // Loop over file event counts
839 for (size_t i = 0; i < m_inputCollectionsProp.value().size(); i++) {
840 if (m_inputCollectionsProp.value().size() != m_numEvt.size()) {
841 ATH_MSG_ERROR("vector size incompatibility");
842 break;
843 }
844 // if file not opened yet, check it
845 if (m_numEvt[i] == -1) {
846 std::string fileName = m_inputCollectionsProp.value()[i];
847 auto nevguid = m_eventSource->getBlockIterator(fileName);
848 long nev = nevguid.first;
849 // if failure on file open, exit
850 if (nev==-1) {
851 break;
852 }
853 // set initial event counter for that file
854 if (i > 0) {
855 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
856 } else {
857 m_firstEvt[i] = 0;
858 }
859 // log number of events in that file
860 m_numEvt[i] = nev;
861 }
862 // if sought event is in this file, then return the index of that file
863 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
864 ATH_MSG_INFO("found " << evtNum << " in file " << i);
865 return i;
866 }
867 }
868 ATH_MSG_INFO("did not find ev " << evtNum);
869 // return file not found marker
870 return -1;
871}
872
873//__________________________________________________________________________
874int EventSelectorByteStream::curEvent (const Context& /*it*/) const {
875 // event counter in IEvtSelectorSeek interface
877 return int(m_NumEvents);
878}
879
880//__________________________________________________________________________
881int EventSelectorByteStream::size (Context& /*it*/) const {
882 return -1;
883}
884
885//________________________________________________________________________________
886StatusCode EventSelectorByteStream::makeServer(int /*num*/) {
888 if (m_eventStreamingTool.empty()) {
889 return StatusCode::FAILURE;
890 }
891 return m_eventStreamingTool->makeServer(1, "");
892}
893
894//________________________________________________________________________________
895StatusCode EventSelectorByteStream::makeClient(int /*num*/) {
897 if (m_eventStreamingTool.empty()) {
898 return StatusCode::FAILURE;
899 }
900 std::string dummyStr;
901 return m_eventStreamingTool->makeClient(0, dummyStr);
902}
903
904//________________________________________________________________________________
905StatusCode EventSelectorByteStream::share(int evtNum) {
907 if (m_eventStreamingTool.empty()) {
908 return StatusCode::FAILURE;
909 }
910 if (m_eventStreamingTool->isClient()) {
911 StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
912 while (sc.isRecoverable()) {
913 usleep(1000);
914 sc = m_eventStreamingTool->lockEvent(evtNum);
915 }
916 return sc;
917 }
918 return StatusCode::FAILURE;
919}
920
921//________________________________________________________________________________
924 if (m_eventStreamingTool.empty()) {
925 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
926 return StatusCode::FAILURE;
927 }
928 ATH_MSG_VERBOSE("Called read Event " << maxevt);
929 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
930 const RawEvent* pre = nullptr;
931 if (this->nextImpl(*m_beginIter, lock).isSuccess()) {
932 pre = m_eventSource->currentEvent();
933 } else {
934 if (m_NumEvents == -1) {
935 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
936 break;
937 }
938 ATH_MSG_ERROR("Unable to retrieve next event for " << i << "/" << maxevt);
939 return StatusCode::FAILURE;
940 }
941 if (m_eventStreamingTool->isServer()) {
942 StatusCode sc;
943 while ( (sc = putEvent_ST(*m_eventStreamingTool,
944 m_NumEvents - 1,
945 pre->start(),
946 pre->fragment_size_word() * sizeof(uint32_t),
947 m_eventSource->currentEventStatus())).isRecoverable() ) {
948 usleep(1000);
949 }
950 ATH_CHECK(sc);
951 }
952 }
953 // End of file, wait for last event to be taken
954 StatusCode sc;
955 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
956 usleep(1000);
957 }
958 ATH_CHECK(sc);
959 return StatusCode::SUCCESS;
960}
961
962//________________________________________________________________________________
963StatusCode EventSelectorByteStream::createAddress(const IEvtSelector::Context& /*it*/,
964 IOpaqueAddress*& iop) const {
965 SG::DataProxy* proxy = eventStore()->proxy(ClassID_traits<DataHeader>::ID(),"ByteStreamDataHeader");
966 if (proxy !=0) {
967 iop = proxy->address();
968 return StatusCode::SUCCESS;
969 } else {
970 iop = 0;
971 return StatusCode::FAILURE;
972 }
973}
974
975//________________________________________________________________________________
976StatusCode
977EventSelectorByteStream::releaseContext(IEvtSelector::Context*& /*it*/) const {
978 return StatusCode::SUCCESS;
979}
980
981//________________________________________________________________________________
984 ATH_MSG_INFO("I/O reinitialization...");
985 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
986 ATH_CHECK(iomgr.retrieve());
987 if (!iomgr->io_hasitem(this)) {
988 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
989 return StatusCode::FAILURE;
990 }
991 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
992 for (std::size_t i = 0, imax = inputCollections.size(); i != imax; ++i) {
993 ATH_MSG_INFO("I/O reinitialization, file = " << inputCollections[i]);
994 std::string &fname = inputCollections[i];
995 if (!iomgr->io_contains(this, fname)) {
996 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
997 return StatusCode::FAILURE;
998 }
999 ATH_CHECK(iomgr->io_retrieve(this, fname));
1000 }
1001 // all good... copy over.
1002 m_inputFileGuard.reset();
1003
1004 // Set m_inputCollectionsProp. But we _dont_ want to run the update
1005 // handler --- that calls reinit(), which will deadlock since
1006 // we're holding the lock. Instead, we'll call reinit() ourselves.
1007 auto old_cb = m_inputCollectionsProp.updateCallBack();
1008 m_inputCollectionsProp.declareUpdateHandler(
1009 [] (Gaudi::Details::PropertyBase&) {}
1010 );
1011 m_inputCollectionsProp = inputCollections;
1012 m_inputCollectionsProp.declareUpdateHandler (std::move(old_cb));
1013
1014 return this->reinit(lock);
1015}
1016
1017//__________________________________________________________________________
1019{
1020 return true;
1021}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextByteStream class.
This file contains the class definition for the IAthMetaDataSvc class.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
RAII guard that guarantees a matching end-incident for every begin-incident.
static Double_t sc
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
size_t size() const
Number of registered mappings.
int imax(int i, int j)
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the Context for EventSelectorByteStream.
virtual StatusCode initialize() override
Implementation of Service base class methods.
virtual StatusCode stop() override
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual int size(Context &it) const override
Always returns -1.
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Gaudi::Property< std::string > m_eventSourceName
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Gaudi::Property< bool > m_filebased
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set a selection criteria.
std::lock_guard< mutex_t > lock_t
SmartIF< IByteStreamInputSvc > m_eventSource
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
EventContextByteStream * m_endIter
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
virtual StatusCode releaseContext(Context *&it) const override
virtual StatusCode makeServer(int num) override
Make this a server.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
virtual StatusCode finalize() override
StatusCode previousImpl(Context &it, lock_t &lock) const
virtual StatusCode last(Context &it) const override
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Gaudi::CheckedProperty< uint32_t > m_runNo
virtual StatusCode createContext(Context *&it) const override
create context
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
virtual StatusCode makeClient(int num) override
Make this a client.
virtual int curEvent(const Context &it) const override
Return the current event number.
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode nextImpl(Context &it, lock_t &lock) const
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
void nextFile(lock_t &lock) const
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
virtual StatusCode rewind(Context &it) const override
StatusCode reinit(lock_t &lock)
Reinitialize the service when a fork() occurred/was-issued.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
virtual StatusCode next(Context &it) const override
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
StatusCode openNewRun(lock_t &lock) const
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual ~EventSelectorByteStream()
Standard Destructor.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
StoreGateSvc * eventStore() const
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode start() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
StatusCode nextWithSkipImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
StatusCode recordAttributeListImpl(lock_t &lock) const
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
EventContextByteStream * m_beginIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Gaudi::Property< long > m_skipEvents
virtual StatusCode previous(Context &it) const override
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
Definition hcg.cxx:116
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.