15 #include "GaudiKernel/ClassID.h"
16 #include "GaudiKernel/FileIncident.h"
17 #include "GaudiKernel/IIncidentSvc.h"
18 #include "GaudiKernel/IIoComponentMgr.h"
26 #include "eformat/StreamTag.h"
33 size_t nbytes,
unsigned int status) {
43 : base_class(
name, svcloc) {
65 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
66 this->
reinit(lock).ignore();
86 m_autoRetrieveTools =
false;
87 m_checkToolDeps =
false;
97 return(StatusCode::FAILURE);
103 return(StatusCode::FAILURE);
106 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
112 return(StatusCode::FAILURE);
119 return(StatusCode::FAILURE);
126 return(StatusCode::FAILURE);
130 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
132 return(StatusCode::FAILURE);
137 if (!iomgr.retrieve().isSuccess()) {
139 return(StatusCode::FAILURE);
141 if (!iomgr->io_register(
this).isSuccess()) {
142 ATH_MSG_FATAL(
"Cannot register myself with the IoComponentMgr.");
143 return(StatusCode::FAILURE);
158 return(StatusCode::FAILURE);
181 m_firstEvt.resize(1);
187 bool retError =
false;
190 if (!
tool->postInitialize().isSuccess()) {
198 return(StatusCode::FAILURE);
204 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"BSF:" + *m_inputCollectionsIterator);
212 return(StatusCode::FAILURE);
218 return(StatusCode::SUCCESS);
230 return(StatusCode::SUCCESS);
241 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"stop");
245 return(StatusCode::SUCCESS);
256 if (!
tool->preFinalize().isSuccess()) {
263 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.release().isSuccess()) {
282 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"BSF:" + *m_inputCollectionsIterator);
284 ++m_inputCollectionsIterator;
291 ATH_MSG_ERROR(
"cannot open new run for non-filebased inputs");
292 return(StatusCode::FAILURE);
297 return(StatusCode::FAILURE);
299 std::string
blockname = *m_inputCollectionsIterator;
303 long nev = nevguid.first;
305 ATH_MSG_FATAL(
"Unable to access file " << *m_inputCollectionsIterator <<
", stopping here");
310 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"BSF:" + *m_inputCollectionsIterator,nevguid.second);
323 ATH_MSG_WARNING(
"skipping more events " <<
m_skipEvents.value() - m_NumEvents <<
"(" <<
nev <<
") than in file " << *m_inputCollectionsIterator <<
", try next");
325 m_numEvt[m_fileCount] =
nev;
332 m_firstEvt[m_fileCount] = m_NumEvents;
333 m_numEvt[m_fileCount] =
nev;
335 return(StatusCode::SUCCESS);
340 return(StatusCode::SUCCESS);
350 static std::atomic<int> n_bad_events = 0;
352 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
355 if (!m_eventStreamingTool->getLockedEvent(&
source,
status).isSuccess()) {
356 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
357 return(StatusCode::FAILURE);
360 return(StatusCode::SUCCESS);
364 if (!
tool->preNext().isSuccess()) {
377 if (
sc.isRecoverable()) {
379 }
else if (
sc.isFailure()) {
380 return StatusCode::FAILURE;
388 int nbad = ++n_bad_events;
392 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
394 return StatusCode::FAILURE;
401 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
405 if (rec_sg != StatusCode::SUCCESS) {
406 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
415 if (toolStatus.isRecoverable()) {
417 status = StatusCode::RECOVERABLE;
418 }
else if (toolStatus.isFailure()) {
420 status = StatusCode::FAILURE;
423 if (
status.isRecoverable()) {
425 }
else if (
status.isFailure()) {
438 int nbad = ++n_bad_events;
439 ATH_MSG_WARNING(
"Bad fragment data encountered, current count at " << nbad);
443 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
445 return StatusCode::FAILURE;
452 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
453 m_skipEventSequence.erase(m_skipEventSequence.begin());
455 if ( m_NumEvents % 1
'000 == 0 ) {
456 ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
458 ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
463 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
464 const RawEvent* pre = m_eventSource->currentEvent();
466 while ( (sc = putEvent_ST(*m_eventStreamingTool,
467 m_NumEvents - 1, pre->start(),
468 pre->fragment_size_word() * sizeof(uint32_t),
469 m_eventSource->currentEventStatus())).isRecoverable() ) {
472 if (!sc.isSuccess()) {
473 ATH_MSG_ERROR("Cannot put Event " << m_NumEvents - 1 << " to AthenaSharedMemoryTool");
474 return(StatusCode::FAILURE);
477 return(StatusCode::SUCCESS);
480 //________________________________________________________________________________
481 StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
482 lock_t lock (m_mutex);
483 return nextImpl (ctxt, jump, lock);
485 //________________________________________________________________________________
487 EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
492 if ( m_NumEvents+jump != m_skipEvents.value()) {
493 // Save initial event count
494 unsigned int cntr = m_NumEvents;
495 // In case NumEvents increments multiple times in a single next call
496 while (m_NumEvents+1 <= cntr + jump) {
497 if (!nextImpl(ctxt, lock).isSuccess()) {
498 return(StatusCode::FAILURE);
502 else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
503 return(StatusCode::SUCCESS);
506 ATH_MSG_WARNING("Called jump next with non-multiple jump");
508 return(StatusCode::SUCCESS);
511 //________________________________________________________________________________
512 StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
514 lock_t lock (m_mutex);
515 return nextHandleFileTransitionImpl (ctxt, lock);
517 StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
520 const RawEvent* pre{};
522 // if event source not ready from init, try next file
523 if (m_filebased && !m_eventSource->ready()) {
525 this->nextFile(lock);
526 if (this->openNewRun(lock).isFailure()) {
527 ATH_MSG_DEBUG("Event source found no more valid files left in input list");
529 return StatusCode::FAILURE;
533 pre = m_eventSource->nextEvent();
535 catch (const ByteStreamExceptions::readError&) {
536 ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
537 return StatusCode::FAILURE;
539 catch (const ByteStreamExceptions::badFragment&) {
540 ATH_MSG_ERROR("badFragment encountered");
543 catch (const ByteStreamExceptions::badFragmentData&) {
544 ATH_MSG_ERROR("badFragment data encountered");
547 // Check whether a RawEvent has actually been provided
548 if (pre == nullptr) {
550 return StatusCode::FAILURE;
553 // If not secondary just return the status code based on if the event is bas
554 if (!m_isSecondary.value()) {
555 // check bad event flag and handle as configured
556 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
559 // Build a DH for use by other components
560 StatusCode rec_sg = m_eventSource->generateDataHeader();
561 if (rec_sg != StatusCode::SUCCESS) {
562 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
565 return StatusCode::SUCCESS;
567 //________________________________________________________________________________
568 StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
570 lock_t lock (m_mutex);
571 return nextWithSkipImpl (ctxt, lock);
573 StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
574 lock_t& lock) const {
575 ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
580 if (
sc.isRecoverable()) {
583 if (
sc.isFailure()) {
584 return StatusCode::FAILURE;
591 ATH_MSG_WARNING(
"Failed to preNext() CounterTool.");
594 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
595 return StatusCode::SUCCESS;
597 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
598 m_skipEventSequence.erase(m_skipEventSequence.begin());
601 ATH_MSG_INFO(
"skipping secondary event " << m_NumEvents);
603 ATH_MSG_INFO(
"skipping event " << m_NumEvents);
608 return StatusCode::SUCCESS;
620 bool badEvent(
false);
628 return StatusCode::FAILURE;
642 return(StatusCode::FAILURE);
646 ATH_MSG_FATAL(
"Attempt to read previous data on invalid reader");
647 return(StatusCode::FAILURE);
658 return(StatusCode::FAILURE);
665 if (rec_sg != StatusCode::SUCCESS) {
666 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
669 return StatusCode::SUCCESS;
683 for (
int i = 0;
i < jump;
i++) {
685 return(StatusCode::FAILURE);
688 return(StatusCode::SUCCESS);
690 return(StatusCode::FAILURE);
696 return(StatusCode::SUCCESS);
698 return(StatusCode::FAILURE);
703 return(StatusCode::FAILURE);
708 return(StatusCode::SUCCESS);
716 ATH_MSG_ERROR(
"Input not seekable, choose different input svc");
717 return StatusCode::FAILURE;
721 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
722 fileNum = m_fileCount;
727 return(StatusCode::RECOVERABLE);
730 if (fileNum != m_fileCount) {
735 m_fileCount = fileNum;
738 long nev = nevguid.first;
741 return StatusCode::FAILURE;
743 int delta = evtNum - m_firstEvt[m_fileCount];
746 if (
nextImpl(*beginIter,delta,
lock).isFailure())
return StatusCode::FAILURE;
751 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) -
m_eventSource->positionInBlock();
752 ATH_MSG_DEBUG(
"Seeking event " << evtNum <<
" in current file with delta " << delta);
756 else if ( delta > 0 ) {
758 if ( this->
nextImpl(*beginIter, delta,
lock).isFailure() )
return StatusCode::FAILURE;
760 else if ( delta < 0 ) {
762 if ( this->
previousImpl(*beginIter, -1*delta,
lock).isFailure() )
return(StatusCode::FAILURE);
765 return StatusCode::SUCCESS;
775 std::string listName(
"EventInfoAtts");
777 if (
eventStore()->contains<AthenaAttributeList>(listName)) {
780 ATH_MSG_ERROR(
"Cannot retrieve old AttributeList from StoreGate.");
781 return(StatusCode::FAILURE);
783 if (!
eventStore()->removeDataAndProxy(oldAttrList).isSuccess()) {
784 ATH_MSG_ERROR(
"Cannot remove old AttributeList from StoreGate.");
785 return(StatusCode::FAILURE);
790 auto attrList = std::make_unique<AthenaAttributeList>();
796 if (
eventStore()->record(std::move(attrList), listName).isFailure()) {
797 return StatusCode::FAILURE;
800 return StatusCode::SUCCESS;
811 attrList->extend(
"RunNumber" +
suffix,
"unsigned int");
812 attrList->extend(
"EventNumber" +
suffix,
"unsigned long long");
813 attrList->extend(
"LumiBlockN" +
suffix,
"unsigned int");
814 attrList->extend(
"BunchId" +
suffix,
"unsigned int");
815 attrList->extend(
"EventTime" +
suffix,
"unsigned int");
816 attrList->extend(
"EventTimeNanoSec" +
suffix,
"unsigned int");
821 (*attrList)[
"RunNumber" +
suffix].data<
unsigned int>() =
event->run_no();
822 if (
event->version() < 0x03010000) {
823 (*attrList)[
"EventNumber" +
suffix].data<
unsigned long long>() =
event->lvl1_id();
825 (*attrList)[
"EventNumber" +
suffix].data<
unsigned long long>() =
event->global_id();
827 (*attrList)[
"LumiBlockN" +
suffix].data<
unsigned int>() =
event->lumi_block();
828 (*attrList)[
"BunchId" +
suffix].data<
unsigned int>() =
event->bc_id();
830 unsigned int bc_time_sec =
event->bc_time_seconds();
831 unsigned int bc_time_ns =
event->bc_time_nanoseconds();
833 if (bc_time_ns > 1000000000) {
835 ATH_MSG_WARNING(
" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns <<
", reset it to 1 sec");
836 bc_time_ns = 1000000000;
838 (*attrList)[
"EventTime" +
suffix].data<
unsigned int>() = bc_time_sec;
839 (*attrList)[
"EventTimeNanoSec" +
suffix].data<
unsigned int>() = bc_time_ns;
844 attrList->extend(
"TriggerStatus" +
suffix,
"unsigned int");
845 (*attrList)[
"TriggerStatus" +
suffix].data<
unsigned int>() = *
buffer;
847 attrList->extend(
"ExtendedL1ID" +
suffix,
"unsigned int");
848 attrList->extend(
"L1TriggerType" +
suffix,
"unsigned int");
849 (*attrList)[
"ExtendedL1ID" +
suffix].data<
unsigned int>() =
event->lvl1_id();
850 (*attrList)[
"L1TriggerType" +
suffix].data<
unsigned int>() =
event->lvl1_trigger_type();
853 event->lvl1_trigger_info(
buffer);
854 for (
uint32_t iT1 = 0; iT1 <
event->nlvl1_trigger_info(); ++iT1) {
855 std::stringstream
name;
856 name <<
"L1TriggerInfo_" << iT1;
857 attrList->extend(
name.str() +
suffix,
"unsigned int");
863 event->lvl2_trigger_info(
buffer);
864 for (
uint32_t iT1 = 0; iT1 <
event->nlvl2_trigger_info(); ++iT1) {
866 std::stringstream
name;
867 name <<
"L2TriggerInfo_" << iT1;
868 attrList->extend(
name.str() +
suffix,
"unsigned int");
875 event->event_filter_info(
buffer);
876 for (
uint32_t iT1 = 0; iT1 <
event->nevent_filter_info(); ++iT1) {
878 std::stringstream
name;
879 name <<
"EFTriggerInfo_" << iT1;
880 attrList->extend(
name.str() +
suffix,
"unsigned int");
887 event->stream_tag(
buffer);
888 std::vector<eformat::helper::StreamTag> onl_streamTags;
890 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
891 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
892 attrList->extend(itS->name +
suffix,
"string");
893 (*attrList)[itS->name +
suffix].data<std::string>() = itS->type;
896 return StatusCode::SUCCESS;
909 if (m_numEvt[
i] == -1) {
912 long nev = nevguid.first;
919 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
927 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
941 return int(m_NumEvents);
952 if (m_eventStreamingTool.empty()) {
953 return(StatusCode::FAILURE);
955 return(m_eventStreamingTool->makeServer(1,
""));
961 if (m_eventStreamingTool.empty()) {
962 return(StatusCode::FAILURE);
964 std::string dummyStr;
965 return(m_eventStreamingTool->makeClient(0, dummyStr));
971 if (m_eventStreamingTool.empty()) {
972 return(StatusCode::FAILURE);
974 if (m_eventStreamingTool->isClient()) {
975 StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
976 while (
sc.isRecoverable()) {
978 sc = m_eventStreamingTool->lockEvent(evtNum);
982 return(StatusCode::FAILURE);
988 if (m_eventStreamingTool.empty()) {
989 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
990 return(StatusCode::FAILURE);
993 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
998 if (m_NumEvents == -1) {
1002 ATH_MSG_ERROR(
"Unable to retrieve next event for " <<
i <<
"/" << maxevt);
1003 return(StatusCode::FAILURE);
1005 if (m_eventStreamingTool->isServer()) {
1007 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
1010 pre->fragment_size_word() *
sizeof(
uint32_t),
1014 if (!
sc.isSuccess()) {
1015 ATH_MSG_ERROR(
"Cannot put Event " << m_NumEvents - 1 <<
" to AthenaSharedMemoryTool");
1016 return(StatusCode::FAILURE);
1022 while ( (
sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
1025 if (!
sc.isSuccess()) {
1026 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
1027 return(StatusCode::FAILURE);
1029 return(StatusCode::SUCCESS);
1034 IOpaqueAddress*& iop)
const {
1037 iop =
proxy->address();
1038 return(StatusCode::SUCCESS);
1041 return(StatusCode::FAILURE);
1048 return(StatusCode::SUCCESS);
1056 if (!iomgr.retrieve().isSuccess()) {
1058 return(StatusCode::FAILURE);
1060 if (!iomgr->io_hasitem(
this)) {
1061 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1062 return(StatusCode::FAILURE);
1068 if (!iomgr->io_contains(
this,
fname)) {
1070 return(StatusCode::FAILURE);
1072 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1074 return(StatusCode::FAILURE);
1085 [] (Gaudi::Details::PropertyBase&) {}
1090 return(this->
reinit(lock));