ATLAS Offline Software
Loading...
Searching...
No Matches
DoubleEventSelectorAthenaPool Class Reference

This class is the EventSelector for event data. More...

#include <DoubleEventSelectorAthenaPool.h>

Inheritance diagram for DoubleEventSelectorAthenaPool:
Collaboration diagram for DoubleEventSelectorAthenaPool:

Public Member Functions

 DoubleEventSelectorAthenaPool (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~DoubleEventSelectorAthenaPool ()
 Destructor.
virtual StatusCode initialize () override
 Initialize function.
virtual StatusCode next (IEvtSelector::Context &ctxt) const override
virtual StatusCode next (IEvtSelector::Context &ctxt, int jump) const override
virtual StatusCode seek (Context &ctxt, int evtNum) const override
 Seek to a given event number.
virtual int size (Context &ctxt) const override
 Return the size of the collection.
virtual void handle (const Incident &incident) override
 Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode start () override
virtual StatusCode stop () override
virtual StatusCode finalize () override
virtual StatusCode createContext (IEvtSelector::Context *&ctxt) const override
 create context
virtual StatusCode previous (IEvtSelector::Context &ctxt) const override
virtual StatusCode previous (IEvtSelector::Context &ctxt, int jump) const override
virtual StatusCode last (IEvtSelector::Context &ctxt) const override
virtual StatusCode rewind (IEvtSelector::Context &ctxt) const override
virtual StatusCode createAddress (const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
virtual StatusCode releaseContext (IEvtSelector::Context *&ctxt) const override
virtual StatusCode resetCriteria (const std::string &criteria, IEvtSelector::Context &ctxt) const override
 Set a selection criteria.
virtual int curEvent (const Context &ctxt) const override
 Return the current event number.
virtual StatusCode makeServer (int num) override
 Make this a server.
virtual StatusCode makeClient (int num) override
 Make this a client.
virtual StatusCode share (int evtnum) override
 Request to share a given event number.
virtual StatusCode readEvent (int maxevt) override
 Read the next maxevt events.
virtual StatusCode io_reinit () override
 Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2))
virtual StatusCode io_finalize () override
 Callback method to finalize the internal state of the component for I/O purposes (e.g. before fork(2))

Protected Member Functions

virtual StatusCode nextHandleFileTransition (IEvtSelector::Context &ctxt) const override
 Handle file transition at the next iteration.
virtual StatusCode nextWithSkip (IEvtSelector::Context &ctxt) const override
 Go to next event and skip if necessary.
virtual StatusCode fillAttributeList (coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
 Fill AttributeList with specific items from the selector and a suffix.
virtual bool disconnectIfFinished (const SG::SourceID &fid) const override

Private Types

typedef std::mutex CallMutex

Private Member Functions

virtual StatusCode recordAttributeList () const override
 Record AttributeList in StoreGate.
StoreGateSvceventStore () const
 Return pointer to active event SG.
StatusCode reinit () const
 Reinitialize the service when a fork() occurred/was-issued.
std::unique_ptr< PoolCollectionConvertergetCollectionCnv (bool throwIncidents=false) const
 Return pointer to new PoolCollectionConverter.
int findEvent (int evtNum) const
 Search for event with number evtNum.
void fireEndFileIncidents (bool isLastFile) const
 Fires the EndInputFile incident (if there is an open file) at end of selector.
void inputCollectionsHandler (Gaudi::Details::PropertyBase &)

Private Attributes

ServiceHandle< ISecondaryEventSelectorm_secondarySelector {this, "SecondaryEventSelector", "SecondaryEventSelector", ""}
Gaudi::Property< std::string > m_secondaryAttrListSuffix {this, "SecondaryAttrListSuffix", "secondary", ""}
bool m_secondaryByteStream {}
SG::SlotSpecificObj< SG::SourceIDm_sourceID1
SG::SlotSpecificObj< SG::SourceIDm_sourceID2
EventContextAthenaPoolm_endIter {}
std::unique_ptr< PoolCollectionConverter > m_poolCollectionConverter ATLAS_THREAD_SAFE {}
pool::ICollectionCursor *m_headerIterator ATLAS_THREAD_SAFE {}
Guid m_guid ATLAS_THREAD_SAFE {}
std::map< SG::SourceID, int > m_activeEventsPerSource ATLAS_THREAD_SAFE
std::vector< std::string >::const_iterator m_inputCollectionsIterator ATLAS_THREAD_SAFE
bool m_inputCollectionsChanged ATLAS_THREAD_SAFE
 flag to notify the EvSel that the inputs were changed and reinit() needs to be called ASAP
ToolHandleArray< IAthenaSelectorTool > m_helperTools ATLAS_THREAD_SAFE {this, "HelperTools", {}, ""}
 HelperTools, vector of names of AlgTools that are executed by the EventSelector.
std::vector< int > m_numEvt ATLAS_THREAD_SAFE
std::vector< int > m_firstEvt ATLAS_THREAD_SAFE
std::vector< std::pair< long, long > > m_skipEventRanges ATLAS_THREAD_SAFE
ServiceHandle< IAthenaPoolCnvSvcm_athenaPoolCnvSvc {this, "ConversionService", "AthenaPoolCnvSvc", ""}
ServiceHandle< IIncidentSvc > m_incidentSvc {this, "IncidentSvc", "IncidentSvc", ""}
Gaudi::Property< bool > m_isSecondary {this, "IsSecondary", false, ""}
 IsSecondary, know if this is an instance of secondary event selector.
Gaudi::Property< bool > m_processMetadata {this, "ProcessMetadata", true, ""}
 ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default = true.
Gaudi::Property< std::string > m_collectionType {this, "CollectionType", "ImplicitCollection", ""}
 CollectionType, type of the collection: default = "ImplicitCollection".
std::string m_attrListKey {"Input"}
 AttributeList SG key.
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp {this, "InputCollections", {}, ""}
 InputCollections, vector with names of the input collections.
Gaudi::Property< bool > m_keepInputFilesOpen {this, "KeepInputFilesOpen", false, ""}
 KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false.
ToolHandle< IAthenaSelectorToolm_counterTool {this, "CounterTool", "", ""}
ToolHandle< IAthenaIPCToolm_eventStreamingTool {this, "SharedMemoryTool", "", ""}
Gaudi::Property< int > m_makeStreamingToolClient {this, "MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first iteration automatically.
Gaudi::CheckedProperty< uint32_t > m_runNo {this, "RunNumber", 0, ""}
 The following are included for compatibility with McEventSelector and are not really used.
Gaudi::CheckedProperty< uint32_t > m_oldRunNo {this, "OldRunNumber", 0, ""}
Gaudi::Property< bool > m_overrideRunNumber {this, "OverrideRunNumber", false, ""}
Gaudi::Property< bool > m_overrideRunNumberFromInput {this, "OverrideRunNumberFromInput", false, ""}
Gaudi::CheckedProperty< uint64_t > m_firstEventNo {this, "FirstEvent", 1, ""}
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun {this, "EventsPerRun", 1000000, ""}
Gaudi::CheckedProperty< uint32_t > m_firstLBNo {this, "FirstLB", 0, ""}
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB {this, "EventsPerLB", 1000, ""}
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp {this, "InitialTimeStamp", 0, ""}
Gaudi::Property< uint32_t > m_timeStampInterval {this, "TimeStampInterval", 0, ""}
std::atomic_long m_curCollection {}
Gaudi::Property< int > m_skipEvents {this, "SkipEvents", 0, ""}
 SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp {this, "SkipEventSequence", {}, ""}
Gaudi::Property< std::string > m_skipEventRangesProp {this, "SkipEventRanges", {}, ""}
 Skip Events - comma separated list of event to skip, ranges with '-': <start> - <end>
std::atomic_int m_evtCount {}
std::atomic_bool m_firedIncident {}
CallMutex m_callLock
SG::SlotSpecificObj< SG::SourceIDm_sourceID

Detailed Description

This class is the EventSelector for event data.

Definition at line 26 of file DoubleEventSelectorAthenaPool.h.

Member Typedef Documentation

◆ CallMutex

typedef std::mutex EventSelectorAthenaPool::CallMutex
privateinherited

Definition at line 239 of file EventSelectorAthenaPool.h.

Constructor & Destructor Documentation

◆ DoubleEventSelectorAthenaPool()

DoubleEventSelectorAthenaPool::DoubleEventSelectorAthenaPool ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 22 of file DoubleEventSelectorAthenaPool.cxx.

23 : EventSelectorAthenaPool(name, pSvcLocator)
24{
25}
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.

◆ ~DoubleEventSelectorAthenaPool()

DoubleEventSelectorAthenaPool::~DoubleEventSelectorAthenaPool ( )
virtual

Destructor.

Definition at line 28 of file DoubleEventSelectorAthenaPool.cxx.

28{}

Member Function Documentation

◆ createAddress()

StatusCode EventSelectorAthenaPool::createAddress ( const IEvtSelector::Context & ctxt,
IOpaqueAddress *& iop ) const
overridevirtualinherited
Parameters
ctxt[IN] current event context.
iop[OUT] pointer to IOpaqueAddress reference of selection context.

Definition at line 700 of file EventSelectorAthenaPool.cxx.

701 {
702 std::string tokenStr;
703 SG::ReadHandle<AthenaAttributeList> attrList(m_attrListKey, eventStore()->name());
704 if (attrList.isValid()) {
705 try {
706 tokenStr = (*attrList)["eventRef"].data<std::string>();
707 ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
708 } catch (std::exception &e) {
709 ATH_MSG_ERROR(e.what());
710 return StatusCode::FAILURE;
711 }
712 } else {
713 ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey);
714 tokenStr = m_headerIterator->eventRef().toString();
715 }
716 auto token = std::make_unique<Token>();
717 token->fromString(tokenStr);
718 iop = new TokenAddress(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, std::move(token));
719 return StatusCode::SUCCESS;
720}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
StoreGateSvc * eventStore() const
Return pointer to active event SG.
std::string m_attrListKey
AttributeList SG key.
@ kInputStream
Definition IPoolSvc.h:40
static const DbType POOL_StorageType
Definition DbType.h:98

◆ createContext()

StatusCode EventSelectorAthenaPool::createContext ( IEvtSelector::Context *& ctxt) const
overridevirtualinherited

create context

Definition at line 398 of file EventSelectorAthenaPool.cxx.

398 {
399 ctxt = new EventContextAthenaPool(this);
400 return StatusCode::SUCCESS;
401}

◆ curEvent()

int EventSelectorAthenaPool::curEvent ( const Context & ctxt) const
overridevirtualinherited

Return the current event number.

Parameters
ctxt[IN/OUT] current event context.

Definition at line 793 of file EventSelectorAthenaPool.cxx.

793 {
794 return(m_evtCount);
795}

◆ disconnectIfFinished()

bool EventSelectorAthenaPool::disconnectIfFinished ( const SG::SourceID & fid) const
overrideprotectedvirtualinherited

Definition at line 1157 of file EventSelectorAthenaPool.cxx.

1158{
1159 if( m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1160 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1161 // Explicitly disconnect file corresponding to old FID to release memory
1162 if( !m_keepInputFilesOpen.value() ) {
1163 // Assume that the end of collection file indicates the end of payload file.
1164 if (m_processMetadata.value()) {
1165 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + fid, fid);
1166 m_incidentSvc->fireIncident(endInputFileIncident);
1167 }
1168 ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
1169 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
1170 m_activeEventsPerSource.erase( fid );
1171 return true;
1172 }
1173 }
1174 return false;
1175}
#define ATH_MSG_INFO(x)
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
ServiceHandle< IIncidentSvc > m_incidentSvc
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...

