16 #include "GaudiKernel/ClassID.h"
17 #include "GaudiKernel/FileIncident.h"
18 #include "GaudiKernel/IIncidentSvc.h"
19 #include "GaudiKernel/IIoComponentMgr.h"
27 #include "eformat/StreamTag.h"
34 size_t nbytes,
unsigned int status) {
44 : base_class(
name, svcloc) {
66 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
67 this->
reinit(lock).ignore();
87 m_autoRetrieveTools =
false;
88 m_checkToolDeps =
false;
98 return(StatusCode::FAILURE);
104 return(StatusCode::FAILURE);
107 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
113 return(StatusCode::FAILURE);
118 return(StatusCode::FAILURE);
125 return(StatusCode::FAILURE);
132 return(StatusCode::FAILURE);
136 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
138 return(StatusCode::FAILURE);
143 if (!iomgr.retrieve().isSuccess()) {
145 return(StatusCode::FAILURE);
147 if (!iomgr->io_register(
this).isSuccess()) {
148 ATH_MSG_FATAL(
"Cannot register myself with the IoComponentMgr.");
149 return(StatusCode::FAILURE);
164 return(StatusCode::FAILURE);
187 m_firstEvt.resize(1);
193 bool retError =
false;
196 if (!
tool->postInitialize().isSuccess()) {
204 return(StatusCode::FAILURE);
210 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"BSF:" + *m_inputCollectionsIterator);
218 return(StatusCode::FAILURE);
224 return(StatusCode::SUCCESS);
236 return(StatusCode::SUCCESS);
247 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"stop");
251 return(StatusCode::SUCCESS);
262 if (!
tool->preFinalize().isSuccess()) {
269 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.release().isSuccess()) {
288 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"BSF:" + *m_inputCollectionsIterator);
290 ++m_inputCollectionsIterator;
297 ATH_MSG_ERROR(
"cannot open new run for non-filebased inputs");
298 return(StatusCode::FAILURE);
303 return(StatusCode::FAILURE);
305 std::string
blockname = *m_inputCollectionsIterator;
309 long nev = nevguid.first;
311 ATH_MSG_FATAL(
"Unable to access file " << *m_inputCollectionsIterator <<
", stopping here");
316 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"BSF:" + *m_inputCollectionsIterator,nevguid.second);
329 ATH_MSG_WARNING(
"skipping more events " <<
m_skipEvents.value() - m_NumEvents <<
"(" << nev <<
") than in file " << *m_inputCollectionsIterator <<
", try next");
331 m_numEvt[m_fileCount] = nev;
338 m_firstEvt[m_fileCount] = m_NumEvents;
339 m_numEvt[m_fileCount] = nev;
341 return(StatusCode::SUCCESS);
346 return(StatusCode::SUCCESS);
356 static std::atomic<int> n_bad_events = 0;
358 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
361 if (!m_eventStreamingTool->getLockedEvent(&source,
status).isSuccess()) {
362 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
363 return(StatusCode::FAILURE);
366 return(StatusCode::SUCCESS);
370 if (!
tool->preNext().isSuccess()) {
383 if (
sc.isRecoverable()) {
385 }
else if (
sc.isFailure()) {
386 return StatusCode::FAILURE;
394 int nbad = ++n_bad_events;
395 ATH_MSG_INFO(
"Bad event encountered, current count at " << nbad);
401 return(StatusCode::FAILURE);
408 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
412 if (rec_sg != StatusCode::SUCCESS) {
413 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
422 if (toolStatus.isRecoverable()) {
424 status = StatusCode::RECOVERABLE;
425 }
else if (toolStatus.isFailure()) {
427 status = StatusCode::FAILURE;
430 if (
status.isRecoverable()) {
432 }
else if (
status.isFailure()) {
450 int nbad = ++n_bad_events;
451 ATH_MSG_INFO(
"Bad event encountered, current count at " << nbad);
458 return(StatusCode::FAILURE);
463 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
464 m_skipEventSequence.erase(m_skipEventSequence.begin());
466 if ( m_NumEvents % 1
'000 == 0 ) {
467 ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
469 ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
474 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
475 const RawEvent* pre = m_eventSource->currentEvent();
477 while ( (sc = putEvent_ST(*m_eventStreamingTool,
478 m_NumEvents - 1, pre->start(),
479 pre->fragment_size_word() * sizeof(uint32_t),
480 m_eventSource->currentEventStatus())).isRecoverable() ) {
483 if (!sc.isSuccess()) {
484 ATH_MSG_ERROR("Cannot put Event " << m_NumEvents - 1 << " to AthenaSharedMemoryTool");
485 return(StatusCode::FAILURE);
488 return(StatusCode::SUCCESS);
491 //________________________________________________________________________________
492 StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
493 lock_t lock (m_mutex);
494 return nextImpl (ctxt, jump, lock);
496 //________________________________________________________________________________
498 EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
503 if ( m_NumEvents+jump != m_skipEvents.value()) {
504 // Save initial event count
505 unsigned int cntr = m_NumEvents;
506 // In case NumEvents increments multiple times in a single next call
507 while (m_NumEvents+1 <= cntr + jump) {
508 if (!nextImpl(ctxt, lock).isSuccess()) {
509 return(StatusCode::FAILURE);
513 else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
514 return(StatusCode::SUCCESS);
517 ATH_MSG_WARNING("Called jump next with non-multiple jump");
519 return(StatusCode::SUCCESS);
522 //________________________________________________________________________________
523 StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
525 lock_t lock (m_mutex);
526 return nextHandleFileTransitionImpl (ctxt, lock);
528 StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
531 const RawEvent* pre{};
533 // if event source not ready from init, try next file
534 if (m_filebased && !m_eventSource->ready()) {
536 this->nextFile(lock);
537 if (this->openNewRun(lock).isFailure()) {
538 ATH_MSG_DEBUG("Event source found no more valid files left in input list");
540 return StatusCode::FAILURE;
544 pre = m_eventSource->nextEvent();
546 catch (const ByteStreamExceptions::readError&) {
547 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
548 return StatusCode::FAILURE;
550 catch (const ByteStreamExceptions::badFragment&) {
551 ATH_MSG_ERROR("badFragment encountered");
554 catch (const ByteStreamExceptions::badFragmentData&) {
555 ATH_MSG_ERROR("badFragment data encountered");
558 // Check whether a RawEvent has actually been provided
559 if (pre == nullptr) {
561 return StatusCode::FAILURE;
564 // If not secondary just return the status code based on if the event is bas
565 if (!m_isSecondary.value()) {
566 // check bad event flag and handle as configured
567 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
570 // Build a DH for use by other components
571 StatusCode rec_sg = m_eventSource->generateDataHeader();
572 if (rec_sg != StatusCode::SUCCESS) {
573 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
576 return StatusCode::SUCCESS;
578 //________________________________________________________________________________
579 StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
581 lock_t lock (m_mutex);
582 return nextWithSkipImpl (ctxt, lock);
584 StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
585 lock_t& lock) const {
586 ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
591 if (
sc.isRecoverable()) {
594 if (
sc.isFailure()) {
595 return StatusCode::FAILURE;
602 ATH_MSG_WARNING(
"Failed to preNext() CounterTool.");
605 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
606 return StatusCode::SUCCESS;
608 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
609 m_skipEventSequence.erase(m_skipEventSequence.begin());
612 ATH_MSG_INFO(
"skipping secondary event " << m_NumEvents);
614 ATH_MSG_INFO(
"skipping event " << m_NumEvents);
619 return StatusCode::SUCCESS;
631 bool badEvent(
false);
639 return StatusCode::FAILURE;
653 return(StatusCode::FAILURE);
657 ATH_MSG_FATAL(
"Attempt to read previous data on invalid reader");
658 return(StatusCode::FAILURE);
669 return(StatusCode::FAILURE);
676 if (rec_sg != StatusCode::SUCCESS) {
677 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
680 return StatusCode::SUCCESS;
694 for (
int i = 0;
i < jump;
i++) {
696 return(StatusCode::FAILURE);
699 return(StatusCode::SUCCESS);
701 return(StatusCode::FAILURE);
707 return(StatusCode::SUCCESS);
709 return(StatusCode::FAILURE);
714 return(StatusCode::FAILURE);
719 return(StatusCode::SUCCESS);
727 ATH_MSG_ERROR(
"Input not seekable, choose different input svc");
728 return StatusCode::FAILURE;
732 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
733 fileNum = m_fileCount;
738 return(StatusCode::RECOVERABLE);
741 if (fileNum != m_fileCount) {
746 m_fileCount = fileNum;
749 long nev = nevguid.first;
752 return StatusCode::FAILURE;
754 int delta = evtNum - m_firstEvt[m_fileCount];
757 if (
nextImpl(*beginIter,delta, lock).isFailure())
return StatusCode::FAILURE;
763 ATH_MSG_DEBUG(
"Seeking event " << evtNum <<
" in current file with delta " << delta);
767 else if ( delta > 0 ) {
769 if ( this->
nextImpl(*beginIter, delta, lock).isFailure() )
return StatusCode::FAILURE;
771 else if ( delta < 0 ) {
773 if ( this->
previousImpl(*beginIter, -1*delta, lock).isFailure() )
return(StatusCode::FAILURE);
776 return StatusCode::SUCCESS;
786 std::string listName(
"EventInfoAtts");
788 if (
eventStore()->contains<AthenaAttributeList>(listName)) {
791 ATH_MSG_ERROR(
"Cannot retrieve old AttributeList from StoreGate.");
792 return(StatusCode::FAILURE);
794 if (!
eventStore()->removeDataAndProxy(oldAttrList).isSuccess()) {
795 ATH_MSG_ERROR(
"Cannot remove old AttributeList from StoreGate.");
796 return(StatusCode::FAILURE);
801 auto attrList = std::make_unique<AthenaAttributeList>();
807 if (
eventStore()->record(std::move(attrList), listName).isFailure()) {
808 return StatusCode::FAILURE;
811 return StatusCode::SUCCESS;
822 attrList->extend(
"RunNumber" +
suffix,
"unsigned int");
823 attrList->extend(
"EventNumber" +
suffix,
"unsigned long long");
824 attrList->extend(
"LumiBlockN" +
suffix,
"unsigned int");
825 attrList->extend(
"BunchId" +
suffix,
"unsigned int");
826 attrList->extend(
"EventTime" +
suffix,
"unsigned int");
827 attrList->extend(
"EventTimeNanoSec" +
suffix,
"unsigned int");
832 (*attrList)[
"RunNumber" +
suffix].data<
unsigned int>() =
event->run_no();
833 if (
event->version() < 0x03010000) {
834 (*attrList)[
"EventNumber" +
suffix].data<
unsigned long long>() =
event->lvl1_id();
836 (*attrList)[
"EventNumber" +
suffix].data<
unsigned long long>() =
event->global_id();
838 (*attrList)[
"LumiBlockN" +
suffix].data<
unsigned int>() =
event->lumi_block();
839 (*attrList)[
"BunchId" +
suffix].data<
unsigned int>() =
event->bc_id();
841 unsigned int bc_time_sec =
event->bc_time_seconds();
842 unsigned int bc_time_ns =
event->bc_time_nanoseconds();
844 if (bc_time_ns > 1000000000) {
846 ATH_MSG_WARNING(
" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns <<
", reset it to 1 sec");
847 bc_time_ns = 1000000000;
849 (*attrList)[
"EventTime" +
suffix].data<
unsigned int>() = bc_time_sec;
850 (*attrList)[
"EventTimeNanoSec" +
suffix].data<
unsigned int>() = bc_time_ns;
855 attrList->extend(
"TriggerStatus" +
suffix,
"unsigned int");
856 (*attrList)[
"TriggerStatus" +
suffix].data<
unsigned int>() = *
buffer;
858 attrList->extend(
"ExtendedL1ID" +
suffix,
"unsigned int");
859 attrList->extend(
"L1TriggerType" +
suffix,
"unsigned int");
860 (*attrList)[
"ExtendedL1ID" +
suffix].data<
unsigned int>() =
event->lvl1_id();
861 (*attrList)[
"L1TriggerType" +
suffix].data<
unsigned int>() =
event->lvl1_trigger_type();
864 event->lvl1_trigger_info(
buffer);
865 for (
uint32_t iT1 = 0; iT1 <
event->nlvl1_trigger_info(); ++iT1) {
866 std::stringstream
name;
867 name <<
"L1TriggerInfo_" << iT1;
868 attrList->extend(
name.str() +
suffix,
"unsigned int");
874 event->lvl2_trigger_info(
buffer);
875 for (
uint32_t iT1 = 0; iT1 <
event->nlvl2_trigger_info(); ++iT1) {
877 std::stringstream
name;
878 name <<
"L2TriggerInfo_" << iT1;
879 attrList->extend(
name.str() +
suffix,
"unsigned int");
886 event->event_filter_info(
buffer);
887 for (
uint32_t iT1 = 0; iT1 <
event->nevent_filter_info(); ++iT1) {
889 std::stringstream
name;
890 name <<
"EFTriggerInfo_" << iT1;
891 attrList->extend(
name.str() +
suffix,
"unsigned int");
898 event->stream_tag(
buffer);
899 std::vector<eformat::helper::StreamTag> onl_streamTags;
901 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
902 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
903 attrList->extend(itS->name +
suffix,
"string");
904 (*attrList)[itS->name +
suffix].data<std::string>() = itS->type;
907 return StatusCode::SUCCESS;
920 if (m_numEvt[
i] == -1) {
923 long nev = nevguid.first;
930 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
938 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
952 return int(m_NumEvents);
963 if (m_eventStreamingTool.empty()) {
964 return(StatusCode::FAILURE);
966 return(m_eventStreamingTool->makeServer(1,
""));
972 if (m_eventStreamingTool.empty()) {
973 return(StatusCode::FAILURE);
975 std::string dummyStr;
976 return(m_eventStreamingTool->makeClient(0, dummyStr));
982 if (m_eventStreamingTool.empty()) {
983 return(StatusCode::FAILURE);
985 if (m_eventStreamingTool->isClient()) {
986 StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
987 while (
sc.isRecoverable()) {
989 sc = m_eventStreamingTool->lockEvent(evtNum);
993 return(StatusCode::FAILURE);
999 if (m_eventStreamingTool.empty()) {
1000 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
1001 return(StatusCode::FAILURE);
1004 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
1009 if (m_NumEvents == -1) {
1010 ATH_MSG_VERBOSE(
"Called read Event and read last event from input: " <<
i);
1013 ATH_MSG_ERROR(
"Unable to retrieve next event for " <<
i <<
"/" << maxevt);
1014 return(StatusCode::FAILURE);
1016 if (m_eventStreamingTool->isServer()) {
1018 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
1021 pre->fragment_size_word() *
sizeof(
uint32_t),
1025 if (!
sc.isSuccess()) {
1026 ATH_MSG_ERROR(
"Cannot put Event " << m_NumEvents - 1 <<
" to AthenaSharedMemoryTool");
1027 return(StatusCode::FAILURE);
1033 while ( (
sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
1036 if (!
sc.isSuccess()) {
1037 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
1038 return(StatusCode::FAILURE);
1040 return(StatusCode::SUCCESS);
1045 IOpaqueAddress*& iop)
const {
1048 iop =
proxy->address();
1049 return(StatusCode::SUCCESS);
1052 return(StatusCode::FAILURE);
1059 return(StatusCode::SUCCESS);
1064 if (riid == IEvtSelector::interfaceID()) {
1065 *ppvInterface =
dynamic_cast<IEvtSelector*
>(
this);
1066 }
else if (riid == IIoComponent::interfaceID()) {
1067 *ppvInterface =
dynamic_cast<IIoComponent*
>(
this);
1068 }
else if (riid == IProperty::interfaceID()) {
1069 *ppvInterface =
dynamic_cast<IProperty*
>(
this);
1070 }
else if (riid == IEvtSelectorSeek::interfaceID()) {
1074 }
else if (riid == ISecondaryEventSelector::interfaceID()) {
1077 return(Service::queryInterface(riid, ppvInterface));
1080 return(StatusCode::SUCCESS);
1087 if (!iomgr.retrieve().isSuccess()) {
1089 return(StatusCode::FAILURE);
1091 if (!iomgr->io_hasitem(
this)) {
1092 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1093 return(StatusCode::FAILURE);
1099 if (!iomgr->io_contains(
this,
fname)) {
1101 return(StatusCode::FAILURE);
1103 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1105 return(StatusCode::FAILURE);
1116 [] (Gaudi::Details::PropertyBase&) {}
1121 return(this->
reinit(lock));