24#include "GaudiKernel/ClassID.h"
25#include "GaudiKernel/FileIncident.h"
26#include "GaudiKernel/IIncidentSvc.h"
27#include "GaudiKernel/IIoComponentMgr.h"
28#include "GaudiKernel/GaudiException.h"
29#include "GaudiKernel/GenericAddress.h"
30#include "GaudiKernel/StatusCode.h"
38#include <boost/tokenizer.hpp>
46 base_class(name, pSvcLocator)
58 m_inputCollectionsChanged =
false;
62 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
63 m_inputCollectionsChanged =
true;
76 m_autoRetrieveTools =
false;
77 m_checkToolDeps =
false;
80 ATH_MSG_DEBUG(
"Initializing secondary event selector " << name());
88 ATH_MSG_FATAL(
"Use the property: EventSelector.InputCollections = "
89 <<
"[ \"<collectionName>\" ] (list of collections)");
90 return StatusCode::FAILURE;
92 boost::char_separator<char> sep_coma(
","), sep_hyph(
"-");
94 for(
const std::string&
r: ranges ) {
95 boost::tokenizer fromto(
r, sep_hyph);
96 auto from_iter = fromto.begin();
97 long from = std::stol(*from_iter);
99 if( ++from_iter != fromto.end() ) {
100 to = std::stol(*from_iter);
102 m_skipEventRanges.emplace_back(from, to);
106 m_skipEventRanges.emplace_back(v, v);
108 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
109 if( msgLvl(MSG::DEBUG) ) {
110 std::string skip_ranges_str;
111 for(
const auto& [first, second] : m_skipEventRanges ) {
112 if( !skip_ranges_str.empty() ) skip_ranges_str +=
", ";
113 skip_ranges_str += std::to_string(first);
114 if( first != second) skip_ranges_str += std::format(
"-{}", second);
116 if( !skip_ranges_str.empty() )
131 std::vector<std::string> propVal;
133 bool foundCnvSvc =
false;
134 for (
const auto& property : propVal) {
139 if (!epSvc->setProperty(
"CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
140 ATH_MSG_FATAL(
"Cannot set EventPersistencySvc Property for CnvServices");
141 return StatusCode::FAILURE;
152 for (
const auto& inputCollection : incol) {
153 if (!iomgr->io_register(
this, IIoComponentMgr::IoMode::READ, inputCollection, inputCollection).isSuccess()) {
154 ATH_MSG_FATAL(
"could not register [" << inputCollection <<
"] for output !");
157 ATH_MSG_VERBOSE(
"io_register[" << this->name() <<
"](" << inputCollection <<
") [ok]");
161 return StatusCode::FAILURE;
167 return StatusCode::FAILURE;
171 if (!
reinit().isSuccess()) {
172 return StatusCode::FAILURE;
177 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 0);
178 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 0);
179 return StatusCode::SUCCESS;
192 if (!m_firstEvt.empty()) {
195 m_inputCollectionsChanged =
false;
197 m_headerIterator = 0;
198 bool retError =
false;
199 for (
auto& tool : m_helperTools) {
200 if (!tool->postInitialize().isSuccess()) {
201 ATH_MSG_FATAL(
"Failed to postInitialize() " << tool->name());
207 return StatusCode::FAILURE;
212 if (!m_poolCollectionConverter) {
213 ATH_MSG_INFO(
"No Events found in any Input Collections");
219 FileIncident firstInputFileIncident(name(),
"FirstInputFile", *m_inputCollectionsIterator);
224 return StatusCode::SUCCESS;
228 m_headerIterator = &m_poolCollectionConverter->selectAll();
229 }
catch (std::exception &e) {
230 ATH_MSG_FATAL(
"Cannot open implicit collection - check data/software version.");
232 return StatusCode::FAILURE;
234 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
235 if (m_poolCollectionConverter) {
236 m_poolCollectionConverter->disconnectDb().ignore();
237 m_poolCollectionConverter.reset();
239 ++m_inputCollectionsIterator;
241 if (m_poolCollectionConverter) {
242 m_headerIterator = &m_poolCollectionConverter->selectAll();
247 if (!m_poolCollectionConverter || m_headerIterator ==
nullptr) {
251 if (!m_poolCollectionConverter) {
252 return StatusCode::SUCCESS;
254 m_headerIterator = &m_poolCollectionConverter->selectAll();
255 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
256 if (m_poolCollectionConverter) {
257 m_poolCollectionConverter->disconnectDb().ignore();
258 m_poolCollectionConverter.reset();
260 ++m_inputCollectionsIterator;
262 if (m_poolCollectionConverter) {
263 m_headerIterator = &m_poolCollectionConverter->selectAll();
269 if (!m_poolCollectionConverter || m_headerIterator ==
nullptr) {
270 return StatusCode::SUCCESS;
272 const Token& headRef = m_headerIterator->eventRef();
279 FileIncident firstInputFileIncident(name(),
"FirstInputFile",
"FID:" + fid, fid);
283 return StatusCode::SUCCESS;
287 if (m_poolCollectionConverter) {
289 m_poolCollectionConverter->disconnectDb().ignore();
290 m_poolCollectionConverter.reset();
295 if (!m_poolCollectionConverter) {
296 ATH_MSG_INFO(
"No Events found in any Input Collections");
299 --m_inputCollectionsIterator;
302 m_headerIterator = &m_poolCollectionConverter->selectAll();
308 return StatusCode::SUCCESS;
314 m_inputFileGuard.reset();
316 IEvtSelector::Context* ctxt(
nullptr);
320 return StatusCode::SUCCESS;
328 for (
auto& tool : m_helperTools) {
329 if (!tool->preFinalize().isSuccess()) {
334 m_headerIterator =
nullptr;
335 if (m_poolCollectionConverter) {
336 m_poolCollectionConverter.reset();
339 return ::AthService::finalize();
345 return StatusCode::SUCCESS;
349 std::lock_guard<CallMutex> lockGuard(
m_callLock);
350 for (
const auto& tool : m_helperTools) {
351 if (!tool->preNext().isSuccess()) {
358 if (
sc.isRecoverable()) {
361 if (
sc.isFailure()) {
362 return StatusCode::FAILURE;
370 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
375 return StatusCode::FAILURE;
378 StatusCode status = StatusCode::SUCCESS;
379 for (
const auto& tool : m_helperTools) {
380 StatusCode toolStatus = tool->postNext();
381 if (toolStatus.isRecoverable()) {
382 ATH_MSG_INFO(
"Request skipping event from: " << tool->name());
383 if (status.isSuccess()) {
384 status = StatusCode::RECOVERABLE;
386 }
else if (toolStatus.isFailure()) {
388 status = StatusCode::FAILURE;
391 if (status.isRecoverable()) {
393 }
else if (status.isFailure()) {
402 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
403 m_skipEventRanges.erase(m_skipEventRanges.begin());
408 return StatusCode::SUCCESS;
413 for (
int i = 0; i < jump; i++) {
416 return StatusCode::SUCCESS;
418 return StatusCode::FAILURE;
423 if( m_inputCollectionsChanged ) {
425 if(
rc != StatusCode::SUCCESS )
return rc;
429 if (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
430 m_headerIterator =
nullptr;
432 m_poolCollectionConverter.reset();
435 m_inputFileGuard.reset();
441 if( m_inputCollectionsChanged ) {
443 if(
rc != StatusCode::SUCCESS )
return rc;
446 ++m_inputCollectionsIterator;
449 if (!m_poolCollectionConverter) {
453 return StatusCode::FAILURE;
456 m_headerIterator = &m_poolCollectionConverter->selectAll();
459 return StatusCode::RECOVERABLE;
463 const Token& headRef = m_headerIterator->eventRef();
468 if (guid != m_guid) {
478 m_activeEventsPerSource[guid.toString()] = 0;
483 if (!
m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
485 return StatusCode::FAILURE;
489 *m_inputCollectionsIterator, m_guid.toString(),
496 "FID:" + m_guid.toString(), m_guid.toString(),
501 return StatusCode::SUCCESS;
510 if (
sc.isRecoverable()) {
513 if (
sc.isFailure()) {
514 return StatusCode::FAILURE;
524 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
526 return StatusCode::SUCCESS;
528 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
529 m_skipEventRanges.erase(m_skipEventRanges.begin());
539 return StatusCode::SUCCESS;
544 return StatusCode::FAILURE;
549 for (
int i = 0; i < jump; i++) {
552 return StatusCode::SUCCESS;
554 return StatusCode::FAILURE;
558 if (ctxt.identifier() ==
m_endIter->identifier()) {
560 return StatusCode::SUCCESS;
562 return StatusCode::FAILURE;
568 return StatusCode::SUCCESS;
572 IOpaqueAddress*& iop)
const {
573 std::string tokenStr;
577 tokenStr = (*attrList)[
"eventRef"].data<std::string>();
578 ATH_MSG_DEBUG(
"found AthenaAttribute, name = eventRef = " << tokenStr);
579 }
catch (std::exception &e) {
581 return StatusCode::FAILURE;
585 tokenStr = m_headerIterator->eventRef().toString();
587 auto token = std::make_unique<Token>();
588 token->fromString(tokenStr);
590 return StatusCode::SUCCESS;
594 return StatusCode::SUCCESS;
598 IEvtSelector::Context& )
const {
599 return StatusCode::SUCCESS;
604 if( m_inputCollectionsChanged ) {
606 if(
rc != StatusCode::SUCCESS )
return rc;
614 m_headerIterator =
nullptr;
616 m_inputFileGuard.reset();
617 return StatusCode::RECOVERABLE;
621 m_poolCollectionConverter->disconnectDb().ignore();
623 m_poolCollectionConverter.reset();
628 <<
"\" from the collection list.");
632 m_poolCollectionConverter = std::make_unique<PoolCollectionConverter>(
m_collectionType.value(),
636 if (!m_poolCollectionConverter || !m_poolCollectionConverter->initialize().isSuccess()) {
637 m_headerIterator =
nullptr;
638 ATH_MSG_ERROR(
"seek: Unable to initialize PoolCollectionConverter.");
639 return StatusCode::FAILURE;
642 m_headerIterator = &m_poolCollectionConverter->selectAll();
645 next(*beginIter).ignore();
646 ATH_MSG_DEBUG(
"Token " << m_headerIterator->eventRef().toString());
647 }
catch (std::exception &e) {
648 m_headerIterator =
nullptr;
650 return StatusCode::FAILURE;
654 if (m_headerIterator->seek(evtNum - m_firstEvt[
m_curCollection]) == 0) {
655 m_headerIterator =
nullptr;
657 return StatusCode::FAILURE;
661 return StatusCode::SUCCESS;
673 for (std::size_t i = 0,
imax = m_numEvt.size(); i <
imax; i++) {
674 if (m_numEvt[i] == -1) {
682 int collection_size = 0;
685 collection_size = hi->
size();
688 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
692 m_numEvt[i] = collection_size;
694 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
705 return std::accumulate(m_numEvt.begin(), m_numEvt.end(), 0);
708std::unique_ptr<PoolCollectionConverter>
716 ATH_MSG_DEBUG(
"Try item: \"" << *m_inputCollectionsIterator <<
"\" from the collection list.");
717 auto pCollCnv = std::make_unique<PoolCollectionConverter>(
m_collectionType.value(),
718 *m_inputCollectionsIterator,
721 StatusCode status = pCollCnv->initialize();
722 if (!status.isSuccess()) {
725 if (!status.isRecoverable()) {
726 ATH_MSG_ERROR(
"Unable to initialize PoolCollectionConverter.");
727 throw GaudiException(
"Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
729 ATH_MSG_ERROR(
"Unable to open: " << *m_inputCollectionsIterator);
730 throw GaudiException(
"Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
733 if (!pCollCnv->isValid().isSuccess()) {
735 ATH_MSG_DEBUG(
"No events found in: " << *m_inputCollectionsIterator <<
" skipped!!!");
739 *m_inputCollectionsIterator, {},
740 "eventless " + *m_inputCollectionsIterator);
743 ++m_inputCollectionsIterator;
753 if (!
eventStore()->clearStore().isSuccess()) {
759 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
766 ATH_CHECK(wh.record(std::move(athAttrList)));
767 return StatusCode::SUCCESS;
772 const auto& row = m_headerIterator->currentRow();
773 attrList->extend( row.tokenName() + suffix,
"string" );
774 (*attrList)[ row.tokenName() + suffix ].data<std::string>() = row.token().toString();
775 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " << row.tokenName() + suffix <<
" = " << row.token().toString() <<
".");
777 std::string eventRef =
"eventRef";
779 eventRef.append(suffix);
781 attrList->extend(eventRef,
"string");
782 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
783 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " + eventRef +
" = " << m_headerIterator->eventRef().toString() <<
".");
786 const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
787 for (
const auto &attr : sourceAttrList) {
788 attrList->extend(attr.specification().name() + suffix, attr.specification().type());
789 (*attrList)[attr.specification().name() + suffix] = attr;
793 return StatusCode::SUCCESS;
798 if (m_poolCollectionConverter) {
799 m_poolCollectionConverter->disconnectDb().ignore();
800 m_poolCollectionConverter.reset();
802 m_headerIterator =
nullptr;
804 if (!iomgr.retrieve().isSuccess()) {
806 return StatusCode::FAILURE;
808 if (!iomgr->io_hasitem(
this)) {
809 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
810 return StatusCode::FAILURE;
813 std::set<std::size_t> updatedIndexes;
815 if (updatedIndexes.find(i) != updatedIndexes.end())
continue;
816 std::string savedName = inputCollections[i];
817 std::string &fname = inputCollections[i];
818 if (!iomgr->io_contains(
this, fname)) {
819 ATH_MSG_ERROR(
"IoComponentMgr does not know about [" << fname <<
"] !");
820 return StatusCode::FAILURE;
822 if (!iomgr->io_retrieve(
this, fname).isSuccess()) {
823 ATH_MSG_FATAL(
"Could not retrieve new value for [" << fname <<
"] !");
824 return StatusCode::FAILURE;
826 if (savedName != fname) {
827 ATH_MSG_DEBUG(
"Mapping value for [" << savedName <<
"] to [" << fname <<
"]");
830 updatedIndexes.insert(i);
831 for (std::size_t j = i + 1; j <
imax; j++) {
832 if (inputCollections[j] == savedName) {
833 inputCollections[j] = fname;
834 updatedIndexes.insert(j);
847 m_inputFileGuard.reset();
848 if (m_poolCollectionConverter) {
849 m_poolCollectionConverter->disconnectDb().ignore();
850 m_poolCollectionConverter.reset();
852 return StatusCode::SUCCESS;
864 if (inc.type() == IncidentType::BeginProcessing) {
875 ATH_MSG_WARNING(
"could not read event source ID from incident event context");
878 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
879 ATH_MSG_DEBUG(
"Incident handler ignoring unknown input FID: " << fid );
882 ATH_MSG_DEBUG(
"** MN Incident handler " << inc.type() <<
" Event source ID=" << fid );
883 if( inc.type() == IncidentType::BeginProcessing ) {
885 m_activeEventsPerSource[fid]++;
886 }
else if( inc.type() == IncidentType::EndProcessing ) {
887 m_activeEventsPerSource[fid]--;
891 if( msgLvl(MSG::DEBUG) ) {
892 for(
auto& source: m_activeEventsPerSource )
893 msg(MSG::DEBUG) <<
"SourceID: " << source.first <<
" active events: " << source.second <<
endmsg;
904 if( m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
905 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
911 m_activeEventsPerSource.erase( fid );
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the EventContextAthenaPool class.
This file contains the class definition for the EventSelectorAthenaPool class.
This file contains the class definition for the IPoolSvc interface class.
This file contains the class definition for the PoolCollectionConverter class.
Handle class for reading from StoreGate.
Handle class for recording to StoreGate.
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the context to access an event from POOL persistent store.
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
virtual StatusCode start() override
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
virtual StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
StoreGateSvc * eventStore() const
Return pointer to active event SG.
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
virtual StatusCode stop() override
ServiceHandle< IIncidentSvc > m_incidentSvc
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
virtual ~EventSelectorAthenaPool()
Destructor.
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
std::string m_attrListKey
AttributeList SG key.
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
SG::SlotSpecificObj< SG::SourceID > m_sourceID
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Gaudi::CheckedProperty< uint32_t > m_oldRunNo
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
virtual StatusCode finalize() override
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Disconnect DB if all events from the source FID were processed and the Selector moved to another file...
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
EventContextAthenaPool * m_endIter
std::atomic_int m_evtCount
std::atomic_bool m_firedIncident
std::atomic_long m_curCollection
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Gaudi::CheckedProperty< uint32_t > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
virtual int size(Context &ctxt) const override
Return the size of the collection.
int findEvent(int evtNum) const
Search for event with number evtNum.
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma separated list of event to skip, ranges with '-': <start> - <end>.
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
This class provides a encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
static const Guid & null() noexcept
NULL-Guid: static class method.
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
This class provides an interface to POOL collections.
StatusCode isValid() const
Check whether has valid pool::ICollection*.
pool::ICollectionCursor & selectAll()
StatusCode initialize()
Required by all Gaudi Services.
virtual bool isValid() override final
Can the handle be successfully dereferenced?
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
virtual const std::string toString() const
Retrieve the string representation of the token.
int technology() const
Access technology type.
const Guid & dbID() const
Access database identifier.
An interface used to navigate the result of a query on a collection.
virtual std::size_t size()=0
Returns the size of the collection.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
static const DbType POOL_StorageType
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
static constexpr CLID ID()