◆ eventStore()

StoreGateSvc * EventSelectorAthenaPool::eventStore ( ) const
privateinherited

Return pointer to active event SG.

Definition at line 83 of file EventSelectorAthenaPool.cxx.

83 {
85}
static StoreGateSvc * currentStoreGate()
get current StoreGate

◆ fillAttributeList()

StatusCode EventSelectorAthenaPool::fillAttributeList ( coral::AttributeList * attrList,
const std::string & suffix,
bool copySource ) const
overrideprotectedvirtualinherited

Fill AttributeList with specific items from the selector and a suffix.

Definition at line 1021 of file EventSelectorAthenaPool.cxx.

1022{
1023 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1024 for (pool::TokenList::const_iterator iter = tokenList.begin(), last = tokenList.end(); iter != last; ++iter) {
1025 attrList->extend(iter.tokenName() + suffix, "string");
1026 (*attrList)[iter.tokenName() + suffix].data<std::string>() = iter->toString();
1027 ATH_MSG_DEBUG("record AthenaAttribute, name = " << iter.tokenName() + suffix << " = " << iter->toString() << ".");
1028 }
1029
1030 std::string eventRef = "eventRef";
1031 if (m_isSecondary.value()) {
1032 eventRef.append(suffix);
1033 }
1034 attrList->extend(eventRef, "string");
1035 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1036 ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
1037
1038 if (copySource) {
1039 const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
1040 for (const auto &attr : sourceAttrList) {
1041 attrList->extend(attr.specification().name() + suffix, attr.specification().type());
1042 (*attrList)[attr.specification().name() + suffix] = attr;
1043 }
1044 }
1045
1046 return StatusCode::SUCCESS;
1047}
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
iterator begin()
Returns a forward iterator pointing to first element in Token list.
Definition TokenList.h:218
iterator end()
Returns a forward iterator pointing to last element in Token list.
Definition TokenList.h:224

◆ finalize()

StatusCode EventSelectorAthenaPool::finalize ( )
overridevirtualinherited

Definition at line 377 of file EventSelectorAthenaPool.cxx.

377 {
378 if (m_eventStreamingTool.empty() || !m_eventStreamingTool->isClient()) {
379 if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
380 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
381 }
382 for (auto& tool : m_helperTools) {
383 if (!tool->preFinalize().isSuccess()) {
384 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
385 }
386 }
387 }
388 delete m_endIter; m_endIter = nullptr;
389 m_headerIterator = nullptr;
390 if (m_poolCollectionConverter) {
391 m_poolCollectionConverter.reset();
392 }
393 // Finalize the Service base class.
394 return ::AthService::finalize();
395}
ToolHandle< IAthenaSelectorTool > m_counterTool
EventContextAthenaPool * m_endIter

◆ findEvent()

int EventSelectorAthenaPool::findEvent ( int evtNum) const
privateinherited

Search for event with number evtNum.

Definition at line 801 of file EventSelectorAthenaPool.cxx.

801 {
802 for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
803 if (m_numEvt[i] == -1) {
804 PoolCollectionConverter pcc(m_collectionType.value(),
805 m_inputCollectionsProp.value()[i],
807 m_athenaPoolCnvSvc->getPoolSvc());
808 if (!pcc.initialize().isSuccess()) {
809 break;
810 }
811 int collection_size = 0;
812 if (pcc.isValid()) {
813 pool::ICollectionCursor* hi = &pcc.selectAll();
814 collection_size = hi->size();
815 }
816 if (i > 0) {
817 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
818 } else {
819 m_firstEvt[i] = 0;
820 }
821 m_numEvt[i] = collection_size;
822 }
823 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
824 return(i);
825 }
826 }
827 return(-1);
828}
int imax(int i, int j)
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
virtual std::size_t size()=0
Returns the size of the collection.

