15#include "GaudiKernel/ClassID.h"
16#include "GaudiKernel/FileIncident.h"
17#include "GaudiKernel/IIncidentSvc.h"
18#include "GaudiKernel/IIoComponentMgr.h"
26#include "eformat/StreamTag.h"
32 long eventNumber,
const void* source,
33 size_t nbytes,
unsigned int status) {
43 : base_class(name, svcloc) {
63 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
64 this->
reinit(lock).ignore();
84 m_autoRetrieveTools =
false;
85 m_checkToolDeps =
false;
88 ATH_MSG_DEBUG(
"Initializing secondary event selector " << name());
98 return StatusCode::FAILURE;
101 std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
107 return StatusCode::FAILURE;
119 if (!m_eventStreamingTool.empty()) {
120 ATH_CHECK(m_eventStreamingTool.retrieve());
131 if (!iomgr->io_register(
this, IIoComponentMgr::IoMode::READ, input).isSuccess()) {
132 ATH_MSG_FATAL(
"could not register [" << input <<
"] for output !");
135 ATH_MSG_VERBOSE(
"io_register[" << this->name() <<
"](" << input <<
") [ok]");
139 return(StatusCode::FAILURE);
148 StatusCode risc = this->
reinit(lock);
162 m_firstEvt.resize(1);
168 bool retError =
false;
170 for (ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
171 if (!tool->postInitialize().isSuccess()) {
172 ATH_MSG_FATAL(
"Failed to postInitialize() " << tool->name());
179 return(StatusCode::FAILURE);
185 FileIncident firstInputFileIncident(name(),
"FirstInputFile",
"BSF:" + *m_inputCollectionsIterator);
196 return StatusCode::SUCCESS;
208 return StatusCode::SUCCESS;
219 FileIncident endInputFileIncident(name(),
"EndInputFile",
"stop");
223 return StatusCode::SUCCESS;
233 for (ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
234 if (!tool->preFinalize().isSuccess()) {
242 return AthService::finalize();
246 FileIncident endInputFileIncident(name(),
"EndInputFile",
"BSF:" + *m_inputCollectionsIterator);
248 ++m_inputCollectionsIterator;
255 ATH_MSG_ERROR(
"cannot open new run for non-filebased inputs");
256 return StatusCode::FAILURE;
261 return StatusCode::FAILURE;
263 std::string blockname = *m_inputCollectionsIterator;
266 long nev = nevguid.first;
268 ATH_MSG_FATAL(
"Unable to access file " << *m_inputCollectionsIterator <<
", stopping here");
273 FileIncident beginInputFileIncident(name(),
"BeginInputFile",
"BSF:" + *m_inputCollectionsIterator,nevguid.second);
286 ATH_MSG_WARNING(
"skipping more events " <<
m_skipEvents.value() - m_NumEvents <<
"(" << nev <<
") than in file " << *m_inputCollectionsIterator <<
", try next");
288 m_numEvt[m_fileCount] = nev;
295 m_firstEvt[m_fileCount] = m_NumEvents;
296 m_numEvt[m_fileCount] = nev;
298 return StatusCode::SUCCESS;
303 return StatusCode::SUCCESS;
313 static std::atomic<int> n_bad_events = 0;
315 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
318 ATH_CHECK(m_eventStreamingTool->getLockedEvent(&source, status));
320 return StatusCode::SUCCESS;
323 for (
const ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
324 if (!
tool->preNext().isSuccess()) {
337 if (
sc.isRecoverable()) {
339 }
else if (
sc.isFailure()) {
340 return StatusCode::FAILURE;
348 int nbad = ++n_bad_events;
352 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
354 return StatusCode::FAILURE;
361 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
365 if (rec_sg != StatusCode::SUCCESS) {
366 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
373 for (
const ToolHandle<IAthenaSelectorTool>& tool :
m_helperTools) {
375 if (toolStatus.isRecoverable()) {
377 status = StatusCode::RECOVERABLE;
378 }
else if (toolStatus.isFailure()) {
380 status = StatusCode::FAILURE;
383 if (
status.isRecoverable()) {
385 }
else if (
status.isFailure()) {
397 catch (
const ByteStreamExceptions::badFragmentData&) {
398 int nbad = ++n_bad_events;
399 ATH_MSG_WARNING(
"Bad fragment data encountered, current count at " << nbad);
403 ATH_MSG_ERROR(
"Cannot continue processing: bad event handling disabled or limit exceeded (" << nbad <<
" events)");
405 return StatusCode::FAILURE;
412 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
413 m_skipEventSequence.erase(m_skipEventSequence.begin());
415 if ( m_NumEvents % 1'000 == 0 ) {
423 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
426 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
427 m_NumEvents - 1, pre->start(),
428 pre->fragment_size_word() *
sizeof(uint32_t),
434 return StatusCode::SUCCESS;
451 unsigned int cntr = m_NumEvents;
453 while (m_NumEvents+1 <= cntr + jump) {
458 return StatusCode::SUCCESS;
463 return StatusCode::SUCCESS;
482 ATH_MSG_DEBUG(
"Event source found no more valid files left in input list");
484 return StatusCode::FAILURE;
492 return StatusCode::FAILURE;
503 if (pre ==
nullptr) {
505 return StatusCode::FAILURE;
511 return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
516 if (rec_sg != StatusCode::SUCCESS) {
517 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
520 return StatusCode::SUCCESS;
535 if (
sc.isRecoverable()) {
538 if (
sc.isFailure()) {
539 return StatusCode::FAILURE;
549 (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
550 return StatusCode::SUCCESS;
552 if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
553 m_skipEventSequence.erase(m_skipEventSequence.begin());
556 ATH_MSG_INFO(
"skipping secondary event " << m_NumEvents);
563 return StatusCode::SUCCESS;
575 bool badEvent(
false);
581 catch (
const ByteStreamExceptions::readError&) {
583 return StatusCode::FAILURE;
585 catch (
const ByteStreamExceptions::badFragment&) {
589 catch (
const ByteStreamExceptions::badFragmentData&) {
596 return StatusCode::FAILURE;
600 ATH_MSG_FATAL(
"Attempt to read previous data on invalid reader");
601 return StatusCode::FAILURE;
610 return StatusCode::FAILURE;
617 if (rec_sg != StatusCode::SUCCESS) {
618 ATH_MSG_ERROR(
"Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
621 return StatusCode::SUCCESS;
635 for (
int i = 0;
i < jump;
i++) {
638 return StatusCode::SUCCESS;
640 return StatusCode::FAILURE;
644 if (it.identifier() ==
m_endIter->identifier()) {
646 return StatusCode::SUCCESS;
648 return StatusCode::FAILURE;
653 return StatusCode::FAILURE;
658 return StatusCode::SUCCESS;
666 ATH_MSG_ERROR(
"Input not seekable, choose different input svc");
667 return StatusCode::FAILURE;
671 if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
672 fileNum = m_fileCount;
677 return StatusCode::RECOVERABLE;
680 if (fileNum != m_fileCount) {
685 m_fileCount = fileNum;
688 long nev = nevguid.first;
690 ATH_MSG_FATAL(
"Unable to open file with seeked event " << evtNum <<
" file " << fileName);
691 return StatusCode::FAILURE;
693 int delta = evtNum - m_firstEvt[m_fileCount];
701 int delta = (evtNum - m_firstEvt[m_fileCount] + 1) -
m_eventSource->positionInBlock();
702 ATH_MSG_DEBUG(
"Seeking event " << evtNum <<
" in current file with delta " << delta);
706 else if ( delta > 0 ) {
710 else if ( delta < 0 ) {
715 return StatusCode::SUCCESS;
725 std::string listName(
"EventInfoAtts");
734 auto attrList = std::make_unique<AthenaAttributeList>();
742 return StatusCode::SUCCESS;
753 attrList->extend(
"RunNumber" + suffix,
"unsigned int");
754 attrList->extend(
"EventNumber" + suffix,
"unsigned long long");
755 attrList->extend(
"LumiBlockN" + suffix,
"unsigned int");
756 attrList->extend(
"BunchId" + suffix,
"unsigned int");
757 attrList->extend(
"EventTime" + suffix,
"unsigned int");
758 attrList->extend(
"EventTimeNanoSec" + suffix,
"unsigned int");
763 (*attrList)[
"RunNumber" + suffix].data<
unsigned int>() = event->run_no();
764 if (event->version() < 0x03010000) {
765 (*attrList)[
"EventNumber" + suffix].data<
unsigned long long>() = event->lvl1_id();
767 (*attrList)[
"EventNumber" + suffix].data<
unsigned long long>() = event->global_id();
769 (*attrList)[
"LumiBlockN" + suffix].data<
unsigned int>() = event->lumi_block();
770 (*attrList)[
"BunchId" + suffix].data<
unsigned int>() = event->bc_id();
772 unsigned int bc_time_sec =
event->bc_time_seconds();
773 unsigned int bc_time_ns =
event->bc_time_nanoseconds();
775 if (bc_time_ns > 1000000000) {
777 ATH_MSG_WARNING(
" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns <<
", reset it to 1 sec");
778 bc_time_ns = 1000000000;
780 (*attrList)[
"EventTime" + suffix].data<
unsigned int>() = bc_time_sec;
781 (*attrList)[
"EventTimeNanoSec" + suffix].data<
unsigned int>() = bc_time_ns;
785 event->status(buffer);
786 attrList->extend(
"TriggerStatus" + suffix,
"unsigned int");
787 (*attrList)[
"TriggerStatus" + suffix].data<
unsigned int>() = *buffer;
789 attrList->extend(
"ExtendedL1ID" + suffix,
"unsigned int");
790 attrList->extend(
"L1TriggerType" + suffix,
"unsigned int");
791 (*attrList)[
"ExtendedL1ID" + suffix].data<
unsigned int>() = event->lvl1_id();
792 (*attrList)[
"L1TriggerType" + suffix].data<
unsigned int>() = event->lvl1_trigger_type();
795 event->lvl1_trigger_info(buffer);
796 for (uint32_t iT1 = 0; iT1 <
event->nlvl1_trigger_info(); ++iT1) {
797 std::stringstream name;
798 name <<
"L1TriggerInfo_" << iT1;
799 attrList->extend(name.str() + suffix,
"unsigned int");
800 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
805 event->lvl2_trigger_info(buffer);
806 for (uint32_t iT1 = 0; iT1 <
event->nlvl2_trigger_info(); ++iT1) {
808 std::stringstream name;
809 name <<
"L2TriggerInfo_" << iT1;
810 attrList->extend(name.str() + suffix,
"unsigned int");
811 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
817 event->event_filter_info(buffer);
818 for (uint32_t iT1 = 0; iT1 <
event->nevent_filter_info(); ++iT1) {
820 std::stringstream name;
821 name <<
"EFTriggerInfo_" << iT1;
822 attrList->extend(name.str() + suffix,
"unsigned int");
823 (*attrList)[name.str() + suffix].data<
unsigned int>() = *buffer;
829 event->stream_tag(buffer);
830 std::vector<eformat::helper::StreamTag> onl_streamTags;
831 eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
832 for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
833 itSE = onl_streamTags.end(); itS != itSE; ++itS) {
834 attrList->extend(itS->name + suffix,
"string");
835 (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
838 return StatusCode::SUCCESS;
850 if (m_numEvt[i] == -1) {
853 long nev = nevguid.first;
860 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
868 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
882 return int(m_NumEvents);
893 if (m_eventStreamingTool.empty()) {
894 return StatusCode::FAILURE;
896 return m_eventStreamingTool->makeServer(1,
"");
902 if (m_eventStreamingTool.empty()) {
903 return StatusCode::FAILURE;
905 std::string dummyStr;
906 return m_eventStreamingTool->makeClient(0, dummyStr);
912 if (m_eventStreamingTool.empty()) {
913 return StatusCode::FAILURE;
915 if (m_eventStreamingTool->isClient()) {
916 StatusCode
sc = m_eventStreamingTool->lockEvent(evtNum);
917 while (
sc.isRecoverable()) {
919 sc = m_eventStreamingTool->lockEvent(evtNum);
923 return StatusCode::FAILURE;
929 if (m_eventStreamingTool.empty()) {
930 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
931 return StatusCode::FAILURE;
934 for (
int i = 0; i < maxevt || maxevt == -1; ++i) {
939 if (m_NumEvents == -1) {
940 ATH_MSG_VERBOSE(
"Called read Event and read last event from input: " << i);
943 ATH_MSG_ERROR(
"Unable to retrieve next event for " << i <<
"/" << maxevt);
944 return StatusCode::FAILURE;
946 if (m_eventStreamingTool->isServer()) {
948 while ( (
sc = putEvent_ST(*m_eventStreamingTool,
951 pre->fragment_size_word() *
sizeof(uint32_t),
960 while ( (
sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
964 return StatusCode::SUCCESS;
969 IOpaqueAddress*& iop)
const {
972 iop = proxy->address();
973 return StatusCode::SUCCESS;
976 return StatusCode::FAILURE;
983 return StatusCode::SUCCESS;
992 if (!iomgr->io_hasitem(
this)) {
993 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
994 return StatusCode::FAILURE;
997 for (std::size_t i = 0,
imax = inputCollections.size(); i !=
imax; ++i) {
998 ATH_MSG_INFO(
"I/O reinitialization, file = " << inputCollections[i]);
999 std::string &fname = inputCollections[i];
1000 if (!iomgr->io_contains(
this, fname)) {
1001 ATH_MSG_ERROR(
"IoComponentMgr does not know about [" << fname <<
"] !");
1002 return StatusCode::FAILURE;
1004 ATH_CHECK(iomgr->io_retrieve(
this, fname));
1014 [] (Gaudi::Details::PropertyBase&) {}
1019 return this->
reinit(lock);
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the EventContextByteStream class.
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Define macros for attributes used to control the static checker.
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the Context for EventSelectorByteStream.
virtual StatusCode initialize() override
Implementation of Service base class methods.
virtual StatusCode stop() override
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual int size(Context &it) const override
Always returns -1.
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Gaudi::Property< std::string > m_eventSourceName
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Gaudi::Property< bool > m_filebased
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set a selection criteria.
std::lock_guard< mutex_t > lock_t
SmartIF< IByteStreamInputSvc > m_eventSource
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
EventContextByteStream * m_endIter
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
virtual StatusCode releaseContext(Context *&it) const override
virtual StatusCode makeServer(int num) override
Make this a server.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
virtual StatusCode finalize() override
StatusCode previousImpl(Context &it, lock_t &lock) const
virtual StatusCode last(Context &it) const override
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Gaudi::CheckedProperty< uint32_t > m_runNo
virtual StatusCode createContext(Context *&it) const override
create context
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
virtual StatusCode makeClient(int num) override
Make this a client.
virtual int curEvent(const Context &it) const override
Return the current event number.
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode nextImpl(Context &it, lock_t &lock) const
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
void nextFile(lock_t &lock) const
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
virtual StatusCode rewind(Context &it) const override
StatusCode reinit(lock_t &lock)
Reinitialize the service when a fork() occurred/was-issued.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
virtual StatusCode next(Context &it) const override
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
StatusCode openNewRun(lock_t &lock) const
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual ~EventSelectorByteStream()
Standard Destructor.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
StoreGateSvc * eventStore() const
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode start() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
StatusCode nextWithSkipImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
StatusCode recordAttributeListImpl(lock_t &lock) const
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
EventContextByteStream * m_beginIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Gaudi::Property< long > m_skipEvents
virtual StatusCode previous(Context &it) const override
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
bool contains(const std::string &s, const std::string &regx)
does a string contain the substring
::StatusCode StatusCode
StatusCode definition for legacy code.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.