ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorByteStream.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <vector>
8#include <algorithm>
9
14
15#include "GaudiKernel/ClassID.h"
16#include "GaudiKernel/FileIncident.h"
17#include "GaudiKernel/IIncidentSvc.h"
18#include "GaudiKernel/IIoComponentMgr.h"
19
22
23// EventInfoAttributeList includes
26#include "eformat/StreamTag.h"
27
28
29namespace {
31 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
32 long eventNumber, const void* source,
33 size_t nbytes, unsigned int status) {
34 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
35 return sc;
36 }
37}
38
39
40// Constructor.
42 ISvcLocator *svcloc)
43 : base_class(name, svcloc) {
44 // RunNumber, OldRunNumber and OverrideRunNumberFromInput are used
45 // to override the run number coming in on the input stream
46 m_runNo.verifier().setLower(0);
47 // The following properties are only for compatibility with
48 // McEventSelector and are not really used anywhere
49 // TODO(berghaus): validate if those are even used
50 m_eventsPerRun.verifier().setLower(0);
51 m_firstEventNo.verifier().setLower(1);
52 m_firstLBNo.verifier().setLower(0);
53 m_eventsPerLB.verifier().setLower(0);
54 m_initTimeStamp.verifier().setLower(0);
55
56 m_inputCollectionsProp.declareUpdateHandler(
58}
59
60/******************************************************************************/
61void EventSelectorByteStream::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
62 lock_t lock (m_mutex);
63 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
64 this->reinit(lock).ignore();
65 }
66}
67
68
69/******************************************************************************/
72
73
74/******************************************************************************/
79
80
81//________________________________________________________________________________
83
84 m_autoRetrieveTools = false;
85 m_checkToolDeps = false;
86
87 if (m_isSecondary.value()) {
88 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
89 } else {
90 ATH_MSG_DEBUG("Initializing " << name());
91 }
92
93 ATH_CHECK(::AthService::initialize());
94
95 // Check for input setting
96 if (m_filebased && m_inputCollectionsProp.value().empty()) {
97 ATH_MSG_FATAL("Unable to retrieve valid input list");
98 return StatusCode::FAILURE;
99 }
100 m_skipEventSequence = m_skipEventSequenceProp.value();
101 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
102
103 // Check ByteStreamCnvSvc
104 m_eventSource = serviceLocator()->service(m_eventSourceName.value());
105 if (!m_eventSource) {
106 ATH_MSG_FATAL("Cannot get ByteStreamInputSvc");
107 return StatusCode::FAILURE;
108 }
109
110 // Get CounterTool (if configured)
111 if (!m_counterTool.empty()) {
112 ATH_CHECK(m_counterTool.retrieve());
113 }
114 // Get HelperTools
115 if (!m_helperTools.empty()) {
116 ATH_CHECK(m_helperTools.retrieve());
117 }
118 // Get SharedMemoryTool (if configured)
119 if (!m_eventStreamingTool.empty()) {
120 ATH_CHECK(m_eventStreamingTool.retrieve());
121 }
122
123 // Register this service for 'I/O' events
124 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
125 ATH_CHECK(iomgr.retrieve());
126 ATH_CHECK(iomgr->io_register(this));
127
128 // Register the input files with the iomgr
129 bool allGood = true;
130 for (const std::string& input : m_inputCollectionsProp.value()) {
131 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, input).isSuccess()) {
132 ATH_MSG_FATAL("could not register [" << input << "] for output !");
133 allGood = false;
134 } else {
135 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << input << ") [ok]");
136 }
137 }
138 if (!allGood) {
139 return(StatusCode::FAILURE);
140 }
141
142 // Make sure MetaDataSvc is initialized before the first file is opened
143 ServiceHandle<IAthMetaDataSvc> metaDataSvc("MetaDataSvc", name());
144 ATH_CHECK(metaDataSvc.retrieve());
145
146 // Must happen before trying to open a file
147 lock_t lock (m_mutex);
148 StatusCode risc = this->reinit(lock);
149
150 return risc;
151}
152//__________________________________________________________________________
154 ATH_MSG_INFO("reinitialization...");
155 // reset markers
156 if (m_inputCollectionsProp.value().size()>0) {
157 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
158 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
159 }
160 else {
161 m_numEvt.resize(1);
162 m_firstEvt.resize(1);
163 }
164
165 // Initialize InputCollectionsIterator
166 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
167 m_NumEvents = 0;
168 bool retError = false;
169 if (!m_helperTools.empty()) {
170 for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
171 if (!tool->postInitialize().isSuccess()) {
172 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
173 retError = true;
174 }
175 }
176 }
177 if (retError) {
178 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
179 return(StatusCode::FAILURE);
180 }
181
182 // If file based input then fire appropriate incidents
183 if (m_filebased) {
184 if (!m_firstFileFired) {
185 FileIncident firstInputFileIncident(name(), "FirstInputFile", "BSF:" + *m_inputCollectionsIterator);
186 m_incidentSvc->fireIncident(firstInputFileIncident);
187 m_firstFileFired = true;
188 }
189
190 // try to open a file
191 ATH_CHECK(this->openNewRun(lock));
192 // should be in openNewRun, but see comment there
193 m_beginFileFired = true;
194 }
195
196 return StatusCode::SUCCESS;
197}
198
199//________________________________________________________________________________
201 ATH_MSG_DEBUG("Calling EventSelectorByteStream::start()");
202 lock_t lock (m_mutex);
203 // Create the begin and end iterator's for this selector.
205 // Increment to get the new event in.
207
208 return StatusCode::SUCCESS;
209}
210
211//________________________________________________________________________________
213 ATH_MSG_DEBUG("Calling EventSelectorByteStream::stop()");
214 // Handle open files
215 if (m_filebased) {
216 // Close the file
217 if (m_eventSource->ready()) {
218 m_eventSource->closeBlockIterator(false);
219 FileIncident endInputFileIncident(name(), "EndInputFile", "stop");
220 m_incidentSvc->fireIncident(endInputFileIncident);
221 }
222 }
223 return StatusCode::SUCCESS;
224}
225
226//__________________________________________________________________________
228 if (!m_counterTool.empty()) {
229 if (!m_counterTool->preFinalize().isSuccess()) {
230 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
231 }
232 }
233 for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
234 if (!tool->preFinalize().isSuccess()) {
235 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
236 }
237 }
238 delete m_beginIter; m_beginIter = nullptr;
239 delete m_endIter; m_endIter = nullptr;
240 if (m_eventSource) m_eventSource->release();
241 // Finalize the Service base class.
242 return AthService::finalize();
243}
244
246 FileIncident endInputFileIncident(name(), "EndInputFile", "BSF:" + *m_inputCollectionsIterator);
247 m_incidentSvc->fireIncident(endInputFileIncident);
248 ++m_inputCollectionsIterator;
249 ++m_fileCount;
250}
251
253 // Should be protected upstream, but this is further protection
254 if (!m_filebased) {
255 ATH_MSG_ERROR("cannot open new run for non-filebased inputs");
256 return StatusCode::FAILURE;
257 }
258 // Check for end of file list
259 if (m_inputCollectionsIterator == m_inputCollectionsProp.value().end()) {
260 ATH_MSG_INFO("End of input file list reached");
261 return StatusCode::FAILURE;
262 }
263 std::string blockname = *m_inputCollectionsIterator;
264 // try to open a file
265 auto nevguid = m_eventSource->getBlockIterator(blockname);
266 long nev = nevguid.first;
267 if (nev == -1) {
268 ATH_MSG_FATAL("Unable to access file " << *m_inputCollectionsIterator << ", stopping here");
270 }
271 // Fire the incident
272 if (!m_beginFileFired) {
273 FileIncident beginInputFileIncident(name(), "BeginInputFile", "BSF:" + *m_inputCollectionsIterator,nevguid.second);
274 m_incidentSvc->fireIncident(beginInputFileIncident);
275 //m_beginFileFired = true; // Should go here, but can't because IEvtSelector next is const
276 }
277
278 // check if file is empty
279 if (nev == 0) {
280 ATH_MSG_WARNING("no events in file " << blockname << " try next");
281 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
282 this->nextFile(lock);
283 return openNewRun(lock);
284 // check if skipping all events in that file (minus events already skipped)
285 } else if (m_skipEvents.value() - m_NumEvents > nev) {
286 ATH_MSG_WARNING("skipping more events " << m_skipEvents.value() - m_NumEvents << "(" << nev <<") than in file " << *m_inputCollectionsIterator << ", try next");
287 m_NumEvents += nev;
288 m_numEvt[m_fileCount] = nev;
289 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
290 this->nextFile(lock);
291 return openNewRun(lock);
292 }
293
294 ATH_MSG_DEBUG("Opened block/file " << blockname);
295 m_firstEvt[m_fileCount] = m_NumEvents;
296 m_numEvt[m_fileCount] = nev;
297
298 return StatusCode::SUCCESS;
299}
300
301StatusCode EventSelectorByteStream::createContext(IEvtSelector::Context*& it) const {
302 it = new EventContextByteStream(this);
303 return StatusCode::SUCCESS;
304}
305
306StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const {
307 lock_t lock (m_mutex);
308 return nextImpl (it, lock);
309}
310StatusCode EventSelectorByteStream::nextImpl(IEvtSelector::Context& it,
311 lock_t& lock) const
312{
313 static std::atomic<int> n_bad_events = 0; // cross loop counter of bad events
314 // Check if this is an athenaMP client process
315 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
316 void* source = nullptr;
317 unsigned int status = 0;
318 ATH_CHECK(m_eventStreamingTool->getLockedEvent(&source, status));
319 m_eventSource->setEvent(static_cast<char*>(source), status);
320 return StatusCode::SUCCESS;
321 }
322 // Call all selector tool preNext before starting loop
323 for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
324 if (!tool->preNext().isSuccess()) {
325 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
326 }
327 }
328 if (!m_counterTool.empty()) {
329 if (!m_counterTool->preNext().isSuccess()) {
330 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
331 }
332 }
333 // Find an event to return
334 for (;;) {
335 bool badEvent{};
337 if (sc.isRecoverable()) {
338 badEvent = true;
339 } else if (sc.isFailure()) {
340 return StatusCode::FAILURE;
341 }
342
343 // increment that an event was found
344 ++m_NumEvents;
345
346 // check bad event flag and handle as configured
347 if (badEvent) {
348 int nbad = ++n_bad_events;
349 ATH_MSG_WARNING("Bad event encountered, current count at " << nbad);
350 bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
351 if (!m_procBadEvent || toomany) {
352 ATH_MSG_ERROR("Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad << " events)");
353 it = *m_endIter;
354 return StatusCode::FAILURE;
355 }
356 ATH_MSG_WARNING("Continue with bad event");
357 }
358
359 // Check whether properties or tools reject this event
360 if ( m_NumEvents > m_skipEvents.value() &&
361 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
362
363 // Build a DH for use by other components
364 StatusCode rec_sg = m_eventSource->generateDataHeader();
365 if (rec_sg != StatusCode::SUCCESS) {
366 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
367 }
368
369 // Build event info attribute list
370 if (recordAttributeListImpl(lock).isFailure()) ATH_MSG_WARNING("Unable to build event info att list");
371
372 StatusCode status(StatusCode::SUCCESS);
373 for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
374 StatusCode toolStatus = tool->postNext();
375 if (toolStatus.isRecoverable()) {
376 ATH_MSG_INFO("Request skipping event from: " << tool->name());
377 status = StatusCode::RECOVERABLE;
378 } else if (toolStatus.isFailure()) {
379 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
380 status = StatusCode::FAILURE;
381 }
382 }
383 if (status.isRecoverable()) {
384 ATH_MSG_INFO("skipping event " << m_NumEvents);
385 } else if (status.isFailure()) {
386 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
387 } else {
388 if (!m_counterTool.empty()) {
389 if (!m_counterTool->postNext().isSuccess()) {
390 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
391 }
392 }
393 // Validate the event
394 try {
395 m_eventSource->validateEvent();
396 }
397 catch (const ByteStreamExceptions::badFragmentData&) {
398 int nbad = ++n_bad_events;
399 ATH_MSG_WARNING("Bad fragment data encountered, current count at " << nbad);
400
401 bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
402 if (!m_procBadEvent || toomany) {
403 ATH_MSG_ERROR("Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad << " events)");
404 it = *m_endIter;
405 return StatusCode::FAILURE;
406 }
407 ATH_MSG_WARNING("Continue with bad event");
408 }
409 break;
410 }
411 } else {
412 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
413 m_skipEventSequence.erase(m_skipEventSequence.begin());
414 }
415 if ( m_NumEvents % 1'000 == 0 ) {
416 ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
417 } else {
418 ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
419 }
420 }
421 } // for loop
422
423 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
424 const RawEvent* pre = m_eventSource->currentEvent();
426 while ( (sc = putEvent_ST(*m_eventStreamingTool,
427 m_NumEvents - 1, pre->start(),
428 pre->fragment_size_word() * sizeof(uint32_t),
429 m_eventSource->currentEventStatus())).isRecoverable() ) {
430 usleep(1000);
431 }
432 ATH_CHECK(sc);
433 }
434 return StatusCode::SUCCESS;
435}
436
437//________________________________________________________________________________
438StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
439 lock_t lock (m_mutex);
440 return nextImpl (ctxt, jump, lock);
441}
442//________________________________________________________________________________
443StatusCode
444EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
445 int jump,
446 lock_t& lock) const
447{
448 if (jump > 0) {
449 if ( m_NumEvents+jump != m_skipEvents.value()) {
450 // Save initial event count
451 unsigned int cntr = m_NumEvents;
452 // In case NumEvents increments multiple times in a single next call
453 while (m_NumEvents+1 <= cntr + jump) {
454 ATH_CHECK(nextImpl(ctxt, lock));
455 }
456 }
457 else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
458 return StatusCode::SUCCESS;
459 }
460 else {
461 ATH_MSG_WARNING("Called jump next with non-multiple jump");
462 }
463 return StatusCode::SUCCESS;
464}
465
466//________________________________________________________________________________
467StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
468{
469 lock_t lock (m_mutex);
470 return nextHandleFileTransitionImpl (ctxt, lock);
471}
472StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
473 lock_t& lock) const
474{
475 const RawEvent* pre{};
476 bool badEvent{};
477 // if event source not ready from init, try next file
478 if (m_filebased && !m_eventSource->ready()) {
479 // next file
480 this->nextFile(lock);
481 if (this->openNewRun(lock).isFailure()) {
482 ATH_MSG_DEBUG("Event source found no more valid files left in input list");
483 m_NumEvents = -1;
484 return StatusCode::FAILURE;
485 }
486 }
487 try {
488 pre = m_eventSource->nextEvent();
489 }
490 catch (const ByteStreamExceptions::readError&) {
491 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
492 return StatusCode::FAILURE;
493 }
494 catch (const ByteStreamExceptions::badFragment&) {
495 ATH_MSG_ERROR("badFragment encountered");
496 badEvent = true;
497 }
499 ATH_MSG_ERROR("badFragment data encountered");
500 badEvent = true;
501 }
502 // Check whether a RawEvent has actually been provided
503 if (pre == nullptr) {
504 ctxt = *m_endIter;
505 return StatusCode::FAILURE;
506 }
507
508 // If not secondary just return the status code based on if the event is bas
509 if (!m_isSecondary.value()) {
510 // check bad event flag and handle as configured
511 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
512 }
513
514 // Build a DH for use by other components
515 StatusCode rec_sg = m_eventSource->generateDataHeader();
516 if (rec_sg != StatusCode::SUCCESS) {
517 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
518 }
519
520 return StatusCode::SUCCESS;
521}
522//________________________________________________________________________________
523StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
524{
525 lock_t lock (m_mutex);
526 return nextWithSkipImpl (ctxt, lock);
527}
528StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
529 lock_t& lock) const {
530 ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
531
532 for (;;) {
533 // Check if we're at the end of file
534 StatusCode sc = nextHandleFileTransitionImpl(ctxt, lock);
535 if (sc.isRecoverable()) {
536 continue; // handles empty files
537 }
538 if (sc.isFailure()) {
539 return StatusCode::FAILURE;
540 }
541
542 // Increase event count
543 ++m_NumEvents;
544
545 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
546 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
547 }
548 if ( m_NumEvents > m_skipEvents.value() &&
549 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
550 return StatusCode::SUCCESS;
551 } else {
552 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
553 m_skipEventSequence.erase(m_skipEventSequence.begin());
554 }
555 if (m_isSecondary.value()) {
556 ATH_MSG_INFO("skipping secondary event " << m_NumEvents);
557 } else {
558 ATH_MSG_INFO("skipping event " << m_NumEvents);
559 }
560 }
561 }
562
563 return StatusCode::SUCCESS;
564}
565//________________________________________________________________________________
566StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt) const
567{
568 lock_t lock (m_mutex);
569 return previousImpl (ctxt, lock);
570}
571StatusCode EventSelectorByteStream::previousImpl(IEvtSelector::Context& /*ctxt*/,
572 lock_t& /*lock*/) const {
573 ATH_MSG_DEBUG(" ... previous");
574 const RawEvent* pre = nullptr;
575 bool badEvent(false);
576 // if event source not ready from init, try next file
577 if (m_eventSource->ready()) {
578 try {
579 pre = m_eventSource->previousEvent();
580 }
581 catch (const ByteStreamExceptions::readError&) {
582 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
583 return StatusCode::FAILURE;
584 }
585 catch (const ByteStreamExceptions::badFragment&) {
586 ATH_MSG_ERROR("badFragment encountered");
587 badEvent = true;
588 }
589 catch (const ByteStreamExceptions::badFragmentData&) {
590 ATH_MSG_ERROR("badFragment data encountered");
591 badEvent = true;
592 }
593 // Check whether a RawEvent has actually been provided
594 if (pre == 0) {
595 ATH_MSG_ERROR("No event built");
596 return StatusCode::FAILURE;
597 }
598 }
599 else {
600 ATH_MSG_FATAL("Attempt to read previous data on invalid reader");
601 return StatusCode::FAILURE;
602 }
603 // increment that an event was found
604
605 // check bad event flag and handle as configured
606 if (badEvent) {
607 ATH_MSG_ERROR("Called previous for bad event");
608 if (!m_procBadEvent) {
609 // End of file
610 return StatusCode::FAILURE;
611 }
612 ATH_MSG_WARNING("Continue with bad event");
613 }
614
615 // Build a DH for use by other components
616 StatusCode rec_sg = m_eventSource->generateDataHeader();
617 if (rec_sg != StatusCode::SUCCESS) {
618 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
619 }
620
621 return StatusCode::SUCCESS;
622}
623//________________________________________________________________________________
624StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt, int jump) const {
625 lock_t lock (m_mutex);
626 return previousImpl (ctxt, jump, lock);
627}
628//________________________________________________________________________________
629StatusCode
630EventSelectorByteStream::previousImpl(IEvtSelector::Context& ctxt,
631 int jump,
632 lock_t& lock) const
633{
634 if (jump > 0) {
635 for (int i = 0; i < jump; i++) {
636 ATH_CHECK(previousImpl(ctxt, lock));
637 }
638 return StatusCode::SUCCESS;
639 }
640 return StatusCode::FAILURE;
641}
642//________________________________________________________________________________
643StatusCode EventSelectorByteStream::last(IEvtSelector::Context& it)const {
644 if (it.identifier() == m_endIter->identifier()) {
645 ATH_MSG_DEBUG("last(): Last event in InputStream.");
646 return StatusCode::SUCCESS;
647 }
648 return StatusCode::FAILURE;
649}
650//________________________________________________________________________________
651StatusCode EventSelectorByteStream::rewind(IEvtSelector::Context& /*it*/) const {
652 ATH_MSG_ERROR("rewind() not implemented");
653 return StatusCode::FAILURE;
654}
655
656//________________________________________________________________________________
657StatusCode EventSelectorByteStream::resetCriteria(const std::string& /*criteria*/, IEvtSelector::Context& /*ctxt*/) const {
658 return StatusCode::SUCCESS;
659}
660
661//__________________________________________________________________________
662StatusCode EventSelectorByteStream::seek(Context& /* it */, int evtNum) const {
663 lock_t lock (m_mutex);
664 // Check that input is seekable
665 if (!m_filebased) {
666 ATH_MSG_ERROR("Input not seekable, choose different input svc");
667 return StatusCode::FAILURE;
668 }
669 // find the file index with that event
670 long fileNum = findEvent(evtNum, lock);
671 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
672 fileNum = m_fileCount;
673 }
674 // if unable to locate file, exit
675 if (fileNum == -1) {
676 ATH_MSG_INFO("seek: Reached end of Input.");
677 return StatusCode::RECOVERABLE;
678 }
679 // check if it is the current file
680 if (fileNum != m_fileCount) { // event in different file
681 // Close input file if open
682 if (m_eventSource->ready()) m_eventSource->closeBlockIterator(true);
683 ATH_MSG_DEBUG("Seek to item: \"" << m_inputCollectionsProp.value()[fileNum] << "\" from the explicit file list.");
684 std::string fileName = m_inputCollectionsProp.value()[fileNum];
685 m_fileCount = fileNum;
686 // Open the correct file
687 auto nevguid = m_eventSource->getBlockIterator(fileName);
688 long nev = nevguid.first;
689 if (nev == -1) {
690 ATH_MSG_FATAL("Unable to open file with seeked event " << evtNum << " file " << fileName);
691 return StatusCode::FAILURE;
692 }
693 int delta = evtNum - m_firstEvt[m_fileCount];
694 if (delta > 0) {
696 ATH_CHECK(nextImpl(*beginIter,delta, lock));
697 }
698 }
699 // event in current file
700 {
701 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) - m_eventSource->positionInBlock();
702 ATH_MSG_DEBUG("Seeking event " << evtNum << " in current file with delta " << delta);
703 if ( delta == 0 ) { // current event
704 // nothing to do
705 }
706 else if ( delta > 0 ) { // forward
708 ATH_CHECK(this->nextImpl(*beginIter, delta, lock));
709 }
710 else if ( delta < 0 ) { // backward
712 ATH_CHECK(this->previousImpl(*beginIter, -1*delta, lock));
713 }
714 }
715 return StatusCode::SUCCESS;
716}
717
719{
720 lock_t lock (m_mutex);
721 return recordAttributeListImpl (lock);
722}
724{
725 std::string listName("EventInfoAtts");
726
727 if (eventStore()->contains<AthenaAttributeList>(listName)) {
728 const AthenaAttributeList* oldAttrList = nullptr;
729 ATH_CHECK(eventStore()->retrieve(oldAttrList, listName));
730 ATH_CHECK(eventStore()->removeDataAndProxy(oldAttrList));
731 }
732
733 // build the new attr list
734 auto attrList = std::make_unique<AthenaAttributeList>();
735
736 // fill the attr list
737 ATH_CHECK(fillAttributeListImpl(attrList.get(), "", false, lock));
738
739 // put result in event store
740 ATH_CHECK(eventStore()->record(std::move(attrList), listName));
741
742 return StatusCode::SUCCESS;
743}
744
745StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
746{
747 lock_t lock (m_mutex);
748 return fillAttributeListImpl (attrList, suffix, copySource, lock);
749}
750StatusCode EventSelectorByteStream::fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool /* copySource */,
751 lock_t& /*lock*/) const
752{
753 attrList->extend("RunNumber" + suffix, "unsigned int");
754 attrList->extend("EventNumber" + suffix, "unsigned long long");
755 attrList->extend("LumiBlockN" + suffix, "unsigned int");
756 attrList->extend("BunchId" + suffix, "unsigned int");
757 attrList->extend("EventTime" + suffix, "unsigned int");
758 attrList->extend("EventTimeNanoSec" + suffix, "unsigned int");
759
760 // fill attribute list
761 const RawEvent* event = m_eventSource->currentEvent();
762
763 (*attrList)["RunNumber" + suffix].data<unsigned int>() = event->run_no();
764 if (event->version() < 0x03010000) {
765 (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->lvl1_id();
766 } else {
767 (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->global_id();
768 }
769 (*attrList)["LumiBlockN" + suffix].data<unsigned int>() = event->lumi_block();
770 (*attrList)["BunchId" + suffix].data<unsigned int>() = event->bc_id();
771
772 unsigned int bc_time_sec = event->bc_time_seconds();
773 unsigned int bc_time_ns = event->bc_time_nanoseconds();
774 // bc_time_ns should be lt 1e9.
775 if (bc_time_ns > 1000000000) {
776 // round it off to 1e9
777 ATH_MSG_WARNING(" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns << ", reset it to 1 sec");
778 bc_time_ns = 1000000000;
779 }
780 (*attrList)["EventTime" + suffix].data<unsigned int>() = bc_time_sec;
781 (*attrList)["EventTimeNanoSec" + suffix].data<unsigned int>() = bc_time_ns;
782
784
785 event->status(buffer);
786 attrList->extend("TriggerStatus" + suffix, "unsigned int");
787 (*attrList)["TriggerStatus" + suffix].data<unsigned int>() = *buffer;
788
789 attrList->extend("ExtendedL1ID" + suffix, "unsigned int");
790 attrList->extend("L1TriggerType" + suffix, "unsigned int");
791 (*attrList)["ExtendedL1ID" + suffix].data<unsigned int>() = event->lvl1_id();
792 (*attrList)["L1TriggerType" + suffix].data<unsigned int>() = event->lvl1_trigger_type();
793
794 // Grab L1 words
795 event->lvl1_trigger_info(buffer);
796 for (uint32_t iT1 = 0; iT1 < event->nlvl1_trigger_info(); ++iT1) {
797 std::stringstream name;
798 name << "L1TriggerInfo_" << iT1;
799 attrList->extend(name.str() + suffix, "unsigned int");
800 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
801 ++buffer;
802 }
803
804 // Grab L2 words
805 event->lvl2_trigger_info(buffer);
806 for (uint32_t iT1 = 0; iT1 < event->nlvl2_trigger_info(); ++iT1) {
807 if (*buffer != 0) {
808 std::stringstream name;
809 name << "L2TriggerInfo_" << iT1;
810 attrList->extend(name.str() + suffix, "unsigned int");
811 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
812 }
813 ++buffer;
814 }
815
816 // Grab EF words
817 event->event_filter_info(buffer);
818 for (uint32_t iT1 = 0; iT1 < event->nevent_filter_info(); ++iT1) {
819 if (*buffer != 0) {
820 std::stringstream name;
821 name << "EFTriggerInfo_" << iT1;
822 attrList->extend(name.str() + suffix, "unsigned int");
823 (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
824 }
825 ++buffer;
826 }
827
828 // Grab stream tags
829 event->stream_tag(buffer);
830 std::vector<eformat::helper::StreamTag> onl_streamTags;
831 eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
832 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
833 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
834 attrList->extend(itS->name + suffix, "string");
835 (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
836 }
837
838 return StatusCode::SUCCESS;
839}
840
841//__________________________________________________________________________
842int EventSelectorByteStream::findEvent(int evtNum, lock_t& /*lock*/) const {
843 // Loop over file event counts
844 for (size_t i = 0; i < m_inputCollectionsProp.value().size(); i++) {
845 if (m_inputCollectionsProp.value().size() != m_numEvt.size()) {
846 ATH_MSG_ERROR("vector size incompatibility");
847 break;
848 }
849 // if file not opened yet, check it
850 if (m_numEvt[i] == -1) {
851 std::string fileName = m_inputCollectionsProp.value()[i];
852 auto nevguid = m_eventSource->getBlockIterator(fileName);
853 long nev = nevguid.first;
854 // if failure on file open, exit
855 if (nev==-1) {
856 break;
857 }
858 // set initial event counter for that file
859 if (i > 0) {
860 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
861 } else {
862 m_firstEvt[i] = 0;
863 }
864 // log number of events in that file
865 m_numEvt[i] = nev;
866 }
867 // if sought event is in this file, then return the index of that file
868 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
869 ATH_MSG_INFO("found " << evtNum << " in file " << i);
870 return i;
871 }
872 }
873 ATH_MSG_INFO("did not find ev " << evtNum);
874 // return file not found marker
875 return -1;
876}
877
878//__________________________________________________________________________
879int EventSelectorByteStream::curEvent (const Context& /*it*/) const {
880 // event counter in IEvtSelectorSeek interface
881 lock_t lock (m_mutex);
882 return int(m_NumEvents);
883}
884
885//__________________________________________________________________________
886int EventSelectorByteStream::size (Context& /*it*/) const {
887 return -1;
888}
889
890//________________________________________________________________________________
891StatusCode EventSelectorByteStream::makeServer(int /*num*/) {
892 lock_t lock (m_mutex);
893 if (m_eventStreamingTool.empty()) {
894 return StatusCode::FAILURE;
895 }
896 return m_eventStreamingTool->makeServer(1, "");
897}
898
899//________________________________________________________________________________
900StatusCode EventSelectorByteStream::makeClient(int /*num*/) {
901 lock_t lock (m_mutex);
902 if (m_eventStreamingTool.empty()) {
903 return StatusCode::FAILURE;
904 }
905 std::string dummyStr;
906 return m_eventStreamingTool->makeClient(0, dummyStr);
907}
908
909//________________________________________________________________________________
910StatusCode EventSelectorByteStream::share(int evtNum) {
911 lock_t lock (m_mutex);
912 if (m_eventStreamingTool.empty()) {
913 return StatusCode::FAILURE;
914 }
915 if (m_eventStreamingTool->isClient()) {
916 StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
917 while (sc.isRecoverable()) {
918 usleep(1000);
919 sc = m_eventStreamingTool->lockEvent(evtNum);
920 }
921 return sc;
922 }
923 return StatusCode::FAILURE;
924}
925
926//________________________________________________________________________________
928 lock_t lock (m_mutex);
929 if (m_eventStreamingTool.empty()) {
930 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
931 return StatusCode::FAILURE;
932 }
933 ATH_MSG_VERBOSE("Called read Event " << maxevt);
934 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
935 const RawEvent* pre = nullptr;
936 if (this->nextImpl(*m_beginIter, lock).isSuccess()) {
937 pre = m_eventSource->currentEvent();
938 } else {
939 if (m_NumEvents == -1) {
940 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
941 break;
942 }
943 ATH_MSG_ERROR("Unable to retrieve next event for " << i << "/" << maxevt);
944 return StatusCode::FAILURE;
945 }
946 if (m_eventStreamingTool->isServer()) {
947 StatusCode sc;
948 while ( (sc = putEvent_ST(*m_eventStreamingTool,
949 m_NumEvents - 1,
950 pre->start(),
951 pre->fragment_size_word() * sizeof(uint32_t),
952 m_eventSource->currentEventStatus())).isRecoverable() ) {
953 usleep(1000);
954 }
955 ATH_CHECK(sc);
956 }
957 }
958 // End of file, wait for last event to be taken
959 StatusCode sc;
960 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
961 usleep(1000);
962 }
963 ATH_CHECK(sc);
964 return StatusCode::SUCCESS;
965}
966
967//________________________________________________________________________________
968StatusCode EventSelectorByteStream::createAddress(const IEvtSelector::Context& /*it*/,
969 IOpaqueAddress*& iop) const {
970 SG::DataProxy* proxy = eventStore()->proxy(ClassID_traits<DataHeader>::ID(),"ByteStreamDataHeader");
971 if (proxy !=0) {
972 iop = proxy->address();
973 return StatusCode::SUCCESS;
974 } else {
975 iop = 0;
976 return StatusCode::FAILURE;
977 }
978}
979
980//________________________________________________________________________________
981StatusCode
982EventSelectorByteStream::releaseContext(IEvtSelector::Context*& /*it*/) const {
983 return StatusCode::SUCCESS;
984}
985
986//________________________________________________________________________________
988 lock_t lock (m_mutex);
989 ATH_MSG_INFO("I/O reinitialization...");
990 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
991 ATH_CHECK(iomgr.retrieve());
992 if (!iomgr->io_hasitem(this)) {
993 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
994 return StatusCode::FAILURE;
995 }
996 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
997 for (std::size_t i = 0, imax = inputCollections.size(); i != imax; ++i) {
998 ATH_MSG_INFO("I/O reinitialization, file = " << inputCollections[i]);
999 std::string &fname = inputCollections[i];
1000 if (!iomgr->io_contains(this, fname)) {
1001 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1002 return StatusCode::FAILURE;
1003 }
1004 ATH_CHECK(iomgr->io_retrieve(this, fname));
1005 }
1006 // all good... copy over.
1007 m_beginFileFired = false;
1008
1009 // Set m_inputCollectionsProp. But we _dont_ want to run the update
1010 // handler --- that calls reinit(), which will deadlock since
1011 // we're holding the lock. Instead, we'll call reinit() ourselves.
1012 auto old_cb = m_inputCollectionsProp.updateCallBack();
1013 m_inputCollectionsProp.declareUpdateHandler(
1014 [] (Gaudi::Details::PropertyBase&) {}
1015 );
1016 m_inputCollectionsProp = inputCollections;
1017 m_inputCollectionsProp.declareUpdateHandler (std::move(old_cb));
1018
1019 return this->reinit(lock);
1020}
1021
1022//__________________________________________________________________________
1024{
1025 return true;
1026}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextByteStream class.
This file contains the class definition for the IAthMetaDataSvc class.
static Double_t sc
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
int imax(int i, int j)
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the Context for EventSelectorByteStream.
virtual StatusCode initialize() override
Implementation of Service base class methods.
virtual StatusCode stop() override
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual int size(Context &it) const override
Always returns -1.
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Gaudi::Property< std::string > m_eventSourceName
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Gaudi::Property< bool > m_filebased
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set a selection criteria.
std::lock_guard< mutex_t > lock_t
SmartIF< IByteStreamInputSvc > m_eventSource
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
EventContextByteStream * m_endIter
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
virtual StatusCode releaseContext(Context *&it) const override
virtual StatusCode makeServer(int num) override
Make this a server.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
virtual StatusCode finalize() override
StatusCode previousImpl(Context &it, lock_t &lock) const
virtual StatusCode last(Context &it) const override
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Gaudi::CheckedProperty< uint32_t > m_runNo
virtual StatusCode createContext(Context *&it) const override
create context
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
virtual StatusCode makeClient(int num) override
Make this a client.
virtual int curEvent(const Context &it) const override
Return the current event number.
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode nextImpl(Context &it, lock_t &lock) const
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
void nextFile(lock_t &lock) const
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
virtual StatusCode rewind(Context &it) const override
StatusCode reinit(lock_t &lock)
Reinitialize the service when a fork() occurred/was-issued.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
virtual StatusCode next(Context &it) const override
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
StatusCode openNewRun(lock_t &lock) const
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual ~EventSelectorByteStream()
Standard Destructor.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
StoreGateSvc * eventStore() const
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode start() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
StatusCode nextWithSkipImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
StatusCode recordAttributeListImpl(lock_t &lock) const
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
EventContextByteStream * m_beginIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Gaudi::Property< long > m_skipEvents
virtual StatusCode previous(Context &it) const override
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
Definition hcg.cxx:114
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.