◆ fireEndFileIncidents()

void EventSelectorAthenaPool::fireEndFileIncidents ( bool isLastFile) const
privateinherited

Fires the EndInputFile incident (if there is an open file) at end of selector.

Definition at line 360 of file EventSelectorAthenaPool.cxx.

360 {
361 if (m_processMetadata.value()) {
362 if (m_evtCount >= 0) {
363 // Assume that the end of collection file indicates the end of payload file.
364 if (m_guid != Guid::null()) {
365 // Fire EndInputFile incident
366 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
367 m_incidentSvc->fireIncident(endInputFileIncident);
368 }
369 }
370 if (isLastFile && m_firedIncident) {
371 m_firedIncident = false;
372 }
373 }
374}
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14

◆ getCollectionCnv()

std::unique_ptr< PoolCollectionConverter > EventSelectorAthenaPool::getCollectionCnv ( bool throwIncidents = false) const
privateinherited

Return pointer to new PoolCollectionConverter.

Definition at line 963 of file EventSelectorAthenaPool.cxx.

963 {
964 while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
965 if (m_curCollection != 0) {
966 m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
968 m_firstEvt[m_curCollection] = m_evtCount;
969 }
970 ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
971 auto pCollCnv = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
972 *m_inputCollectionsIterator,
974 m_athenaPoolCnvSvc->getPoolSvc());
975 StatusCode status = pCollCnv->initialize();
976 if (!status.isSuccess()) {
977 // Close previous collection.
978 pCollCnv.reset();
979 if (!status.isRecoverable()) {
980 ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
981 throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
982 } else {
983 ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
984 throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
985 }
986 } else {
987 if (!pCollCnv->isValid().isSuccess()) {
988 pCollCnv.reset();
989 ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
990 if (throwIncidents && m_processMetadata.value()) {
991 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator);
992 m_incidentSvc->fireIncident(beginInputFileIncident);
993 FileIncident endInputFileIncident(name(), "EndInputFile", "eventless " + *m_inputCollectionsIterator);
994 m_incidentSvc->fireIncident(endInputFileIncident);
995 }
996 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
997 ++m_inputCollectionsIterator;
998 } else {
999 return(pCollCnv);
1000 }
1001 }
1002 }
1003 return(nullptr);
1004}
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16

◆ handle()

void DoubleEventSelectorAthenaPool::handle ( const Incident & incident)
overridevirtual

Incident service handle listening for BeginProcessing and EndProcessing.

Reimplemented from EventSelectorAthenaPool.

Definition at line 190 of file DoubleEventSelectorAthenaPool.cxx.

191{
192 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::handle");
193
194 if (not Atlas::hasExtendedEventContext(inc.context()) ) {
195 ATH_MSG_WARNING("No extended event context available.");
196 return;
197 }
198
199 SG::SourceID fid1;
200 if (inc.type() == IncidentType::BeginProcessing) {
201 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
202 fid1 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
203 }
204 *m_sourceID1.get(inc.context()) = fid1;
205 }
206 else {
207 fid1 = *m_sourceID1.get(inc.context());
208 }
209
210 if( fid1.empty() ) {
211 ATH_MSG_WARNING("could not read event source ID from incident event context with key EventSelector");
212 return;
213 }
214
215 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid1 );
216 if( inc.type() == IncidentType::BeginProcessing ) {
217 // increment the events-per-file counter for FID
218 m_activeEventsPerSource[fid1]++;
219 } else if( inc.type() == IncidentType::EndProcessing ) {
220 m_activeEventsPerSource[fid1]--;
221 disconnectIfFinished( fid1 );
222 *m_sourceID1.get(inc.context()) = "";
223 }
224 if( msgLvl(MSG::DEBUG) ) {
225 for( auto& source: m_activeEventsPerSource )
226 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
227 }
228
229 // Nothing to do if secondary event selector is ByteStream
231 return;
232 }
233
234 // Secondary guid
235 SG::SourceID fid2;
236 if (inc.type() == IncidentType::BeginProcessing) {
237 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
238 fid2 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID("SecondaryEventSelector");
239 }
240 *m_sourceID2.get(inc.context()) = fid2;
241 }
242 else {
243 fid2 = *m_sourceID2.get(inc.context());
244 }
245
246 if( fid2.empty() ) {
247 ATH_MSG_WARNING("could not read event source ID from incident event context with key SecondaryEventSelector");
248 return;
249 }
250
251 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid2 );
252 if( inc.type() == IncidentType::BeginProcessing ) {
253 // increment the events-per-file counter for FID
254 m_activeEventsPerSource[fid2]++;
255 } else if( inc.type() == IncidentType::EndProcessing ) {
256 m_activeEventsPerSource[fid2]--;
257 m_secondarySelector->disconnectIfFinished( fid2 );
258 *m_sourceID2.get(inc.context()) = "";
259 }
260 if( msgLvl(MSG::DEBUG) ) {
261 for( auto& source: m_activeEventsPerSource )
262 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
263 }
264}
#define endmsg
SG::SlotSpecificObj< SG::SourceID > m_sourceID2
SG::SlotSpecificObj< SG::SourceID > m_sourceID1
ServiceHandle< ISecondaryEventSelector > m_secondarySelector
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
MsgStream & msg
Definition testRead.cxx:32

◆ initialize()

StatusCode DoubleEventSelectorAthenaPool::initialize ( )
overridevirtual

Initialize function.

Reimplemented from EventSelectorAthenaPool.

Definition at line 31 of file DoubleEventSelectorAthenaPool.cxx.

32{
34
36 if (dynamic_cast<EventSelectorAthenaPool *>(&*(m_secondarySelector)) == nullptr) {
38 }
39
40 return StatusCode::SUCCESS;
41}
#define ATH_CHECK
Evaluate an expression and check for errors.
virtual StatusCode initialize() override
Required of all Gaudi Services.

◆ inputCollectionsHandler()

void EventSelectorAthenaPool::inputCollectionsHandler ( Gaudi::Details::PropertyBase & )
privateinherited

Definition at line 74 of file EventSelectorAthenaPool.cxx.

74 {
75 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
76 m_inputCollectionsChanged = true;
77 }
78}

◆ io_finalize()

StatusCode EventSelectorAthenaPool::io_finalize ( )
overridevirtualinherited

Callback method to finalize the internal state of the component for I/O purposes (e.g. before fork(2))

Definition at line 1101 of file EventSelectorAthenaPool.cxx.

1101 {
1102 ATH_MSG_INFO("I/O finalization...");
1103 if (m_poolCollectionConverter) {
1104 m_poolCollectionConverter->disconnectDb().ignore();
1105 m_poolCollectionConverter.reset();
1106 }
1107 return StatusCode::SUCCESS;
1108}

◆ io_reinit()

StatusCode EventSelectorAthenaPool::io_reinit ( )
overridevirtualinherited

Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2))

Definition at line 1049 of file EventSelectorAthenaPool.cxx.

