24 #include "GaudiKernel/ClassID.h"
25 #include "GaudiKernel/FileIncident.h"
26 #include "GaudiKernel/IIncidentSvc.h"
27 #include "GaudiKernel/IIoComponentMgr.h"
28 #include "GaudiKernel/GaudiException.h"
29 #include "GaudiKernel/GenericAddress.h"
30 #include "GaudiKernel/StatusCode.h"
39 #include <boost/tokenizer.hpp>
49 size_t nbytes,
unsigned int status) {
58 base_class(
name, pSvcLocator)
71 m_inputCollectionsChanged =
false;
75 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
76 m_inputCollectionsChanged =
true;
89 m_autoRetrieveTools =
false;
90 m_checkToolDeps =
false;
101 ATH_MSG_FATAL(
"Use the property: EventSelector.InputCollections = "
102 <<
"[ \"<collectionName>\" ] (list of collections)");
103 return StatusCode::FAILURE;
105 boost::char_separator<char> sep_coma(
","), sep_hyph(
"-");
107 for(
const std::string&
r:
ranges ) {
108 boost::tokenizer fromto(
r, sep_hyph);
109 auto from_iter = fromto.begin();
110 long from = std::stol(*from_iter);
112 if( ++from_iter != fromto.end() ) {
113 to = std::stol(*from_iter);
115 m_skipEventRanges.emplace_back(from,
to);
119 m_skipEventRanges.emplace_back(
v,
v);
121 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
123 std::string skip_ranges_str;
124 for(
const auto& [
first,
second] : m_skipEventRanges ) {
125 if( !skip_ranges_str.empty() ) skip_ranges_str +=
", ";
129 if( !skip_ranges_str.empty() )
134 ATH_MSG_FATAL(
"EventSelector.CollectionType must be one of: RootCollection, ImplicitCollection (default)");
135 return StatusCode::FAILURE;
141 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 0);
142 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 0);
156 return StatusCode::FAILURE;
158 std::string dummyStr;
164 std::vector<std::string>
propVal;
166 bool foundCnvSvc =
false;
167 for (
const auto& property :
propVal) {
173 ATH_MSG_FATAL(
"Cannot set EventPersistencySvc Property for CnvServices");
174 return StatusCode::FAILURE;
186 std::string fileType;
187 for (
const auto& inputCollection : incol) {
188 if (inputCollection.starts_with(
"LFN:") || inputCollection.starts_with(
"FID:")) {
197 ATH_MSG_FATAL(
"could not register [" << inputCollection <<
"] for output !");
204 return StatusCode::FAILURE;
210 return StatusCode::FAILURE;
227 if (!m_firstEvt.empty()) {
230 m_inputCollectionsChanged =
false;
232 m_headerIterator = 0;
234 ATH_MSG_INFO(
"Done reinitialization for shared reader client");
235 return StatusCode::SUCCESS;
237 bool retError =
false;
238 for (
auto&
tool : m_helperTools) {
239 if (!
tool->postInitialize().isSuccess()) {
246 return StatusCode::FAILURE;
251 if (!m_poolCollectionConverter) {
252 ATH_MSG_INFO(
"No Events found in any Input Collections");
258 FileIncident firstInputFileIncident(
name(),
"FirstInputFile", *m_inputCollectionsIterator);
263 return StatusCode::SUCCESS;
267 m_headerIterator = &m_poolCollectionConverter->selectAll();
269 ATH_MSG_FATAL(
"Cannot open implicit collection - check data/software version.");
271 return StatusCode::FAILURE;
273 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
274 if (m_poolCollectionConverter) {
275 m_poolCollectionConverter->disconnectDb().ignore();
276 m_poolCollectionConverter.reset();
278 ++m_inputCollectionsIterator;
280 if (m_poolCollectionConverter) {
281 m_headerIterator = &m_poolCollectionConverter->selectAll();
286 if (!m_poolCollectionConverter || m_headerIterator ==
nullptr) {
290 if (!m_poolCollectionConverter) {
291 return StatusCode::SUCCESS;
293 m_headerIterator = &m_poolCollectionConverter->selectAll();
294 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
295 if (m_poolCollectionConverter) {
296 m_poolCollectionConverter->disconnectDb().ignore();
297 m_poolCollectionConverter.reset();
299 ++m_inputCollectionsIterator;
301 if (m_poolCollectionConverter) {
302 m_headerIterator = &m_poolCollectionConverter->selectAll();
308 if (!m_poolCollectionConverter || m_headerIterator ==
nullptr) {
309 return StatusCode::SUCCESS;
311 const Token& headRef = m_headerIterator->eventRef();
318 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"FID:" + fid, fid);
322 return StatusCode::SUCCESS;
326 if (m_poolCollectionConverter) {
328 m_poolCollectionConverter->disconnectDb().ignore();
329 m_poolCollectionConverter.reset();
334 return StatusCode::SUCCESS;
337 if (!m_poolCollectionConverter) {
338 ATH_MSG_INFO(
"No Events found in any Input Collections");
341 --m_inputCollectionsIterator;
344 m_headerIterator = &m_poolCollectionConverter->selectAll();
350 return StatusCode::SUCCESS;
355 return StatusCode::SUCCESS;
357 IEvtSelector::Context* ctxt(
nullptr);
361 return StatusCode::SUCCESS;
371 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
387 for (
auto&
tool : m_helperTools) {
388 if (!
tool->preFinalize().isSuccess()) {
394 m_headerIterator =
nullptr;
395 if (m_poolCollectionConverter) {
396 m_poolCollectionConverter.reset();
405 return StatusCode::SUCCESS;
409 std::lock_guard<CallMutex> lockGuard(
m_callLock);
413 while (
sc.isRecoverable()) {
420 void* tokenStr =
nullptr;
423 if (
sc.isRecoverable()) {
424 delete [] (
char*)tokenStr; tokenStr =
nullptr;
428 return StatusCode::FAILURE;
430 if (
sc.isFailure()) {
431 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
432 delete [] (
char*)tokenStr; tokenStr =
nullptr;
433 return StatusCode::FAILURE;
439 athAttrList->extend(
"eventRef",
"string");
440 (*athAttrList)[
"eventRef"].data<std::string>() = std::string((
char*)tokenStr);
442 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
443 delete [] (
char*)tokenStr; tokenStr =
nullptr;
445 return StatusCode::FAILURE;
448 token.
fromString(std::string((
char*)tokenStr));
449 delete [] (
char*)tokenStr; tokenStr =
nullptr;
454 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
458 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
461 return StatusCode::SUCCESS;
463 for (
const auto&
tool : m_helperTools) {
464 if (!
tool->preNext().isSuccess()) {
471 if (
sc.isRecoverable()) {
474 if (
sc.isFailure()) {
475 return StatusCode::FAILURE;
483 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
489 return StatusCode::FAILURE;
491 std::string token = m_headerIterator->eventRef().toString();
495 token.length() + 1, 0)).isRecoverable() ) {
496 while (
ds->readData().isSuccess()) {
497 ATH_MSG_VERBOSE(
"Called last readData, while putting next event in next()");
501 if (!
sc.isSuccess()) {
503 return StatusCode::FAILURE;
507 if (!
eventStore()->clearStore().isSuccess()) {
512 return StatusCode::FAILURE;
517 for (
const auto&
tool : m_helperTools) {
519 if (toolStatus.isRecoverable()) {
522 status = StatusCode::RECOVERABLE;
524 }
else if (toolStatus.isFailure()) {
526 status = StatusCode::FAILURE;
529 if (
status.isRecoverable()) {
531 }
else if (
status.isFailure()) {
540 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
541 m_skipEventRanges.erase(m_skipEventRanges.begin());
546 return StatusCode::SUCCESS;
551 for (
int i = 0;
i < jump;
i++) {
554 return StatusCode::SUCCESS;
556 return StatusCode::FAILURE;
561 if( m_inputCollectionsChanged ) {
563 if( rc != StatusCode::SUCCESS )
return rc;
567 if (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
568 m_headerIterator =
nullptr;
570 m_poolCollectionConverter.reset();
578 if( m_inputCollectionsChanged ) {
580 if( rc != StatusCode::SUCCESS )
return rc;
583 ++m_inputCollectionsIterator;
586 if (!m_poolCollectionConverter) {
590 return StatusCode::FAILURE;
593 m_headerIterator = &m_poolCollectionConverter->selectAll();
596 return StatusCode::RECOVERABLE;
600 const Token& headRef = m_headerIterator->eventRef();
605 if (
guid != m_guid) {
614 m_activeEventsPerSource[
guid.toString()] = 0;
619 if (!
m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
621 return StatusCode::FAILURE;
624 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
630 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
635 return StatusCode::SUCCESS;
644 if (
sc.isRecoverable()) {
647 if (
sc.isFailure()) {
648 return StatusCode::FAILURE;
658 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
660 return StatusCode::SUCCESS;
662 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
663 m_skipEventRanges.erase(m_skipEventRanges.begin());
673 return StatusCode::SUCCESS;
678 return StatusCode::FAILURE;
683 for (
int i = 0;
i < jump;
i++) {
686 return StatusCode::SUCCESS;
688 return StatusCode::FAILURE;
694 return StatusCode::SUCCESS;
696 return StatusCode::FAILURE;
702 return StatusCode::SUCCESS;
706 IOpaqueAddress*& iop)
const {
707 std::string tokenStr;
711 tokenStr = (*attrList)[
"eventRef"].data<std::string>();
712 ATH_MSG_DEBUG(
"found AthenaAttribute, name = eventRef = " << tokenStr);
715 return StatusCode::FAILURE;
719 tokenStr = m_headerIterator->eventRef().toString();
721 auto token = std::make_unique<Token>();
722 token->fromString(tokenStr);
724 return StatusCode::SUCCESS;
728 return StatusCode::SUCCESS;
732 IEvtSelector::Context& )
const {
733 return StatusCode::SUCCESS;
738 if( m_inputCollectionsChanged ) {
740 if( rc != StatusCode::SUCCESS )
return rc;
748 m_headerIterator =
nullptr;
751 return StatusCode::RECOVERABLE;
755 m_poolCollectionConverter->disconnectDb().ignore();
757 m_poolCollectionConverter.reset();
762 <<
"\" from the collection list.");
770 if (!m_poolCollectionConverter->initialize().isSuccess()) {
771 m_headerIterator =
nullptr;
772 ATH_MSG_ERROR(
"seek: Unable to initialize PoolCollectionConverter.");
773 return StatusCode::FAILURE;
776 m_headerIterator = &m_poolCollectionConverter->selectAll();
779 next(*beginIter).ignore();
780 ATH_MSG_DEBUG(
"Token " << m_headerIterator->eventRef().toString());
782 m_headerIterator =
nullptr;
784 return StatusCode::FAILURE;
788 if (m_headerIterator->seek(evtNum - m_firstEvt[
m_curCollection]) == 0) {
789 m_headerIterator =
nullptr;
791 return StatusCode::FAILURE;
795 return StatusCode::SUCCESS;
807 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
808 if (m_numEvt[
i] == -1) {
816 int collection_size = 0;
819 collection_size = hi->
size();
822 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
826 m_numEvt[
i] = collection_size;
828 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
840 return StatusCode::FAILURE;
843 if (
ds->makeServer(
num - 1).isFailure()) {
844 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
846 return StatusCode::SUCCESS;
848 if (
ds->makeServer(
num + 1).isFailure()) {
849 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
850 return StatusCode::FAILURE;
853 return StatusCode::SUCCESS;
865 return StatusCode::FAILURE;
867 if (
ds->makeClient(
num + 1).isFailure()) {
868 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to DataStreaming client");
869 return StatusCode::FAILURE;
872 return StatusCode::SUCCESS;
875 std::string dummyStr;
884 return StatusCode::FAILURE;
888 while (
sc.isRecoverable()) {
893 if (
sc.isFailure()) {
894 if (
ds->makeClient(0).isFailure()) {
895 return StatusCode::FAILURE;
898 while (
sc.isRecoverable() ||
sc.isFailure()) {
903 if (
ds->makeClient(1).isFailure()) {
904 return StatusCode::FAILURE;
909 return StatusCode::FAILURE;
917 return StatusCode::FAILURE;
920 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
921 return StatusCode::FAILURE;
925 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
926 if (!
next(*ctxt).isSuccess()) {
932 delete ctxt; ctxt =
nullptr;
933 return StatusCode::FAILURE;
938 delete ctxt; ctxt =
nullptr;
942 while (
ds->readData().isSuccess()) {
943 ATH_MSG_VERBOSE(
"Called last readData, while marking last event in readEvent()");
947 if (!
sc.isSuccess()) {
948 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
949 return StatusCode::FAILURE;
952 while (
sc.isSuccess() ||
sc.isRecoverable()) {
955 ATH_MSG_DEBUG(
"Failed last readData -> Clients are stopped, after marking last event in readEvent()");
957 return StatusCode::SUCCESS;
967 std::unique_ptr<PoolCollectionConverter>
975 ATH_MSG_DEBUG(
"Try item: \"" << *m_inputCollectionsIterator <<
"\" from the collection list.");
977 *m_inputCollectionsIterator,
981 if (!
status.isSuccess()) {
984 if (!
status.isRecoverable()) {
985 ATH_MSG_ERROR(
"Unable to initialize PoolCollectionConverter.");
986 throw GaudiException(
"Unable to read: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
988 ATH_MSG_ERROR(
"Unable to open: " << *m_inputCollectionsIterator);
989 throw GaudiException(
"Unable to open: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
992 if (!pCollCnv->isValid().isSuccess()) {
994 ATH_MSG_DEBUG(
"No events found in: " << *m_inputCollectionsIterator <<
" skipped!!!");
996 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator);
998 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"eventless " + *m_inputCollectionsIterator);
1001 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1002 ++m_inputCollectionsIterator;
1023 return StatusCode::SUCCESS;
1028 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1031 (*attrList)[
iter.tokenName() +
suffix].data<std::string>() =
iter->toString();
1035 std::string eventRef =
"eventRef";
1039 attrList->extend(eventRef,
"string");
1040 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1041 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " + eventRef +
" = " << m_headerIterator->eventRef().toString() <<
".");
1045 for (
const auto &attr : sourceAttrList) {
1046 attrList->extend(attr.specification().name() +
suffix, attr.specification().type());
1047 (*attrList)[attr.specification().name() +
suffix] = attr;
1051 return StatusCode::SUCCESS;
1056 if (m_poolCollectionConverter) {
1057 m_poolCollectionConverter->disconnectDb().ignore();
1058 m_poolCollectionConverter.reset();
1060 m_headerIterator =
nullptr;
1062 if (!iomgr.retrieve().isSuccess()) {
1064 return StatusCode::FAILURE;
1066 if (!iomgr->io_hasitem(
this)) {
1067 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1068 return StatusCode::FAILURE;
1075 std::set<std::size_t> updatedIndexes;
1077 if (updatedIndexes.find(
i) != updatedIndexes.end())
continue;
1080 if (!iomgr->io_contains(
this,
fname)) {
1082 return StatusCode::FAILURE;
1084 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1086 return StatusCode::FAILURE;
1088 if (savedName !=
fname) {
1092 updatedIndexes.insert(
i);
1093 for (std::size_t j =
i + 1; j <
imax; j++) {
1096 updatedIndexes.insert(j);
1108 if (m_poolCollectionConverter) {
1109 m_poolCollectionConverter->disconnectDb().ignore();
1110 m_poolCollectionConverter.reset();
1112 return StatusCode::SUCCESS;
1124 if (inc.type() == IncidentType::BeginProcessing) {
1135 ATH_MSG_WARNING(
"could not read event source ID from incident event context");
1138 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1139 ATH_MSG_DEBUG(
"Incident handler ignoring unknown input FID: " << fid );
1142 ATH_MSG_DEBUG(
"** MN Incident handler " << inc.type() <<
" Event source ID=" << fid );
1143 if( inc.type() == IncidentType::BeginProcessing ) {
1145 m_activeEventsPerSource[fid]++;
1146 }
else if( inc.type() == IncidentType::EndProcessing ) {
1147 m_activeEventsPerSource[fid]--;
1152 for(
auto&
source: m_activeEventsPerSource )
1164 if(
m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1165 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1170 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + fid, fid);
1173 ATH_MSG_INFO(
"Disconnecting input sourceID: " << fid );
1175 m_activeEventsPerSource.erase( fid );