15#include "GaudiKernel/ClassID.h"
16#include "GaudiKernel/FileIncident.h"
17#include "GaudiKernel/IIncidentSvc.h"
18#include "GaudiKernel/IIoComponentMgr.h"
27#include "eformat/StreamTag.h"
31 const std::string stringTypeStr{
"string"};
34 long eventNumber,
const void* source,
35 size_t nbytes,
unsigned int status) {
45 : base_class(name, svcloc) {
65 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
66 this->
reinit(lock).ignore();
86 m_autoRetrieveTools =
false;
87 m_checkToolDeps =
false;
90 ATH_MSG_DEBUG(
"Initializing secondary event selector " << name());
100 return StatusCode::FAILURE;
103 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
109 return StatusCode::FAILURE;
121 if (!m_eventStreamingTool.empty()) {
122 ATH_CHECK(m_eventStreamingTool.retrieve());
133 if (!iomgr->io_register(
this, IIoComponentMgr::IoMode::READ, input).isSuccess()) {
134 ATH_MSG_FATAL(
"could not register [" << input <<
"] for output !");
137 ATH_MSG_VERBOSE(
"io_register[" << this->name() <<
"](" << input <<
") [ok]");
141 return(StatusCode::FAILURE);
150 StatusCode risc = this->
reinit(lock);
164 m_firstEvt.resize(1);
170 bool retError =
false;
172 for (ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
173 if (!tool->postInitialize().isSuccess()) {
174 ATH_MSG_FATAL(
"Failed to postInitialize() " << tool->name());
181 return(StatusCode::FAILURE);
187 FileIncident firstInputFileIncident(name(),
"FirstInputFile",
"BSF:" + *m_inputCollectionsIterator);
196 return StatusCode::SUCCESS;
208 return StatusCode::SUCCESS;
216 m_inputFileGuard.reset();
221 return StatusCode::SUCCESS;
231 for (ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
232 if (!tool->preFinalize().isSuccess()) {
240 return AthService::finalize();
246 ++m_inputCollectionsIterator;
253 ATH_MSG_ERROR(
"cannot open new run for non-filebased inputs");
254 return StatusCode::FAILURE;
259 return StatusCode::FAILURE;
261 std::string blockname = *m_inputCollectionsIterator;
264 long nev = nevguid.first;
266 ATH_MSG_FATAL(
"Unable to access file " << *m_inputCollectionsIterator <<
", stopping here");
271 "BSF:" + *m_inputCollectionsIterator, nevguid.second,
282 ATH_MSG_WARNING(
"skipping more events " <<
m_skipEvents.value() - m_NumEvents <<
"(" << nev <<
") than in file " << *m_inputCollectionsIterator <<
", try next");
284 m_numEvt[m_fileCount] = nev;
291 m_firstEvt[m_fileCount] = m_NumEvents;
292 m_numEvt[m_fileCount] = nev;
294 return StatusCode::SUCCESS;
299 return StatusCode::SUCCESS;
309 static std::atomic<int> n_bad_events = 0;
311 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
314 ATH_CHECK(m_eventStreamingTool->getLockedEvent(&source, status));
316 return StatusCode::SUCCESS;
319 for (
const ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
320 if (!
tool->preNext().isSuccess()) {
333 if (
sc.isRecoverable()) {
335 }
else if (
sc.isFailure()) {
336 return StatusCode::FAILURE;
344 int nbad = ++n_bad_events;
348 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
350 return StatusCode::FAILURE;
357 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
361 if (rec_sg != StatusCode::SUCCESS) {
362 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
369 for (
const ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
371 if (toolStatus.isRecoverable()) {
373 status = StatusCode::RECOVERABLE;
374 }
else if (toolStatus.isFailure()) {
376 status = StatusCode::FAILURE;
379 if (
status.isRecoverable()) {
381 }
else if (
status.isFailure()) {
393 catch (
const ByteStreamExceptions::badFragmentData&) {
394 int nbad = ++n_bad_events;
395 ATH_MSG_WARNING(
"Bad fragment data encountered, current count at " << nbad);
399 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
401 return StatusCode::FAILURE;
408 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
409 m_skipEventSequence.erase(m_skipEventSequence.begin());
411 if ( m_NumEvents % 1'000 == 0 ) {
419 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
422 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
423 m_NumEvents - 1, pre->start(),
424 pre->fragment_size_word() *
sizeof(uint32_t),
430 return StatusCode::SUCCESS;
447 unsigned int cntr = m_NumEvents;
449 while (m_NumEvents+1 <= cntr + jump) {
454 return StatusCode::SUCCESS;
459 return StatusCode::SUCCESS;
478 ATH_MSG_DEBUG(
"Event source found no more valid files left in input list");
480 return StatusCode::FAILURE;
488 return StatusCode::FAILURE;
499 if (pre ==
nullptr) {
501 return StatusCode::FAILURE;
507 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
512 if (rec_sg != StatusCode::SUCCESS) {
513 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
516 return StatusCode::SUCCESS;
531 if (
sc.isRecoverable()) {
534 if (
sc.isFailure()) {
535 return StatusCode::FAILURE;
545 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
546 return StatusCode::SUCCESS;
548 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
549 m_skipEventSequence.erase(m_skipEventSequence.begin());
552 ATH_MSG_INFO(
"skipping secondary event " << m_NumEvents);
559 return StatusCode::SUCCESS;
571 bool badEvent(
false);
577 catch (
const ByteStreamExceptions::readError&) {
579 return StatusCode::FAILURE;
581 catch (
const ByteStreamExceptions::badFragment&) {
585 catch (
const ByteStreamExceptions::badFragmentData&) {
592 return StatusCode::FAILURE;
596 ATH_MSG_FATAL(
"Attempt to read previous data on invalid reader");
597 return StatusCode::FAILURE;
606 return StatusCode::FAILURE;
613 if (rec_sg != StatusCode::SUCCESS) {
614 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
617 return StatusCode::SUCCESS;
631 for (
int i = 0;
i < jump;
i++) {
634 return StatusCode::SUCCESS;
636 return StatusCode::FAILURE;
640 if (it.identifier() ==
m_endIter->identifier()) {
642 return StatusCode::SUCCESS;
644 return StatusCode::FAILURE;
649 return StatusCode::FAILURE;
654 return StatusCode::SUCCESS;
662 ATH_MSG_ERROR(
"Input not seekable, choose different input svc");
663 return StatusCode::FAILURE;
667 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
668 fileNum = m_fileCount;
673 return StatusCode::RECOVERABLE;
676 if (fileNum != m_fileCount) {
681 m_fileCount = fileNum;
684 long nev = nevguid.first;
686 ATH_MSG_FATAL(
"Unable to open file with seeked event " << evtNum <<
" file " << fileName);
687 return StatusCode::FAILURE;
689 int delta = evtNum - m_firstEvt[m_fileCount];
697 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) -
m_eventSource->positionInBlock();
698 ATH_MSG_DEBUG(
"Seeking event " << evtNum <<
" in current file with delta " << delta);
702 else if ( delta > 0 ) {
706 else if ( delta < 0 ) {
711 return StatusCode::SUCCESS;
721 std::string listName(
"EventInfoAtts");
730 auto attrList = std::make_unique<AthenaAttributeList>();
738 return StatusCode::SUCCESS;
749 attrList->extend(
"RunNumber" + suffix,
"unsigned int");
750 attrList->extend(
"EventNumber" + suffix,
"unsigned long long");
751 attrList->extend(
"LumiBlockN" + suffix,
"unsigned int");
752 attrList->extend(
"BunchId" + suffix,
"unsigned int");
753 attrList->extend(
"EventTime" + suffix,
"unsigned int");
754 attrList->extend(
"EventTimeNanoSec" + suffix,
"unsigned int");
759 (*attrList)[
"RunNumber" + suffix].data<
unsigned int>() = event->run_no();
760 if (event->version() < 0x03010000) {
761 (*attrList)[
"EventNumber" + suffix].data<
unsigned long long>() = event->lvl1_id();
763 (*attrList)[
"EventNumber" + suffix].data<
unsigned long long>() = event->global_id();
765 (*attrList)[
"LumiBlockN" + suffix].data<
unsigned int>() = event->lumi_block();
766 (*attrList)[
"BunchId" + suffix].data<
unsigned int>() = event->bc_id();
768 unsigned int bc_time_sec =
event->bc_time_seconds();
769 unsigned int bc_time_ns =
event->bc_time_nanoseconds();
771 if (bc_time_ns > 1000000000) {
773 ATH_MSG_WARNING(
" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns <<
", reset it to 1 sec");
774 bc_time_ns = 1000000000;
776 (*attrList)[
"EventTime" + suffix].data<
unsigned int>() = bc_time_sec;
777 (*attrList)[
"EventTimeNanoSec" + suffix].data<
unsigned int>() = bc_time_ns;
781 event->status(buffer);
782 attrList->extend(
"TriggerStatus" + suffix,
"unsigned int");
783 (*attrList)[
"TriggerStatus" + suffix].data<
unsigned int>() = *buffer;
785 attrList->extend(
"ExtendedL1ID" + suffix,
"unsigned int");
786 attrList->extend(
"L1TriggerType" + suffix,
"unsigned int");
787 (*attrList)[
"ExtendedL1ID" + suffix].data<
unsigned int>() = event->lvl1_id();
788 (*attrList)[
"L1TriggerType" + suffix].data<
unsigned int>() = event->lvl1_trigger_type();
791 event->lvl1_trigger_info(buffer);
792 for (uint32_t iT1 = 0; iT1 <
event->nlvl1_trigger_info(); ++iT1) {
793 std::stringstream name;
794 name <<
"L1TriggerInfo_" << iT1;
795 attrList->extend(name.str() + suffix,
"unsigned int");
796 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
801 event->lvl2_trigger_info(buffer);
802 for (uint32_t iT1 = 0; iT1 <
event->nlvl2_trigger_info(); ++iT1) {
804 std::stringstream name;
805 name <<
"L2TriggerInfo_" << iT1;
806 attrList->extend(name.str() + suffix,
"unsigned int");
807 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
813 event->event_filter_info(buffer);
814 for (uint32_t iT1 = 0; iT1 <
event->nevent_filter_info(); ++iT1) {
816 std::stringstream name;
817 name <<
"EFTriggerInfo_" << iT1;
818 attrList->extend(name.str() + suffix,
"unsigned int");
819 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
825 event->stream_tag(buffer);
826 std::vector<eformat::helper::StreamTag> onl_streamTags;
827 eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
828 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
829 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
830 attrList->extend(itS->name + suffix, stringTypeStr);
831 (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
834 return StatusCode::SUCCESS;
846 if (m_numEvt[i] == -1) {
849 long nev = nevguid.first;
856 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
864 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
878 return int(m_NumEvents);
889 if (m_eventStreamingTool.empty()) {
890 return StatusCode::FAILURE;
892 return m_eventStreamingTool->makeServer(1,
"");
898 if (m_eventStreamingTool.empty()) {
899 return StatusCode::FAILURE;
901 std::string dummyStr;
902 return m_eventStreamingTool->makeClient(0, dummyStr);
908 if (m_eventStreamingTool.empty()) {
909 return StatusCode::FAILURE;
911 if (m_eventStreamingTool->isClient()) {
912 StatusCode
sc = m_eventStreamingTool->lockEvent(evtNum);
913 while (
sc.isRecoverable()) {
915 sc = m_eventStreamingTool->lockEvent(evtNum);
919 return StatusCode::FAILURE;
925 if (m_eventStreamingTool.empty()) {
926 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
927 return StatusCode::FAILURE;
930 for (
int i = 0; i < maxevt || maxevt == -1; ++i) {
935 if (m_NumEvents == -1) {
936 ATH_MSG_VERBOSE(
"Called read Event and read last event from input: " << i);
939 ATH_MSG_ERROR(
"Unable to retrieve next event for " << i <<
"/" << maxevt);
940 return StatusCode::FAILURE;
942 if (m_eventStreamingTool->isServer()) {
944 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
947 pre->fragment_size_word() *
sizeof(uint32_t),
956 while ( (
sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
960 return StatusCode::SUCCESS;
965 IOpaqueAddress*& iop)
const {
968 iop = proxy->address();
969 return StatusCode::SUCCESS;
972 return StatusCode::FAILURE;
979 return StatusCode::SUCCESS;
988 if (!iomgr->io_hasitem(
this)) {
989 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
990 return StatusCode::FAILURE;
993 for (std::size_t i = 0,
imax = inputCollections.size(); i !=
imax; ++i) {
994 ATH_MSG_INFO(
"I/O reinitialization, file = " << inputCollections[i]);
995 std::string &fname = inputCollections[i];
996 if (!iomgr->io_contains(
this, fname)) {
997 ATH_MSG_ERROR(
"IoComponentMgr does not know about [" << fname <<
"] !");
998 return StatusCode::FAILURE;
1000 ATH_CHECK(iomgr->io_retrieve(
this, fname));
1003 m_inputFileGuard.reset();
1010 [] (Gaudi::Details::PropertyBase&) {}
1015 return this->
reinit(lock);
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the EventContextByteStream class.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
size_t size() const
Number of registered mappings.
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the Context for EventSelectorByteStream.
virtual StatusCode initialize() override
Implementation of Service base class methods.
virtual StatusCode stop() override
Gaudi::Property< bool > m_isSecondary
IsSecondary: indicates whether this is an instance of a secondary event selector.
virtual int size(Context &it) const override
Always returns -1.
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Gaudi::Property< std::string > m_eventSourceName
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Gaudi::Property< bool > m_filebased
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set the selection criteria.
std::lock_guard< mutex_t > lock_t
SmartIF< IByteStreamInputSvc > m_eventSource
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
EventContextByteStream * m_endIter
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
virtual StatusCode releaseContext(Context *&it) const override
virtual StatusCode makeServer(int num) override
Make this a server.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
virtual StatusCode finalize() override
StatusCode previousImpl(Context &it, lock_t &lock) const
virtual StatusCode last(Context &it) const override
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Gaudi::CheckedProperty< uint32_t > m_runNo
virtual StatusCode createContext(Context *&it) const override
create context
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
virtual StatusCode makeClient(int num) override
Make this a client.
virtual int curEvent(const Context &it) const override
Return the current event number.
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode nextImpl(Context &it, lock_t &lock) const
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
void nextFile(lock_t &lock) const
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
virtual StatusCode rewind(Context &it) const override
StatusCode reinit(lock_t &lock)
Reinitialize the service after a fork() has occurred or been issued.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
virtual StatusCode next(Context &it) const override
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
StatusCode openNewRun(lock_t &lock) const
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e.g. after a fork()).
virtual ~EventSelectorByteStream()
Standard Destructor.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
StoreGateSvc * eventStore() const
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode start() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
StatusCode nextWithSkipImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
StatusCode recordAttributeListImpl(lock_t &lock) const
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
EventContextByteStream * m_beginIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Gaudi::Property< long > m_skipEvents
virtual StatusCode previous(Context &it) const override
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
::StatusCode StatusCode
StatusCode definition for legacy code.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
static constexpr CLID ID()