1049 {
1050 ATH_MSG_INFO("I/O reinitialization...");
1051 if (m_poolCollectionConverter) {
1052 m_poolCollectionConverter->disconnectDb().ignore();
1053 m_poolCollectionConverter.reset();
1054 }
1055 m_headerIterator = nullptr;
1056 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1057 if (!iomgr.retrieve().isSuccess()) {
1058 ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1059 return StatusCode::FAILURE;
1060 }
1061 if (!iomgr->io_hasitem(this)) {
1062 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1063 return StatusCode::FAILURE;
1064 }
1065 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
1066 m_guid = Guid::null();
1067 return(this->reinit());
1068 }
1069 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1070 std::set<std::size_t> updatedIndexes;
1071 for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
1072 if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
1073 std::string savedName = inputCollections[i];
1074 std::string &fname = inputCollections[i];
1075 if (!iomgr->io_contains(this, fname)) {
1076 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1077 return StatusCode::FAILURE;
1078 }
1079 if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1080 ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1081 return StatusCode::FAILURE;
1082 }
1083 if (savedName != fname) {
1084 ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
1085 m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
1086 }
1087 updatedIndexes.insert(i);
1088 for (std::size_t j = i + 1; j < imax; j++) {
1089 if (inputCollections[j] == savedName) {
1091 updatedIndexes.insert(j);
1092 }
1093 }
1094 }
1095 // all good... copy over.
1097 m_guid = Guid::null();
1098 return reinit();
1099}
#define ATH_MSG_FATAL(x)
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
float j(const xAOD::IParticle &, const xAOD::TrackMeasurementValidation &hit, const Eigen::Matrix3d &jab_inv)

◆ last()

StatusCode EventSelectorAthenaPool::last ( IEvtSelector::Context & ctxt) const
overridevirtualinherited
Parameters
ctxt[IN/OUT] current event context is interated to last event.

Definition at line 686 of file EventSelectorAthenaPool.cxx.

686 {
687 if (ctxt.identifier() == m_endIter->identifier()) {
688 ATH_MSG_DEBUG("last(): Last event in InputStream.");
689 return StatusCode::SUCCESS;
690 }
691 return StatusCode::FAILURE;
692}

◆ makeClient()

StatusCode EventSelectorAthenaPool::makeClient ( int num)
overridevirtualinherited

Make this a client.

Definition at line 856 of file EventSelectorAthenaPool.cxx.

856 {
857 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
858 if (ds == nullptr) {
859 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
860 return StatusCode::FAILURE;
861 }
862 if (ds->makeClient(num + 1).isFailure()) {
863 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
864 return StatusCode::FAILURE;
865 }
866 if (m_eventStreamingTool.empty()) {
867 return StatusCode::SUCCESS;
868 }
869 ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
870 std::string dummyStr;
871 return(m_eventStreamingTool->makeClient(0, dummyStr));
872}

◆ makeServer()

StatusCode EventSelectorAthenaPool::makeServer ( int num)
overridevirtualinherited

Make this a server.

Definition at line 831 of file EventSelectorAthenaPool.cxx.

831 {
832 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
833 if (ds == nullptr) {
834 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
835 return StatusCode::FAILURE;
836 }
837 if (num < 0) {
838 if (ds->makeServer(num - 1).isFailure()) {
839 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
840 }
841 return StatusCode::SUCCESS;
842 }
843 if (ds->makeServer(num + 1).isFailure()) {
844 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
845 return StatusCode::FAILURE;
846 }
847 if (m_eventStreamingTool.empty()) {
848 return StatusCode::SUCCESS;
849 }
850 m_processMetadata = false;
851 ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
852 return(m_eventStreamingTool->makeServer(1, ""));
853}

◆ next() [1/2]

StatusCode DoubleEventSelectorAthenaPool::next ( IEvtSelector::Context & ctxt) const
overridevirtual
Parameters
ctxt[IN/OUT] current event context is interated to next event.

Reimplemented from EventSelectorAthenaPool.

Definition at line 44 of file DoubleEventSelectorAthenaPool.cxx.

45{
46 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next");
47
48 std::lock_guard<CallMutex> lockGuard(m_callLock);
49 if (!eventStore()->clearStore().isSuccess()) {
50 ATH_MSG_WARNING("Cannot clear Store");
51 }
52 for (const auto& tool : m_helperTools) {
53 if (!tool->preNext().isSuccess()) {
54 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
55 }
56 }
57
58 for (;;) {
59 // Move in the primary file (with skipping)
60 if (nextWithSkip(ctxt).isFailure()) {
61 return StatusCode::FAILURE;
62 }
63
64 // Check if we're at the end of secondary file
65 if (m_secondarySelector->nextWithSkip(ctxt).isFailure()) {
66 return StatusCode::FAILURE;
67 }
68
69 // Record the attribute list
70 if (!recordAttributeList().isSuccess()) {
71 ATH_MSG_ERROR("Failed to record AttributeList.");
72 return StatusCode::FAILURE;
73 }
74
75 StatusCode status = StatusCode::SUCCESS;
76 for (const auto& tool : m_helperTools) {
77 StatusCode toolStatus = tool->postNext();
78 if (toolStatus.isRecoverable()) {
79 ATH_MSG_INFO("Request skipping event from: " << tool->name());
80 if (status.isSuccess()) {
81 status = StatusCode::RECOVERABLE;
82 }
83 } else if (toolStatus.isFailure()) {
84 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
85 status = StatusCode::FAILURE;
86 }
87 }
88 if (status.isRecoverable()) {
89 ATH_MSG_INFO("skipping event " << m_evtCount);
90 } else if (status.isFailure()) {
91 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
92 } else {
93 if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
94 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
95 }
96 break;
97 }
98 }
99
100 return StatusCode::SUCCESS;
101}
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.

◆ next() [2/2]

StatusCode DoubleEventSelectorAthenaPool::next ( IEvtSelector::Context & ctxt,
int jump ) const
overridevirtual
Parameters
ctxt[IN/OUT] current event context is interated to next event.
jump[IN] number of events to jump (currently not supported).

Reimplemented from EventSelectorAthenaPool.

Definition at line 104 of file DoubleEventSelectorAthenaPool.cxx.

105{
106 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next(jump)");
107
108 if (jump > 0) {
109 for (int i = 0; i < jump; i++) {
110 if (!next(ctxt).isSuccess()) {
111 return StatusCode::FAILURE;
112 }
113 }
114 return StatusCode::SUCCESS;
115 }
116
117 return StatusCode::FAILURE;
118}
virtual StatusCode next(IEvtSelector::Context &ctxt) const override

◆ nextHandleFileTransition()

StatusCode EventSelectorAthenaPool::nextHandleFileTransition ( IEvtSelector::Context & ctxt) const
overrideprotectedvirtualinherited

Handle file transition at the next iteration.

Definition at line 554 of file EventSelectorAthenaPool.cxx.

