15#include "GaudiKernel/ClassID.h"
16#include "GaudiKernel/FileIncident.h"
17#include "GaudiKernel/IIncidentSvc.h"
18#include "GaudiKernel/IIoComponentMgr.h"
27#include "eformat/StreamTag.h"
33 long eventNumber,
const void* source,
34 size_t nbytes,
unsigned int status) {
44 : base_class(name, svcloc) {
64 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
65 this->
reinit(lock).ignore();
85 m_autoRetrieveTools =
false;
86 m_checkToolDeps =
false;
89 ATH_MSG_DEBUG(
"Initializing secondary event selector " << name());
99 return StatusCode::FAILURE;
102 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
108 return StatusCode::FAILURE;
120 if (!m_eventStreamingTool.empty()) {
121 ATH_CHECK(m_eventStreamingTool.retrieve());
132 if (!iomgr->io_register(
this, IIoComponentMgr::IoMode::READ, input).isSuccess()) {
133 ATH_MSG_FATAL(
"could not register [" << input <<
"] for output !");
136 ATH_MSG_VERBOSE(
"io_register[" << this->name() <<
"](" << input <<
") [ok]");
140 return(StatusCode::FAILURE);
149 StatusCode risc = this->
reinit(lock);
163 m_firstEvt.resize(1);
169 bool retError =
false;
171 for (ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
172 if (!tool->postInitialize().isSuccess()) {
173 ATH_MSG_FATAL(
"Failed to postInitialize() " << tool->name());
180 return(StatusCode::FAILURE);
186 FileIncident firstInputFileIncident(name(),
"FirstInputFile",
"BSF:" + *m_inputCollectionsIterator);
195 return StatusCode::SUCCESS;
207 return StatusCode::SUCCESS;
215 m_inputFileGuard.reset();
220 return StatusCode::SUCCESS;
230 for (ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
231 if (!tool->preFinalize().isSuccess()) {
239 return AthService::finalize();
245 ++m_inputCollectionsIterator;
252 ATH_MSG_ERROR(
"cannot open new run for non-filebased inputs");
253 return StatusCode::FAILURE;
258 return StatusCode::FAILURE;
260 std::string blockname = *m_inputCollectionsIterator;
263 long nev = nevguid.first;
265 ATH_MSG_FATAL(
"Unable to access file " << *m_inputCollectionsIterator <<
", stopping here");
270 "BSF:" + *m_inputCollectionsIterator, nevguid.second,
281 ATH_MSG_WARNING(
"skipping more events " <<
m_skipEvents.value() - m_NumEvents <<
"(" << nev <<
") than in file " << *m_inputCollectionsIterator <<
", try next");
283 m_numEvt[m_fileCount] = nev;
290 m_firstEvt[m_fileCount] = m_NumEvents;
291 m_numEvt[m_fileCount] = nev;
293 return StatusCode::SUCCESS;
298 return StatusCode::SUCCESS;
308 static std::atomic<int> n_bad_events = 0;
310 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
313 ATH_CHECK(m_eventStreamingTool->getLockedEvent(&source, status));
315 return StatusCode::SUCCESS;
318 for (
const ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
319 if (!
tool->preNext().isSuccess()) {
332 if (
sc.isRecoverable()) {
334 }
else if (
sc.isFailure()) {
335 return StatusCode::FAILURE;
343 int nbad = ++n_bad_events;
347 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
349 return StatusCode::FAILURE;
356 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
360 if (rec_sg != StatusCode::SUCCESS) {
361 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
368 for (
const ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
370 if (toolStatus.isRecoverable()) {
372 status = StatusCode::RECOVERABLE;
373 }
else if (toolStatus.isFailure()) {
375 status = StatusCode::FAILURE;
378 if (
status.isRecoverable()) {
380 }
else if (
status.isFailure()) {
392 catch (
const ByteStreamExceptions::badFragmentData&) {
393 int nbad = ++n_bad_events;
394 ATH_MSG_WARNING(
"Bad fragment data encountered, current count at " << nbad);
398 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
400 return StatusCode::FAILURE;
407 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
408 m_skipEventSequence.erase(m_skipEventSequence.begin());
410 if ( m_NumEvents % 1'000 == 0 ) {
418 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
421 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
422 m_NumEvents - 1, pre->start(),
423 pre->fragment_size_word() *
sizeof(uint32_t),
429 return StatusCode::SUCCESS;
446 unsigned int cntr = m_NumEvents;
448 while (m_NumEvents+1 <= cntr + jump) {
453 return StatusCode::SUCCESS;
458 return StatusCode::SUCCESS;
477 ATH_MSG_DEBUG(
"Event source found no more valid files left in input list");
479 return StatusCode::FAILURE;
487 return StatusCode::FAILURE;
498 if (pre ==
nullptr) {
500 return StatusCode::FAILURE;
506 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
511 if (rec_sg != StatusCode::SUCCESS) {
512 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
515 return StatusCode::SUCCESS;
530 if (
sc.isRecoverable()) {
533 if (
sc.isFailure()) {
534 return StatusCode::FAILURE;
544 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
545 return StatusCode::SUCCESS;
547 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
548 m_skipEventSequence.erase(m_skipEventSequence.begin());
551 ATH_MSG_INFO(
"skipping secondary event " << m_NumEvents);
558 return StatusCode::SUCCESS;
570 bool badEvent(
false);
576 catch (
const ByteStreamExceptions::readError&) {
578 return StatusCode::FAILURE;
580 catch (
const ByteStreamExceptions::badFragment&) {
584 catch (
const ByteStreamExceptions::badFragmentData&) {
591 return StatusCode::FAILURE;
595 ATH_MSG_FATAL(
"Attempt to read previous data on invalid reader");
596 return StatusCode::FAILURE;
605 return StatusCode::FAILURE;
612 if (rec_sg != StatusCode::SUCCESS) {
613 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
616 return StatusCode::SUCCESS;
630 for (
int i = 0;
i < jump;
i++) {
633 return StatusCode::SUCCESS;
635 return StatusCode::FAILURE;
639 if (it.identifier() ==
m_endIter->identifier()) {
641 return StatusCode::SUCCESS;
643 return StatusCode::FAILURE;
648 return StatusCode::FAILURE;
653 return StatusCode::SUCCESS;
661 ATH_MSG_ERROR(
"Input not seekable, choose different input svc");
662 return StatusCode::FAILURE;
666 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
667 fileNum = m_fileCount;
672 return StatusCode::RECOVERABLE;
675 if (fileNum != m_fileCount) {
680 m_fileCount = fileNum;
683 long nev = nevguid.first;
685 ATH_MSG_FATAL(
"Unable to open file with seeked event " << evtNum <<
" file " << fileName);
686 return StatusCode::FAILURE;
688 int delta = evtNum - m_firstEvt[m_fileCount];
696 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) -
m_eventSource->positionInBlock();
697 ATH_MSG_DEBUG(
"Seeking event " << evtNum <<
" in current file with delta " << delta);
701 else if ( delta > 0 ) {
705 else if ( delta < 0 ) {
710 return StatusCode::SUCCESS;
720 std::string listName(
"EventInfoAtts");
729 auto attrList = std::make_unique<AthenaAttributeList>();
737 return StatusCode::SUCCESS;
748 attrList->extend(
"RunNumber" + suffix,
"unsigned int");
749 attrList->extend(
"EventNumber" + suffix,
"unsigned long long");
750 attrList->extend(
"LumiBlockN" + suffix,
"unsigned int");
751 attrList->extend(
"BunchId" + suffix,
"unsigned int");
752 attrList->extend(
"EventTime" + suffix,
"unsigned int");
753 attrList->extend(
"EventTimeNanoSec" + suffix,
"unsigned int");
758 (*attrList)[
"RunNumber" + suffix].data<
unsigned int>() = event->run_no();
759 if (event->version() < 0x03010000) {
760 (*attrList)[
"EventNumber" + suffix].data<
unsigned long long>() = event->lvl1_id();
762 (*attrList)[
"EventNumber" + suffix].data<
unsigned long long>() = event->global_id();
764 (*attrList)[
"LumiBlockN" + suffix].data<
unsigned int>() = event->lumi_block();
765 (*attrList)[
"BunchId" + suffix].data<
unsigned int>() = event->bc_id();
767 unsigned int bc_time_sec =
event->bc_time_seconds();
768 unsigned int bc_time_ns =
event->bc_time_nanoseconds();
770 if (bc_time_ns > 1000000000) {
772 ATH_MSG_WARNING(
" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns <<
", reset it to 1 sec");
773 bc_time_ns = 1000000000;
775 (*attrList)[
"EventTime" + suffix].data<
unsigned int>() = bc_time_sec;
776 (*attrList)[
"EventTimeNanoSec" + suffix].data<
unsigned int>() = bc_time_ns;
780 event->status(buffer);
781 attrList->extend(
"TriggerStatus" + suffix,
"unsigned int");
782 (*attrList)[
"TriggerStatus" + suffix].data<
unsigned int>() = *buffer;
784 attrList->extend(
"ExtendedL1ID" + suffix,
"unsigned int");
785 attrList->extend(
"L1TriggerType" + suffix,
"unsigned int");
786 (*attrList)[
"ExtendedL1ID" + suffix].data<
unsigned int>() = event->lvl1_id();
787 (*attrList)[
"L1TriggerType" + suffix].data<
unsigned int>() = event->lvl1_trigger_type();
790 event->lvl1_trigger_info(buffer);
791 for (uint32_t iT1 = 0; iT1 <
event->nlvl1_trigger_info(); ++iT1) {
792 std::stringstream name;
793 name <<
"L1TriggerInfo_" << iT1;
794 attrList->extend(name.str() + suffix,
"unsigned int");
795 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
800 event->lvl2_trigger_info(buffer);
801 for (uint32_t iT1 = 0; iT1 <
event->nlvl2_trigger_info(); ++iT1) {
803 std::stringstream name;
804 name <<
"L2TriggerInfo_" << iT1;
805 attrList->extend(name.str() + suffix,
"unsigned int");
806 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
812 event->event_filter_info(buffer);
813 for (uint32_t iT1 = 0; iT1 <
event->nevent_filter_info(); ++iT1) {
815 std::stringstream name;
816 name <<
"EFTriggerInfo_" << iT1;
817 attrList->extend(name.str() + suffix,
"unsigned int");
818 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
824 event->stream_tag(buffer);
825 std::vector<eformat::helper::StreamTag> onl_streamTags;
826 eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
827 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
828 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
829 attrList->extend(itS->name + suffix,
"string");
830 (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
833 return StatusCode::SUCCESS;
845 if (m_numEvt[i] == -1) {
848 long nev = nevguid.first;
855 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
863 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
877 return int(m_NumEvents);
888 if (m_eventStreamingTool.empty()) {
889 return StatusCode::FAILURE;
891 return m_eventStreamingTool->makeServer(1,
"");
897 if (m_eventStreamingTool.empty()) {
898 return StatusCode::FAILURE;
900 std::string dummyStr;
901 return m_eventStreamingTool->makeClient(0, dummyStr);
907 if (m_eventStreamingTool.empty()) {
908 return StatusCode::FAILURE;
910 if (m_eventStreamingTool->isClient()) {
911 StatusCode
sc = m_eventStreamingTool->lockEvent(evtNum);
912 while (
sc.isRecoverable()) {
914 sc = m_eventStreamingTool->lockEvent(evtNum);
918 return StatusCode::FAILURE;
924 if (m_eventStreamingTool.empty()) {
925 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
926 return StatusCode::FAILURE;
929 for (
int i = 0; i < maxevt || maxevt == -1; ++i) {
934 if (m_NumEvents == -1) {
935 ATH_MSG_VERBOSE(
"Called read Event and read last event from input: " << i);
938 ATH_MSG_ERROR(
"Unable to retrieve next event for " << i <<
"/" << maxevt);
939 return StatusCode::FAILURE;
941 if (m_eventStreamingTool->isServer()) {
943 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
946 pre->fragment_size_word() *
sizeof(uint32_t),
955 while ( (
sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
959 return StatusCode::SUCCESS;
964 IOpaqueAddress*& iop)
const {
967 iop = proxy->address();
968 return StatusCode::SUCCESS;
971 return StatusCode::FAILURE;
978 return StatusCode::SUCCESS;
987 if (!iomgr->io_hasitem(
this)) {
988 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
989 return StatusCode::FAILURE;
992 for (std::size_t i = 0,
imax = inputCollections.size(); i !=
imax; ++i) {
993 ATH_MSG_INFO(
"I/O reinitialization, file = " << inputCollections[i]);
994 std::string &fname = inputCollections[i];
995 if (!iomgr->io_contains(
this, fname)) {
996 ATH_MSG_ERROR(
"IoComponentMgr does not know about [" << fname <<
"] !");
997 return StatusCode::FAILURE;
999 ATH_CHECK(iomgr->io_retrieve(
this, fname));
1002 m_inputFileGuard.reset();
1009 [] (Gaudi::Details::PropertyBase&) {}
1014 return this->
reinit(lock);
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the EventContextByteStream class.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
size_t size() const
Number of registered mappings.
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the Context for EventSelectorByteStream.
virtual StatusCode initialize() override
Implementation of Service base class methods.
virtual StatusCode stop() override
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual int size(Context &it) const override
Always returns -1.
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Gaudi::Property< std::string > m_eventSourceName
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Gaudi::Property< bool > m_filebased
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set a selection criteria.
std::lock_guard< mutex_t > lock_t
SmartIF< IByteStreamInputSvc > m_eventSource
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
EventContextByteStream * m_endIter
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
virtual StatusCode releaseContext(Context *&it) const override
virtual StatusCode makeServer(int num) override
Make this a server.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
virtual StatusCode finalize() override
StatusCode previousImpl(Context &it, lock_t &lock) const
virtual StatusCode last(Context &it) const override
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Gaudi::CheckedProperty< uint32_t > m_runNo
virtual StatusCode createContext(Context *&it) const override
create context
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
virtual StatusCode makeClient(int num) override
Make this a client.
virtual int curEvent(const Context &it) const override
Return the current event number.
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode nextImpl(Context &it, lock_t &lock) const
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
void nextFile(lock_t &lock) const
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
virtual StatusCode rewind(Context &it) const override
StatusCode reinit(lock_t &lock)
Reinitialize the service when a fork() occurred/was-issued.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
virtual StatusCode next(Context &it) const override
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
StatusCode openNewRun(lock_t &lock) const
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual ~EventSelectorByteStream()
Standard Destructor.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
StoreGateSvc * eventStore() const
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode start() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
StatusCode nextWithSkipImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
StatusCode recordAttributeListImpl(lock_t &lock) const
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
EventContextByteStream * m_beginIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Gaudi::Property< long > m_skipEvents
virtual StatusCode previous(Context &it) const override
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
bool contains(const std::string &s, const std::string ®x)
does a string contain the substring
::StatusCode StatusCode
StatusCode definition for legacy code.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
static constexpr CLID ID()