15 #include "GaudiKernel/ClassID.h"
16 #include "GaudiKernel/FileIncident.h"
17 #include "GaudiKernel/IIncidentSvc.h"
18 #include "GaudiKernel/IIoComponentMgr.h"
26 #include "eformat/StreamTag.h"
33 size_t nbytes,
unsigned int status) {
43 : base_class(
name, svcloc) {
65 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
66 this->
reinit(lock).ignore();
86 m_autoRetrieveTools =
false;
87 m_checkToolDeps =
false;
97 return(StatusCode::FAILURE);
103 return(StatusCode::FAILURE);
106 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
112 return(StatusCode::FAILURE);
119 return(StatusCode::FAILURE);
126 return(StatusCode::FAILURE);
130 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
132 return(StatusCode::FAILURE);
137 if (!iomgr.retrieve().isSuccess()) {
139 return(StatusCode::FAILURE);
141 if (!iomgr->io_register(
this).isSuccess()) {
142 ATH_MSG_FATAL(
"Cannot register myself with the IoComponentMgr.");
143 return(StatusCode::FAILURE);
158 return(StatusCode::FAILURE);
181 m_firstEvt.resize(1);
187 bool retError =
false;
190 if (!
tool->postInitialize().isSuccess()) {
198 return(StatusCode::FAILURE);
204 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"BSF:" + *m_inputCollectionsIterator);
212 return(StatusCode::FAILURE);
218 return(StatusCode::SUCCESS);
230 return(StatusCode::SUCCESS);
241 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"stop");
245 return(StatusCode::SUCCESS);
256 if (!
tool->preFinalize().isSuccess()) {
263 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.release().isSuccess()) {
282 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"BSF:" + *m_inputCollectionsIterator);
284 ++m_inputCollectionsIterator;
291 ATH_MSG_ERROR(
"cannot open new run for non-filebased inputs");
292 return(StatusCode::FAILURE);
297 return(StatusCode::FAILURE);
299 std::string
blockname = *m_inputCollectionsIterator;
303 long nev = nevguid.first;
305 ATH_MSG_FATAL(
"Unable to access file " << *m_inputCollectionsIterator <<
", stopping here");
310 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"BSF:" + *m_inputCollectionsIterator,nevguid.second);
323 ATH_MSG_WARNING(
"skipping more events " <<
m_skipEvents.value() - m_NumEvents <<
"(" <<
nev <<
") than in file " << *m_inputCollectionsIterator <<
", try next");
325 m_numEvt[m_fileCount] =
nev;
332 m_firstEvt[m_fileCount] = m_NumEvents;
333 m_numEvt[m_fileCount] =
nev;
335 return(StatusCode::SUCCESS);
340 return(StatusCode::SUCCESS);
350 static std::atomic<int> n_bad_events = 0;
352 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
355 if (!m_eventStreamingTool->getLockedEvent(&
source,
status).isSuccess()) {
356 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
357 return(StatusCode::FAILURE);
360 return(StatusCode::SUCCESS);
364 if (!
tool->preNext().isSuccess()) {
377 if (
sc.isRecoverable()) {
379 }
else if (
sc.isFailure()) {
380 return StatusCode::FAILURE;
388 int nbad = ++n_bad_events;
389 ATH_MSG_INFO(
"Bad event encountered, current count at " << nbad);
395 return(StatusCode::FAILURE);
402 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
406 if (rec_sg != StatusCode::SUCCESS) {
407 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
416 if (toolStatus.isRecoverable()) {
418 status = StatusCode::RECOVERABLE;
419 }
else if (toolStatus.isFailure()) {
421 status = StatusCode::FAILURE;
424 if (
status.isRecoverable()) {
426 }
else if (
status.isFailure()) {
444 int nbad = ++n_bad_events;
445 ATH_MSG_INFO(
"Bad event encountered, current count at " << nbad);
452 return(StatusCode::FAILURE);
457 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
458 m_skipEventSequence.erase(m_skipEventSequence.begin());
460 if ( m_NumEvents % 1
'000 == 0 ) {
461 ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
463 ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
468 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
469 const RawEvent* pre = m_eventSource->currentEvent();
471 while ( (sc = putEvent_ST(*m_eventStreamingTool,
472 m_NumEvents - 1, pre->start(),
473 pre->fragment_size_word() * sizeof(uint32_t),
474 m_eventSource->currentEventStatus())).isRecoverable() ) {
477 if (!sc.isSuccess()) {
478 ATH_MSG_ERROR("Cannot put Event " << m_NumEvents - 1 << " to AthenaSharedMemoryTool");
479 return(StatusCode::FAILURE);
482 return(StatusCode::SUCCESS);
485 //________________________________________________________________________________
486 StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
487 lock_t lock (m_mutex);
488 return nextImpl (ctxt, jump, lock);
490 //________________________________________________________________________________
492 EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
497 if ( m_NumEvents+jump != m_skipEvents.value()) {
498 // Save initial event count
499 unsigned int cntr = m_NumEvents;
500 // In case NumEvents increments multiple times in a single next call
501 while (m_NumEvents+1 <= cntr + jump) {
502 if (!nextImpl(ctxt, lock).isSuccess()) {
503 return(StatusCode::FAILURE);
507 else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
508 return(StatusCode::SUCCESS);
511 ATH_MSG_WARNING("Called jump next with non-multiple jump");
513 return(StatusCode::SUCCESS);
516 //________________________________________________________________________________
517 StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
519 lock_t lock (m_mutex);
520 return nextHandleFileTransitionImpl (ctxt, lock);
522 StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
525 const RawEvent* pre{};
527 // if event source not ready from init, try next file
528 if (m_filebased && !m_eventSource->ready()) {
530 this->nextFile(lock);
531 if (this->openNewRun(lock).isFailure()) {
532 ATH_MSG_DEBUG("Event source found no more valid files left in input list");
534 return StatusCode::FAILURE;
538 pre = m_eventSource->nextEvent();
540 catch (const ByteStreamExceptions::readError&) {
541 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
542 return StatusCode::FAILURE;
544 catch (const ByteStreamExceptions::badFragment&) {
545 ATH_MSG_ERROR("badFragment encountered");
548 catch (const ByteStreamExceptions::badFragmentData&) {
549 ATH_MSG_ERROR("badFragment data encountered");
552 // Check whether a RawEvent has actually been provided
553 if (pre == nullptr) {
555 return StatusCode::FAILURE;
558 // If not secondary just return the status code based on if the event is bas
559 if (!m_isSecondary.value()) {
560 // check bad event flag and handle as configured
561 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
564 // Build a DH for use by other components
565 StatusCode rec_sg = m_eventSource->generateDataHeader();
566 if (rec_sg != StatusCode::SUCCESS) {
567 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
570 return StatusCode::SUCCESS;
572 //________________________________________________________________________________
573 StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
575 lock_t lock (m_mutex);
576 return nextWithSkipImpl (ctxt, lock);
578 StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
579 lock_t& lock) const {
580 ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
585 if (
sc.isRecoverable()) {
588 if (
sc.isFailure()) {
589 return StatusCode::FAILURE;
596 ATH_MSG_WARNING(
"Failed to preNext() CounterTool.");
599 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
600 return StatusCode::SUCCESS;
602 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
603 m_skipEventSequence.erase(m_skipEventSequence.begin());
606 ATH_MSG_INFO(
"skipping secondary event " << m_NumEvents);
608 ATH_MSG_INFO(
"skipping event " << m_NumEvents);
613 return StatusCode::SUCCESS;
625 bool badEvent(
false);
633 return StatusCode::FAILURE;
647 return(StatusCode::FAILURE);
651 ATH_MSG_FATAL(
"Attempt to read previous data on invalid reader");
652 return(StatusCode::FAILURE);
663 return(StatusCode::FAILURE);
670 if (rec_sg != StatusCode::SUCCESS) {
671 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
674 return StatusCode::SUCCESS;
688 for (
int i = 0;
i < jump;
i++) {
690 return(StatusCode::FAILURE);
693 return(StatusCode::SUCCESS);
695 return(StatusCode::FAILURE);
701 return(StatusCode::SUCCESS);
703 return(StatusCode::FAILURE);
708 return(StatusCode::FAILURE);
713 return(StatusCode::SUCCESS);
721 ATH_MSG_ERROR(
"Input not seekable, choose different input svc");
722 return StatusCode::FAILURE;
726 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
727 fileNum = m_fileCount;
732 return(StatusCode::RECOVERABLE);
735 if (fileNum != m_fileCount) {
740 m_fileCount = fileNum;
743 long nev = nevguid.first;
746 return StatusCode::FAILURE;
748 int delta = evtNum - m_firstEvt[m_fileCount];
751 if (
nextImpl(*beginIter,delta, lock).isFailure())
return StatusCode::FAILURE;
756 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) -
m_eventSource->positionInBlock();
757 ATH_MSG_DEBUG(
"Seeking event " << evtNum <<
" in current file with delta " << delta);
761 else if ( delta > 0 ) {
763 if ( this->
nextImpl(*beginIter, delta, lock).isFailure() )
return StatusCode::FAILURE;
765 else if ( delta < 0 ) {
767 if ( this->
previousImpl(*beginIter, -1*delta, lock).isFailure() )
return(StatusCode::FAILURE);
770 return StatusCode::SUCCESS;
780 std::string listName(
"EventInfoAtts");
782 if (
eventStore()->contains<AthenaAttributeList>(listName)) {
785 ATH_MSG_ERROR(
"Cannot retrieve old AttributeList from StoreGate.");
786 return(StatusCode::FAILURE);
788 if (!
eventStore()->removeDataAndProxy(oldAttrList).isSuccess()) {
789 ATH_MSG_ERROR(
"Cannot remove old AttributeList from StoreGate.");
790 return(StatusCode::FAILURE);
795 auto attrList = std::make_unique<AthenaAttributeList>();
801 if (
eventStore()->record(std::move(attrList), listName).isFailure()) {
802 return StatusCode::FAILURE;
805 return StatusCode::SUCCESS;
816 attrList->extend(
"RunNumber" +
suffix,
"unsigned int");
817 attrList->extend(
"EventNumber" +
suffix,
"unsigned long long");
818 attrList->extend(
"LumiBlockN" +
suffix,
"unsigned int");
819 attrList->extend(
"BunchId" +
suffix,
"unsigned int");
820 attrList->extend(
"EventTime" +
suffix,
"unsigned int");
821 attrList->extend(
"EventTimeNanoSec" +
suffix,
"unsigned int");
826 (*attrList)[
"RunNumber" +
suffix].data<
unsigned int>() =
event->run_no();
827 if (
event->version() < 0x03010000) {
828 (*attrList)[
"EventNumber" +
suffix].data<
unsigned long long>() =
event->lvl1_id();
830 (*attrList)[
"EventNumber" +
suffix].data<
unsigned long long>() =
event->global_id();
832 (*attrList)[
"LumiBlockN" +
suffix].data<
unsigned int>() =
event->lumi_block();
833 (*attrList)[
"BunchId" +
suffix].data<
unsigned int>() =
event->bc_id();
835 unsigned int bc_time_sec =
event->bc_time_seconds();
836 unsigned int bc_time_ns =
event->bc_time_nanoseconds();
838 if (bc_time_ns > 1000000000) {
840 ATH_MSG_WARNING(
" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns <<
", reset it to 1 sec");
841 bc_time_ns = 1000000000;
843 (*attrList)[
"EventTime" +
suffix].data<
unsigned int>() = bc_time_sec;
844 (*attrList)[
"EventTimeNanoSec" +
suffix].data<
unsigned int>() = bc_time_ns;
849 attrList->extend(
"TriggerStatus" +
suffix,
"unsigned int");
850 (*attrList)[
"TriggerStatus" +
suffix].data<
unsigned int>() = *
buffer;
852 attrList->extend(
"ExtendedL1ID" +
suffix,
"unsigned int");
853 attrList->extend(
"L1TriggerType" +
suffix,
"unsigned int");
854 (*attrList)[
"ExtendedL1ID" +
suffix].data<
unsigned int>() =
event->lvl1_id();
855 (*attrList)[
"L1TriggerType" +
suffix].data<
unsigned int>() =
event->lvl1_trigger_type();
858 event->lvl1_trigger_info(
buffer);
859 for (
uint32_t iT1 = 0; iT1 <
event->nlvl1_trigger_info(); ++iT1) {
860 std::stringstream
name;
861 name <<
"L1TriggerInfo_" << iT1;
862 attrList->extend(
name.str() +
suffix,
"unsigned int");
868 event->lvl2_trigger_info(
buffer);
869 for (
uint32_t iT1 = 0; iT1 <
event->nlvl2_trigger_info(); ++iT1) {
871 std::stringstream
name;
872 name <<
"L2TriggerInfo_" << iT1;
873 attrList->extend(
name.str() +
suffix,
"unsigned int");
880 event->event_filter_info(
buffer);
881 for (
uint32_t iT1 = 0; iT1 <
event->nevent_filter_info(); ++iT1) {
883 std::stringstream
name;
884 name <<
"EFTriggerInfo_" << iT1;
885 attrList->extend(
name.str() +
suffix,
"unsigned int");
892 event->stream_tag(
buffer);
893 std::vector<eformat::helper::StreamTag> onl_streamTags;
895 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
896 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
897 attrList->extend(itS->name +
suffix,
"string");
898 (*attrList)[itS->name +
suffix].data<std::string>() = itS->type;
901 return StatusCode::SUCCESS;
914 if (m_numEvt[
i] == -1) {
917 long nev = nevguid.first;
924 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
932 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
946 return int(m_NumEvents);
957 if (m_eventStreamingTool.empty()) {
958 return(StatusCode::FAILURE);
960 return(m_eventStreamingTool->makeServer(1,
""));
966 if (m_eventStreamingTool.empty()) {
967 return(StatusCode::FAILURE);
969 std::string dummyStr;
970 return(m_eventStreamingTool->makeClient(0, dummyStr));
976 if (m_eventStreamingTool.empty()) {
977 return(StatusCode::FAILURE);
979 if (m_eventStreamingTool->isClient()) {
980 StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
981 while (
sc.isRecoverable()) {
983 sc = m_eventStreamingTool->lockEvent(evtNum);
987 return(StatusCode::FAILURE);
993 if (m_eventStreamingTool.empty()) {
994 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
995 return(StatusCode::FAILURE);
998 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
1003 if (m_NumEvents == -1) {
1004 ATH_MSG_VERBOSE(
"Called read Event and read last event from input: " <<
i);
1007 ATH_MSG_ERROR(
"Unable to retrieve next event for " <<
i <<
"/" << maxevt);
1008 return(StatusCode::FAILURE);
1010 if (m_eventStreamingTool->isServer()) {
1012 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
1015 pre->fragment_size_word() *
sizeof(
uint32_t),
1019 if (!
sc.isSuccess()) {
1020 ATH_MSG_ERROR(
"Cannot put Event " << m_NumEvents - 1 <<
" to AthenaSharedMemoryTool");
1021 return(StatusCode::FAILURE);
1027 while ( (
sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
1030 if (!
sc.isSuccess()) {
1031 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
1032 return(StatusCode::FAILURE);
1034 return(StatusCode::SUCCESS);
1039 IOpaqueAddress*& iop)
const {
1042 iop =
proxy->address();
1043 return(StatusCode::SUCCESS);
1046 return(StatusCode::FAILURE);
1053 return(StatusCode::SUCCESS);
1061 if (!iomgr.retrieve().isSuccess()) {
1063 return(StatusCode::FAILURE);
1065 if (!iomgr->io_hasitem(
this)) {
1066 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1067 return(StatusCode::FAILURE);
1073 if (!iomgr->io_contains(
this,
fname)) {
1075 return(StatusCode::FAILURE);
1077 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1079 return(StatusCode::FAILURE);
1090 [] (Gaudi::Details::PropertyBase&) {}
1095 return(this->
reinit(lock));