555{
556 if( m_inputCollectionsChanged ) {
557 StatusCode rc = reinit();
558 if( rc != StatusCode::SUCCESS ) return rc;
559 }
560 else { // advance to the next (not needed after reinit)
561 // Check if we're at the end of file
562 if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
563 m_headerIterator = nullptr;
564 // Close previous collection.
565 m_poolCollectionConverter.reset();
566
567 // zero the current DB ID (m_guid) before disconnect() to indicate it is no longer in use
568 const SG::SourceID old_guid = m_guid.toString();
569 m_guid = Guid::null();
570 disconnectIfFinished( old_guid );
571
572 // check if somebody updated Inputs in the EOF incident (like VP1 does)
573 if( m_inputCollectionsChanged ) {
574 StatusCode rc = reinit();
575 if( rc != StatusCode::SUCCESS ) return rc;
576 } else {
577 // Open next file from inputCollections list.
578 ++m_inputCollectionsIterator;
579 // Create PoolCollectionConverter for input file
580 m_poolCollectionConverter = getCollectionCnv(true);
581 if (!m_poolCollectionConverter) {
582 // Return end iterator
583 ctxt = *m_endIter;
584 // This is not a real failure but a Gaudi way of handling "end of job"
585 return StatusCode::FAILURE;
586 }
587 // Get DataHeader iterator
588 m_headerIterator = &m_poolCollectionConverter->selectAll();
589
590 // Return RECOVERABLE to mark we should still continue
591 return StatusCode::RECOVERABLE;
592 }
593 }
594 }
595 const Token& headRef = m_headerIterator->eventRef();
596 const Guid guid = headRef.dbID();
597 const int tech = headRef.technology();
598 ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
599
600 if (guid != m_guid) {
601 // we are starting reading from a new DB. Check if the old one needs to be retired
602 if (m_guid != Guid::null()) {
603 // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
604 const SG::SourceID old_guid = m_guid.toString();
605 m_guid = Guid::null();
606 disconnectIfFinished( old_guid );
607 }
608 m_guid = guid;
609 m_activeEventsPerSource[guid.toString()] = 0;
610 // Fire BeginInputFile incident if current InputCollection is a payload file;
611 // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
612 if (m_collectionType.value() == "ImplicitCollection") {
613 // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
614 if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
615 ATH_MSG_ERROR("Failed to set input attributes.");
616 return StatusCode::FAILURE;
617 }
618 if (m_processMetadata.value()) {
619 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
620 m_incidentSvc->fireIncident(beginInputFileIncident);
621 }
622 } else {
623 // Check if File is BS
624 if (tech != 0x00001000 && m_processMetadata.value()) {
625 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
626 m_incidentSvc->fireIncident(beginInputFileIncident);
627 }
628 }
629 } // end if (guid != m_guid)
630 return StatusCode::SUCCESS;
631}
#define ATH_MSG_VERBOSE(x)
static Double_t rc
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
const Guid & dbID() const
Access database identifier.
Definition Token.h:64

◆ nextWithSkip()

StatusCode EventSelectorAthenaPool::nextWithSkip ( IEvtSelector::Context & ctxt) const
overrideprotectedvirtualinherited

Go to next event and skip if necessary.

Definition at line 633 of file EventSelectorAthenaPool.cxx.

633 {
634 ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
635
636 for (;;) {
637 // Check if we're at the end of file
639 if (sc.isRecoverable()) {
640 continue; // handles empty files
641 }
642 if (sc.isFailure()) {
643 return StatusCode::FAILURE;
644 }
645
646 // Increase event count
647 ++m_evtCount;
648
649 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
650 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
651 }
653 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
654 {
655 return StatusCode::SUCCESS;
656 } else {
657 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
658 m_skipEventRanges.erase(m_skipEventRanges.begin());
659 }
660 if (m_isSecondary.value()) {
661 ATH_MSG_INFO("skipping secondary event " << m_evtCount);
662 } else {
663 ATH_MSG_INFO("skipping event " << m_evtCount);
664 }
665 }
666 }
667
668 return StatusCode::SUCCESS;
669}
static Double_t sc
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.

◆ previous() [1/2]

StatusCode EventSelectorAthenaPool::previous ( IEvtSelector::Context & ctxt) const
overridevirtualinherited
Parameters
ctxt[IN/OUT] current event context is interated to previous event.

Definition at line 671 of file EventSelectorAthenaPool.cxx.

671 {
672 ATH_MSG_ERROR("previous() not implemented");
673 return StatusCode::FAILURE;
674}

◆ previous() [2/2]

StatusCode EventSelectorAthenaPool::previous ( IEvtSelector::Context & ctxt,
int jump ) const
overridevirtualinherited
Parameters
ctxt[IN/OUT] current event context is interated to previous event.
jump[IN] number of events to jump (currently not supported).

Definition at line 676 of file EventSelectorAthenaPool.cxx.

676 {
677 if (jump > 0) {
678 for (int i = 0; i < jump; i++) {
679 ATH_CHECK(previous(ctxt));
680 }
681 return StatusCode::SUCCESS;
682 }
683 return StatusCode::FAILURE;
684}
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override

◆ readEvent()

StatusCode EventSelectorAthenaPool::readEvent ( int maxevt)
overridevirtualinherited

Read the next maxevt events.

Parameters
evtnum[IN] The number of events to read.

Definition at line 908 of file EventSelectorAthenaPool.cxx.

908 {
909 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
910 if (ds == nullptr) {
911 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
912 return StatusCode::FAILURE;
913 }
914 if (m_eventStreamingTool.empty()) {
915 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
916 return StatusCode::FAILURE;
917 }
918 ATH_MSG_VERBOSE("Called read Event " << maxevt);
919 IEvtSelector::Context* ctxt = new EventContextAthenaPool(this);
920 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
921 if (!next(*ctxt).isSuccess()) {
922 if (m_evtCount == -1) {
923 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
924 break;
925 }
926 ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
927 delete ctxt; ctxt = nullptr;
928 return StatusCode::FAILURE;
929 } else {
930 ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
931 }
932 }
933 delete ctxt; ctxt = nullptr;
934 // End of file, wait for last event to be taken
936 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
937 while (ds->readData().isSuccess()) {
938 ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
939 }
940 usleep(1000);
941 }
942 if (!sc.isSuccess()) {
943 ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
944 return StatusCode::FAILURE;
945 } else {
946 sc = ds->readData();
947 while (sc.isSuccess() || sc.isRecoverable()) {
948 sc = ds->readData();
949 }
950 ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
951 }
952 return StatusCode::SUCCESS;
953}
virtual StatusCode next(IEvtSelector::Context &ctxt) const override

◆ recordAttributeList()

StatusCode DoubleEventSelectorAthenaPool::recordAttributeList ( ) const
overrideprivatevirtual

Record AttributeList in StoreGate.

Reimplemented from EventSelectorAthenaPool.

Definition at line 147 of file DoubleEventSelectorAthenaPool.cxx.

148{
149 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::recordAttributeList");
150
151 // Get access to AttributeList
152 ATH_MSG_DEBUG("Get AttributeList from the collection");
153 // MN: accessing only attribute list, ignoring token list
154 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
155 ATH_MSG_DEBUG("AttributeList size " << attrList.size());
156 std::unique_ptr<AthenaAttributeList> athAttrList{};
157
158 // Decide what to do based on the type of secondary file
160 // Create empty attribute list
161 athAttrList = std::make_unique<AthenaAttributeList>();
162 // Always add ByteStream as primary input
163 ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "", false));
164
165 // Then fill the new attribute list from the primary file
166 ATH_MSG_DEBUG("Append primary attribute list properties to the secondary one with a suffix: " << m_secondaryAttrListSuffix.value());
167 ATH_CHECK(fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
168 } else {
169 // Create a new attribute list from the primary input one
170 athAttrList = std::make_unique<AthenaAttributeList>(attrList);
171 // Fill the new attribute list from the primary file
172 ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
173
174 // Fill the new attribute list from the secondary file
175 ATH_MSG_DEBUG("Append secondary attribute list properties to the primary one with a suffix: " << m_secondaryAttrListSuffix.value());
176 ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
177 }
178
179 // Add info about secondary input
180 athAttrList->extend("hasSecondaryInput", "bool");
181 (*athAttrList)["hasSecondaryInput"].data<bool>() = true;
182
183 SG::WriteHandle<AthenaAttributeList> wh(m_attrListKey, eventStore()->name());
184 ATH_CHECK(wh.record(std::move(athAttrList)));
185
186 return StatusCode::SUCCESS;
187}
Gaudi::Property< std::string > m_secondaryAttrListSuffix
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
str wh
Definition parseDir.py:45

◆ reinit()

StatusCode EventSelectorAthenaPool::reinit ( ) const
privateinherited

Reinitialize the service when a fork() occurred/was-issued.

Definition at line 212 of file EventSelectorAthenaPool.cxx.

212 {
213 ATH_MSG_DEBUG("reinitialization...");
214
215 // reset markers
216 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
217 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
218
219 // Initialize InputCollectionsIterator
220 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
221 m_curCollection = 0;
222 if (!m_firstEvt.empty()) {
223 m_firstEvt[0] = 0;
224 }
225 m_inputCollectionsChanged = false;
226 m_evtCount = 0;
227 m_headerIterator = 0;
228 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
229 ATH_MSG_INFO("Done reinitialization for shared reader client");
230 return StatusCode::SUCCESS;
231 }
232 bool retError = false;
233 for (auto& tool : m_helperTools) {
234 if (!tool->postInitialize().isSuccess()) {
235 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
236 retError = true;
237 }
238 }
239 if (retError) {
240 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
241 return StatusCode::FAILURE;
242 }
243
244 // Create an m_poolCollectionConverter to read the objects in
245 m_poolCollectionConverter = getCollectionCnv();
246 if (!m_poolCollectionConverter) {
247 ATH_MSG_INFO("No Events found in any Input Collections");
248 if (m_processMetadata.value()) {
249 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
250 if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
251 //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
252 if (m_collectionType.value() == "ImplicitCollection" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
253 FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
254 m_incidentSvc->fireIncident(firstInputFileIncident);
255 m_firedIncident = true;
256 }
257 }
258 return StatusCode::SUCCESS;
259 }
260 // Get DataHeader iterator
261 try {
262 m_headerIterator = &m_poolCollectionConverter->selectAll();
263 } catch (std::exception &e) {
264 ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
265 ATH_MSG_ERROR(e.what());
266 return StatusCode::FAILURE;
267 }
268 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
269 if (m_poolCollectionConverter) {
270 m_poolCollectionConverter->disconnectDb().ignore();
271 m_poolCollectionConverter.reset();
272 }
273 ++m_inputCollectionsIterator;
274 m_poolCollectionConverter = getCollectionCnv();
275 if (m_poolCollectionConverter) {
276 m_headerIterator = &m_poolCollectionConverter->selectAll();
277 } else {
278 break;
279 }
280 }
281 if (!m_poolCollectionConverter || m_headerIterator == nullptr) { // no event selected in any collection
282 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
283 m_curCollection = 0;
284 m_poolCollectionConverter = getCollectionCnv();
285 if (!m_poolCollectionConverter) {
286 return StatusCode::SUCCESS;
287 }
288 m_headerIterator = &m_poolCollectionConverter->selectAll();
289 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
290 if (m_poolCollectionConverter) {
291 m_poolCollectionConverter->disconnectDb().ignore();
292 m_poolCollectionConverter.reset();
293 }
294 ++m_inputCollectionsIterator;
295 m_poolCollectionConverter = getCollectionCnv();
296 if (m_poolCollectionConverter) {
297 m_headerIterator = &m_poolCollectionConverter->selectAll();
298 } else {
299 break;
300 }
301 }
302 }
303 if (!m_poolCollectionConverter || m_headerIterator == nullptr) {
304 return StatusCode::SUCCESS;
305 }
306 const Token& headRef = m_headerIterator->eventRef();
307 const std::string fid = headRef.dbID().toString();
308 const int tech = headRef.technology();
309 ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
310
311 // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
312 if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
313 FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
314 m_incidentSvc->fireIncident(firstInputFileIncident);
315 m_firedIncident = true;
316 }
317 return StatusCode::SUCCESS;
318}
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.

◆ releaseContext()

StatusCode EventSelectorAthenaPool::releaseContext ( IEvtSelector::Context *& ctxt) const
overridevirtualinherited
Parameters
ctxt[IN] current event context is released.

Definition at line 722 of file EventSelectorAthenaPool.cxx.

722 {
723 return StatusCode::SUCCESS;
724}

◆ resetCriteria()

StatusCode EventSelectorAthenaPool::resetCriteria ( const std::string & criteria,
IEvtSelector::Context & ctxt ) const
overridevirtualinherited

Set a selection criteria.

Parameters
criteriafilter predicate (SQL-style WHERE clause)
ctxt[IN] current event context.

Definition at line 726 of file EventSelectorAthenaPool.cxx.

727 {
728 return StatusCode::SUCCESS;
729}

◆ rewind()

StatusCode EventSelectorAthenaPool::rewind ( IEvtSelector::Context & ctxt) const
overridevirtualinherited
Parameters
ctxt[IN/OUT] current event context is rewound to first event.

Definition at line 694 of file EventSelectorAthenaPool.cxx.

694 {
695 ATH_CHECK(reinit());
696 ctxt = EventContextAthenaPool(this);
697 return StatusCode::SUCCESS;
698}

◆ seek()

StatusCode DoubleEventSelectorAthenaPool::seek ( Context & ctxt,
int evtNum ) const
overridevirtual

Seek to a given event number.

Parameters
ctxt[IN/OUT] current event context.
evtNum[IN] The event number to which to seek.

Reimplemented from EventSelectorAthenaPool.

Definition at line 121 of file DoubleEventSelectorAthenaPool.cxx.

122{
123 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::seek");
124
126 ATH_CHECK(m_secondarySelector->seek(ctxt, evtNum));
127
128 return StatusCode::SUCCESS;
129}
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.

◆ share()

StatusCode EventSelectorAthenaPool::share ( int evtnum)
overridevirtualinherited

Request to share a given event number.

Parameters
evtnum[IN] The event number to share.

Definition at line 875 of file EventSelectorAthenaPool.cxx.

875 {
876 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
877 if (ds == nullptr) {
878 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
879 return StatusCode::FAILURE;
880 }
881 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
882 StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
883 while (sc.isRecoverable()) {
884 usleep(1000);
885 sc = m_eventStreamingTool->lockEvent(evtnum);
886 }
887// Send stop client and wait for restart
888 if (sc.isFailure()) {
889 if (ds->makeClient(0).isFailure()) {
890 return StatusCode::FAILURE;
891 }
892 sc = m_eventStreamingTool->lockEvent(evtnum);
893 while (sc.isRecoverable() || sc.isFailure()) {
894 usleep(1000);
895 sc = m_eventStreamingTool->lockEvent(evtnum);
896 }
897//FIXME
898 if (ds->makeClient(1).isFailure()) {
899 return StatusCode::FAILURE;
900 }
901 }
902 return(sc);
903 }
904 return StatusCode::FAILURE;
905}

◆ size()

int DoubleEventSelectorAthenaPool::size ( Context & ctxt) const
overridevirtual

Return the size of the collection.

Parameters
ctxt[IN/OUT] current event context.

Reimplemented from EventSelectorAthenaPool.

Definition at line 132 of file DoubleEventSelectorAthenaPool.cxx.

133{
134 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::size");
135
136 int sz1 = EventSelectorAthenaPool::size(ctxt);
137 int sz2 = m_secondarySelector->size(ctxt);
138
139 if (sz2 < sz1) {
140 ATH_MSG_WARNING("Fewer secondary input events than primary input events. Expect trouble!");
141 }
142
143 return sz1;
144}
virtual int size(Context &ctxt) const override
Return the size of the collection.

◆ start()

StatusCode EventSelectorAthenaPool::start ( )
overridevirtualinherited

Definition at line 320 of file EventSelectorAthenaPool.cxx.

320 {
321 if (m_poolCollectionConverter) {
322 // Reset iterators and apply new query
323 m_poolCollectionConverter->disconnectDb().ignore();
324 m_poolCollectionConverter.reset();
325 }
326 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
327 m_curCollection = 0;
328 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
329 return StatusCode::SUCCESS;
330 }
331 m_poolCollectionConverter = getCollectionCnv(true);
332 if (!m_poolCollectionConverter) {
333 ATH_MSG_INFO("No Events found in any Input Collections");
334 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
335 if (!m_inputCollectionsProp.value().empty()) {
336 --m_inputCollectionsIterator; //leave iterator in state of last input file
337 }
338 } else {
339 m_headerIterator = &m_poolCollectionConverter->selectAll();
340 }
341 m_evtCount = 0;
342 delete m_endIter;
343 m_endIter = nullptr;
344 m_endIter = new EventContextAthenaPool(nullptr);
345 return StatusCode::SUCCESS;
346}

◆ stop()

StatusCode EventSelectorAthenaPool::stop ( )
overridevirtualinherited

Definition at line 348 of file EventSelectorAthenaPool.cxx.

348 {
349 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
350 return StatusCode::SUCCESS;
351 }
352 IEvtSelector::Context* ctxt(nullptr);
353 if (!releaseContext(ctxt).isSuccess()) {
354 ATH_MSG_WARNING("Cannot release context");
355 }
356 return StatusCode::SUCCESS;
357}
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override

Member Data Documentation

◆ ATLAS_THREAD_SAFE [1/10]

std::unique_ptr<PoolCollectionConverter> m_poolCollectionConverter EventSelectorAthenaPool::ATLAS_THREAD_SAFE {}
mutableprivateinherited

Definition at line 172 of file EventSelectorAthenaPool.h.

172{};

◆ ATLAS_THREAD_SAFE [2/10]

pool::ICollectionCursor* m_headerIterator EventSelectorAthenaPool::ATLAS_THREAD_SAFE {}
mutableprivateinherited

Definition at line 173 of file EventSelectorAthenaPool.h.

173{};

◆ ATLAS_THREAD_SAFE [3/10]

Guid m_guid EventSelectorAthenaPool::ATLAS_THREAD_SAFE {}
mutableprivateinherited

Definition at line 174 of file EventSelectorAthenaPool.h.

174{};

◆ ATLAS_THREAD_SAFE [4/10]

std::map<SG::SourceID, int> m_activeEventsPerSource EventSelectorAthenaPool::ATLAS_THREAD_SAFE
mutableprivateinherited

Definition at line 175 of file EventSelectorAthenaPool.h.

◆ ATLAS_THREAD_SAFE [5/10]

std::vector<std::string>::const_iterator m_inputCollectionsIterator EventSelectorAthenaPool::ATLAS_THREAD_SAFE
mutableprivateinherited

Definition at line 193 of file EventSelectorAthenaPool.h.

◆ ATLAS_THREAD_SAFE [6/10]

bool m_inputCollectionsChanged EventSelectorAthenaPool::ATLAS_THREAD_SAFE
mutableprivateinherited

flag to notify the EvSel that the inputs were changed and reinit() needs to be called ASAP

Definition at line 196 of file EventSelectorAthenaPool.h.

◆ ATLAS_THREAD_SAFE [7/10]

ToolHandleArray<IAthenaSelectorTool> m_helperTools EventSelectorAthenaPool::ATLAS_THREAD_SAFE {this, "HelperTools", {}, ""}
mutableprivateinherited

HelperTools, vector of names of AlgTools that are executed by the EventSelector.

Definition at line 203 of file EventSelectorAthenaPool.h.

203{this, "HelperTools", {}, ""};

◆ ATLAS_THREAD_SAFE [8/10]

std::vector<int> m_numEvt EventSelectorAthenaPool::ATLAS_THREAD_SAFE
mutableprivateinherited

Definition at line 226 of file EventSelectorAthenaPool.h.

◆ ATLAS_THREAD_SAFE [9/10]

std::vector<int> m_firstEvt EventSelectorAthenaPool::ATLAS_THREAD_SAFE
mutableprivateinherited

Definition at line 227 of file EventSelectorAthenaPool.h.

◆ ATLAS_THREAD_SAFE [10/10]

std::vector<std::pair<long,long> > m_skipEventRanges EventSelectorAthenaPool::ATLAS_THREAD_SAFE
mutableprivateinherited

Definition at line 234 of file EventSelectorAthenaPool.h.

◆ m_athenaPoolCnvSvc

ServiceHandle<IAthenaPoolCnvSvc> EventSelectorAthenaPool::m_athenaPoolCnvSvc {this, "ConversionService", "AthenaPoolCnvSvc", ""}
privateinherited

Definition at line 177 of file EventSelectorAthenaPool.h.

177{this, "ConversionService", "AthenaPoolCnvSvc", ""};

◆ m_attrListKey

std::string EventSelectorAthenaPool::m_attrListKey {"Input"}
privateinherited

AttributeList SG key.

Definition at line 189 of file EventSelectorAthenaPool.h.

189{"Input"};

◆ m_callLock

CallMutex EventSelectorAthenaPool::m_callLock
mutableprivateinherited

Definition at line 240 of file EventSelectorAthenaPool.h.

◆ m_collectionType

Gaudi::Property<std::string> EventSelectorAthenaPool::m_collectionType {this, "CollectionType", "ImplicitCollection", ""}
privateinherited

CollectionType, type of the collection: default = "ImplicitCollection".

Definition at line 186 of file EventSelectorAthenaPool.h.

186{this, "CollectionType", "ImplicitCollection", ""};

◆ m_counterTool

ToolHandle<IAthenaSelectorTool> EventSelectorAthenaPool::m_counterTool {this, "CounterTool", "", ""}
privateinherited

Definition at line 204 of file EventSelectorAthenaPool.h.

204{this, "CounterTool", "", ""};

◆ m_curCollection

std::atomic_long EventSelectorAthenaPool::m_curCollection {}
mutableprivateinherited

Definition at line 225 of file EventSelectorAthenaPool.h.

225{};

◆ m_endIter

EventContextAthenaPool* EventSelectorAthenaPool::m_endIter {}
privateinherited

Definition at line 170 of file EventSelectorAthenaPool.h.

170{};

◆ m_eventsPerLB

Gaudi::CheckedProperty<uint32_t> EventSelectorAthenaPool::m_eventsPerLB {this, "EventsPerLB", 1000, ""}
privateinherited

Definition at line 221 of file EventSelectorAthenaPool.h.

221{this, "EventsPerLB", 1000, ""};

◆ m_eventsPerRun

Gaudi::CheckedProperty<uint64_t> EventSelectorAthenaPool::m_eventsPerRun {this, "EventsPerRun", 1000000, ""}
privateinherited

Definition at line 219 of file EventSelectorAthenaPool.h.

219{this, "EventsPerRun", 1000000, ""};

◆ m_eventStreamingTool

ToolHandle<IAthenaIPCTool> EventSelectorAthenaPool::m_eventStreamingTool {this, "SharedMemoryTool", "", ""}
privateinherited

Definition at line 205 of file EventSelectorAthenaPool.h.

205{this, "SharedMemoryTool", "", ""};

◆ m_evtCount

std::atomic_int EventSelectorAthenaPool::m_evtCount {}
mutableprivateinherited

Definition at line 236 of file EventSelectorAthenaPool.h.

236{}; // internal count of events

◆ m_firedIncident

std::atomic_bool EventSelectorAthenaPool::m_firedIncident {}
mutableprivateinherited

Definition at line 237 of file EventSelectorAthenaPool.h.

237{};

◆ m_firstEventNo

Gaudi::CheckedProperty<uint64_t> EventSelectorAthenaPool::m_firstEventNo {this, "FirstEvent", 1, ""}
privateinherited

Definition at line 217 of file EventSelectorAthenaPool.h.

217{this, "FirstEvent", 1, ""};

◆ m_firstLBNo

Gaudi::CheckedProperty<uint32_t> EventSelectorAthenaPool::m_firstLBNo {this, "FirstLB", 0, ""}
privateinherited

Definition at line 220 of file EventSelectorAthenaPool.h.

220{this, "FirstLB", 0, ""};

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> EventSelectorAthenaPool::m_incidentSvc {this, "IncidentSvc", "IncidentSvc", ""}
privateinherited

Definition at line 178 of file EventSelectorAthenaPool.h.

178{this, "IncidentSvc", "IncidentSvc", ""};

◆ m_initTimeStamp

Gaudi::CheckedProperty<uint32_t> EventSelectorAthenaPool::m_initTimeStamp {this, "InitialTimeStamp", 0, ""}
privateinherited

Definition at line 222 of file EventSelectorAthenaPool.h.

222{this, "InitialTimeStamp", 0, ""};

◆ m_inputCollectionsProp

Gaudi::Property<std::vector<std::string> > EventSelectorAthenaPool::m_inputCollectionsProp {this, "InputCollections", {}, ""}
privateinherited

InputCollections, vector with names of the input collections.

Definition at line 192 of file EventSelectorAthenaPool.h.

192{this, "InputCollections", {}, ""};

◆ m_isSecondary

Gaudi::Property<bool> EventSelectorAthenaPool::m_isSecondary {this, "IsSecondary", false, ""}
privateinherited

IsSecondary, know if this is an instance of secondary event selector.

Definition at line 182 of file EventSelectorAthenaPool.h.

182{this, "IsSecondary", false, ""};

◆ m_keepInputFilesOpen

Gaudi::Property<bool> EventSelectorAthenaPool::m_keepInputFilesOpen {this, "KeepInputFilesOpen", false, ""}
privateinherited

KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false.

Needed for PilUp to run without PoolFileCatalog. Relies on POOL to close files when reaching DB_AGE_LIMIT.

Definition at line 200 of file EventSelectorAthenaPool.h.

200{this, "KeepInputFilesOpen", false, ""};

◆ m_makeStreamingToolClient

Gaudi::Property<int> EventSelectorAthenaPool::m_makeStreamingToolClient {this, "MakeStreamingToolClient",0}
privateinherited

Make this instance a Streaming Client during first iteration automatically.

Definition at line 207 of file EventSelectorAthenaPool.h.

207{this, "MakeStreamingToolClient",0};

◆ m_oldRunNo

Gaudi::CheckedProperty<uint32_t> EventSelectorAthenaPool::m_oldRunNo {this, "OldRunNumber", 0, ""}
privateinherited

Definition at line 213 of file EventSelectorAthenaPool.h.

213{this, "OldRunNumber", 0, ""};

◆ m_overrideRunNumber

Gaudi::Property<bool> EventSelectorAthenaPool::m_overrideRunNumber {this, "OverrideRunNumber", false, ""}
privateinherited

Definition at line 214 of file EventSelectorAthenaPool.h.

214{this, "OverrideRunNumber", false, ""};

◆ m_overrideRunNumberFromInput

Gaudi::Property<bool> EventSelectorAthenaPool::m_overrideRunNumberFromInput {this, "OverrideRunNumberFromInput", false, ""}
privateinherited

Definition at line 215 of file EventSelectorAthenaPool.h.

215{this, "OverrideRunNumberFromInput", false, ""};

◆ m_processMetadata

Gaudi::Property<bool> EventSelectorAthenaPool::m_processMetadata {this, "ProcessMetadata", true, ""}
privateinherited

ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default = true.

Definition at line 184 of file EventSelectorAthenaPool.h.

184{this, "ProcessMetadata", true, ""};

◆ m_runNo

Gaudi::CheckedProperty<uint32_t> EventSelectorAthenaPool::m_runNo {this, "RunNumber", 0, ""}
privateinherited

The following are included for compatibility with McEventSelector and are not really used.

However runNo, oldRunNo and overrideRunNumberFromInput are used to reset run number for simulated events, needed to use condition

Definition at line 212 of file EventSelectorAthenaPool.h.

212{this, "RunNumber", 0, ""};

◆ m_secondaryAttrListSuffix

Gaudi::Property<std::string> DoubleEventSelectorAthenaPool::m_secondaryAttrListSuffix {this, "SecondaryAttrListSuffix", "secondary", ""}
private

Definition at line 67 of file DoubleEventSelectorAthenaPool.h.

67{this, "SecondaryAttrListSuffix", "secondary", ""};

◆ m_secondaryByteStream

bool DoubleEventSelectorAthenaPool::m_secondaryByteStream {}
private

Definition at line 70 of file DoubleEventSelectorAthenaPool.h.

70{};

◆ m_secondarySelector

ServiceHandle<ISecondaryEventSelector> DoubleEventSelectorAthenaPool::m_secondarySelector {this, "SecondaryEventSelector", "SecondaryEventSelector", ""}
private

Definition at line 65 of file DoubleEventSelectorAthenaPool.h.

65{this, "SecondaryEventSelector", "SecondaryEventSelector", ""};

◆ m_skipEventRangesProp

Gaudi::Property<std::string> EventSelectorAthenaPool::m_skipEventRangesProp {this, "SkipEventRanges", {}, ""}
privateinherited

Skip Events - comma separated list of event to skip, ranges with '-': <start> - <end>

Definition at line 233 of file EventSelectorAthenaPool.h.

233{this, "SkipEventRanges", {}, ""};

◆ m_skipEvents

Gaudi::Property<int> EventSelectorAthenaPool::m_skipEvents {this, "SkipEvents", 0, ""}
privateinherited

SkipEvents, numbers of events to skip: default = 0.

Definition at line 230 of file EventSelectorAthenaPool.h.

230{this, "SkipEvents", 0, ""};

◆ m_skipEventSequenceProp

Gaudi::Property<std::vector<long> > EventSelectorAthenaPool::m_skipEventSequenceProp {this, "SkipEventSequence", {}, ""}
privateinherited

Definition at line 231 of file EventSelectorAthenaPool.h.

231{this, "SkipEventSequence", {}, ""};

◆ m_sourceID

SG::SlotSpecificObj<SG::SourceID> EventSelectorAthenaPool::m_sourceID
privateinherited

Definition at line 242 of file EventSelectorAthenaPool.h.

◆ m_sourceID1

SG::SlotSpecificObj<SG::SourceID> DoubleEventSelectorAthenaPool::m_sourceID1
private

Definition at line 72 of file DoubleEventSelectorAthenaPool.h.

◆ m_sourceID2

SG::SlotSpecificObj<SG::SourceID> DoubleEventSelectorAthenaPool::m_sourceID2
private

Definition at line 73 of file DoubleEventSelectorAthenaPool.h.

◆ m_timeStampInterval

Gaudi::Property<uint32_t> EventSelectorAthenaPool::m_timeStampInterval {this, "TimeStampInterval", 0, ""}
privateinherited

Definition at line 223 of file EventSelectorAthenaPool.h.

223{this, "TimeStampInterval", 0, ""};

The documentation for this class was generated from the following files: