26 #include "GaudiKernel/ClassID.h"
27 #include "GaudiKernel/FileIncident.h"
28 #include "GaudiKernel/IIncidentSvc.h"
29 #include "GaudiKernel/IIoComponentMgr.h"
30 #include "GaudiKernel/GaudiException.h"
31 #include "GaudiKernel/GenericAddress.h"
32 #include "GaudiKernel/StatusCode.h"
40 #include <boost/tokenizer.hpp>
50 size_t nbytes,
unsigned int status) {
59 base_class(
name, pSvcLocator)
73 m_inputCollectionsChanged =
false;
77 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
78 m_inputCollectionsChanged =
true;
91 m_autoRetrieveTools =
false;
92 m_checkToolDeps =
false;
102 return(StatusCode::FAILURE);
106 ATH_MSG_FATAL(
"Use the property: EventSelector.InputCollections = "
107 <<
"[ \"<collectionName>\" ] (list of collections)");
108 return(StatusCode::FAILURE);
110 boost::char_separator<char> sep_coma(
","), sep_hyph(
"-");
112 for(
const std::string&
r:
ranges ) {
113 boost::tokenizer fromto(
r, sep_hyph);
114 auto from_iter = fromto.begin();
115 std::stringstream strstr1( *from_iter );
118 if( ++from_iter != fromto.end() ) {
119 std::stringstream strstr2( *from_iter );
124 m_skipEventRanges.push_back( std::pair(from,
to) );
128 m_skipEventRanges.push_back( std::pair(
v,
v) );
130 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
132 std::stringstream skip_ranges_ss;
133 for(
auto&
r: m_skipEventRanges ) {
134 if( not skip_ranges_ss.str().empty() ) skip_ranges_ss <<
", ";
135 skip_ranges_ss <<
r.first;
136 if(
r.first !=
r.second) skip_ranges_ss <<
"-" <<
r.second;
138 if( not skip_ranges_ss.str().empty() )
143 ATH_MSG_FATAL(
"EventSelector.CollectionType must be one of: ExplicitROOT, ImplicitROOT (default)");
144 return(StatusCode::FAILURE);
149 return(StatusCode::FAILURE);
153 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 0);
154 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 0);
160 return(StatusCode::FAILURE);
165 return(StatusCode::FAILURE);
168 if (!m_helperTools.retrieve().isSuccess()) {
170 return(StatusCode::FAILURE);
175 return(StatusCode::FAILURE);
177 std::string dummyStr;
179 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
180 return(StatusCode::FAILURE);
186 std::vector<std::string>
propVal;
188 ATH_MSG_FATAL(
"Cannot get EventPersistencySvc Property for CnvServices");
189 return(StatusCode::FAILURE);
191 bool foundCnvSvc =
false;
192 for (
const auto& property :
propVal) {
198 ATH_MSG_FATAL(
"Cannot set EventPersistencySvc Property for CnvServices");
199 return(StatusCode::FAILURE);
205 if (!iomgr.retrieve().isSuccess()) {
207 return(StatusCode::FAILURE);
209 if (!iomgr->io_register(
this).isSuccess()) {
210 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
211 return(StatusCode::FAILURE);
218 if (incol[
icol].substr(0, 4) ==
"LFN:" || incol[
icol].substr(0, 4) ==
"FID:") {
223 if (
fileName.substr(0, 4) ==
"PFN:") {
234 return(StatusCode::FAILURE);
240 return(StatusCode::FAILURE);
252 for(
auto&
el : m_numEvt )
el = -1;
254 for(
auto&
el : m_firstEvt )
el = -1;
259 if (!m_firstEvt.empty()) {
262 m_inputCollectionsChanged =
false;
264 m_headerIterator = 0;
266 ATH_MSG_INFO(
"Done reinitialization for shared reader client");
267 return(StatusCode::SUCCESS);
269 bool retError =
false;
270 for (
auto&
tool : m_helperTools) {
271 if (!
tool->postInitialize().isSuccess()) {
278 return(StatusCode::FAILURE);
284 if (m_poolCollectionConverter ==
nullptr) {
285 ATH_MSG_INFO(
"No Events found in any Input Collections");
291 FileIncident firstInputFileIncident(
name(),
"FirstInputFile", *m_inputCollectionsIterator);
296 return(StatusCode::SUCCESS);
310 m_headerIterator = &m_poolCollectionConverter->executeQuery();
312 ATH_MSG_FATAL(
"Cannot open implicit collection - check data/software version.");
314 return(StatusCode::FAILURE);
316 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
317 if (m_poolCollectionConverter !=
nullptr) {
318 m_poolCollectionConverter->disconnectDb().ignore();
319 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
321 ++m_inputCollectionsIterator;
323 if (m_poolCollectionConverter !=
nullptr) {
324 m_headerIterator = &m_poolCollectionConverter->executeQuery();
329 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
333 if (m_poolCollectionConverter ==
nullptr) {
334 return(StatusCode::SUCCESS);
336 m_headerIterator = &m_poolCollectionConverter->selectAll();
337 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
338 if (m_poolCollectionConverter !=
nullptr) {
339 m_poolCollectionConverter->disconnectDb().ignore();
340 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
342 ++m_inputCollectionsIterator;
344 if (m_poolCollectionConverter !=
nullptr) {
345 m_headerIterator = &m_poolCollectionConverter->selectAll();
351 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
352 return(StatusCode::SUCCESS);
355 m_headerIterator->eventRef()
356 : m_headerIterator->currentRow().tokenList()[
m_refName.value() +
"_ref"];
363 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"FID:" + fid, fid);
367 return(StatusCode::SUCCESS);
371 if (m_poolCollectionConverter !=
nullptr) {
373 m_poolCollectionConverter->disconnectDb().ignore();
374 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
379 return(StatusCode::SUCCESS);
382 if (m_poolCollectionConverter ==
nullptr) {
383 ATH_MSG_INFO(
"No Events found in any Input Collections");
386 --m_inputCollectionsIterator;
389 m_headerIterator = &m_poolCollectionConverter->executeQuery();
395 return(StatusCode::SUCCESS);
400 return(StatusCode::SUCCESS);
402 IEvtSelector::Context* ctxt(
nullptr);
406 return(StatusCode::SUCCESS);
416 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
432 for (
auto&
tool : m_helperTools) {
433 if (!
tool->preFinalize().isSuccess()) {
439 m_headerIterator =
nullptr;
440 if (m_poolCollectionConverter !=
nullptr) {
441 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
452 if (!m_helperTools.release().isSuccess()) {
469 if (riid == IEvtSelector::interfaceID()) {
470 *ppvInterface =
dynamic_cast<IEvtSelector*
>(
this);
471 }
else if (riid == IIoComponent::interfaceID()) {
472 *ppvInterface =
dynamic_cast<IIoComponent*
>(
this);
473 }
else if (riid == IProperty::interfaceID()) {
474 *ppvInterface =
dynamic_cast<IProperty*
>(
this);
475 }
else if (riid == IEvtSelectorSeek::interfaceID()) {
479 }
else if (riid == ISecondaryEventSelector::interfaceID()) {
482 return(::AthService::queryInterface(riid, ppvInterface));
485 return(StatusCode::SUCCESS);
490 return(StatusCode::SUCCESS);
494 std::lock_guard<CallMutex> lockGuard(
m_callLock);
498 while (
sc.isRecoverable()) {
505 void* tokenStr =
nullptr;
508 if (
sc.isRecoverable()) {
509 delete [] (
char*)tokenStr; tokenStr =
nullptr;
513 return(StatusCode::FAILURE);
515 if (
sc.isFailure()) {
516 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
517 delete [] (
char*)tokenStr; tokenStr =
nullptr;
518 return(StatusCode::FAILURE);
524 athAttrList->extend(
"eventRef",
"string");
525 (*athAttrList)[
"eventRef"].data<std::string>() = std::string((
char*)tokenStr);
527 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
528 delete [] (
char*)tokenStr; tokenStr =
nullptr;
530 return(StatusCode::FAILURE);
533 token.
fromString(std::string((
char*)tokenStr));
534 delete [] (
char*)tokenStr; tokenStr =
nullptr;
539 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
543 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
546 return(StatusCode::SUCCESS);
548 for (
const auto&
tool : m_helperTools) {
549 if (!
tool->preNext().isSuccess()) {
556 if (
sc.isRecoverable()) {
559 if (
sc.isFailure()) {
560 return StatusCode::FAILURE;
568 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
571 std::string token = m_headerIterator->eventRef().toString();
575 token.length() + 1, 0)).isRecoverable() ) {
577 ATH_MSG_VERBOSE(
"Called last readData, while putting next event in next()");
581 if (!
sc.isSuccess()) {
583 return(StatusCode::FAILURE);
587 if (!
eventStore()->clearStore().isSuccess()) {
592 return(StatusCode::FAILURE);
597 for (
const auto&
tool : m_helperTools) {
599 if (toolStatus.isRecoverable()) {
602 status = StatusCode::RECOVERABLE;
604 }
else if (toolStatus.isFailure()) {
606 status = StatusCode::FAILURE;
609 if (
status.isRecoverable()) {
611 }
else if (
status.isFailure()) {
620 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
621 m_skipEventRanges.erase(m_skipEventRanges.begin());
626 return(StatusCode::SUCCESS);
631 for (
int i = 0;
i < jump;
i++) {
634 return(StatusCode::SUCCESS);
636 return(StatusCode::FAILURE);
641 if( m_inputCollectionsChanged ) {
643 if( rc != StatusCode::SUCCESS )
return rc;
647 if (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
648 m_headerIterator =
nullptr;
650 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
658 if( m_inputCollectionsChanged ) {
660 if( rc != StatusCode::SUCCESS )
return rc;
663 ++m_inputCollectionsIterator;
666 if (m_poolCollectionConverter ==
nullptr) {
670 return StatusCode::FAILURE;
673 m_headerIterator = &m_poolCollectionConverter->executeQuery();
676 return StatusCode::RECOVERABLE;
682 m_headerIterator->eventRef()
683 : m_headerIterator->currentRow().tokenList()[
m_refName.value() +
"_ref"];
688 if (
guid != m_guid) {
697 m_activeEventsPerSource[
guid.toString()] = 0;
702 if (!
m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
704 return(StatusCode::FAILURE);
707 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
713 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
718 return StatusCode::SUCCESS;
727 if (
sc.isRecoverable()) {
730 if (
sc.isFailure()) {
731 return StatusCode::FAILURE;
741 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
743 return StatusCode::SUCCESS;
745 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
746 m_skipEventRanges.erase(m_skipEventRanges.begin());
756 return StatusCode::SUCCESS;
761 return(StatusCode::FAILURE);
766 for (
int i = 0;
i < jump;
i++) {
769 return(StatusCode::SUCCESS);
771 return(StatusCode::FAILURE);
777 return(StatusCode::SUCCESS);
779 return(StatusCode::FAILURE);
785 return(StatusCode::SUCCESS);
789 IOpaqueAddress*& iop)
const {
790 std::string tokenStr;
795 tokenStr = (*attrList)[
"eventRef"].data<std::string>();
796 ATH_MSG_DEBUG(
"found AthenaAttribute, name = eventRef = " << tokenStr);
798 tokenStr = (*attrList)[
m_refName.value() +
"_ref"].data<std::string>();
803 return(StatusCode::FAILURE);
807 tokenStr = m_poolCollectionConverter->retrieveToken(m_headerIterator,
m_refName.value());
812 return(StatusCode::SUCCESS);
816 return(StatusCode::SUCCESS);
820 IEvtSelector::Context& )
const {
821 return(StatusCode::SUCCESS);
826 if( m_inputCollectionsChanged ) {
828 if( rc != StatusCode::SUCCESS )
return rc;
836 m_headerIterator =
nullptr;
839 return(StatusCode::RECOVERABLE);
843 m_poolCollectionConverter->disconnectDb().ignore();
845 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
850 <<
"\" from the collection list.");
859 if (!m_poolCollectionConverter->initialize().isSuccess()) {
860 m_headerIterator =
nullptr;
861 ATH_MSG_ERROR(
"seek: Unable to initialize PoolCollectionConverter.");
862 return(StatusCode::FAILURE);
865 m_headerIterator = &m_poolCollectionConverter->executeQuery();
868 next(*beginIter).ignore();
869 ATH_MSG_DEBUG(
"Token " << m_headerIterator->eventRef().toString());
871 m_headerIterator =
nullptr;
873 return(StatusCode::FAILURE);
880 return(StatusCode::FAILURE);
883 m_headerIterator =
nullptr;
885 return(StatusCode::FAILURE);
889 return(StatusCode::SUCCESS);
901 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
902 if (m_numEvt[
i] == -1) {
911 int collection_size = 0;
918 collection_size = cs->
size();
921 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
925 m_numEvt[
i] = collection_size;
927 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
938 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
940 return(StatusCode::SUCCESS);
943 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
944 return(StatusCode::FAILURE);
947 return(StatusCode::SUCCESS);
957 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to DataStreaming client");
958 return(StatusCode::FAILURE);
961 return(StatusCode::SUCCESS);
964 std::string dummyStr;
972 while (
sc.isRecoverable()) {
977 if (
sc.isFailure()) {
979 return(StatusCode::FAILURE);
982 while (
sc.isRecoverable() ||
sc.isFailure()) {
988 return(StatusCode::FAILURE);
993 return(StatusCode::FAILURE);
999 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
1000 return(StatusCode::FAILURE);
1004 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
1005 if (!
next(*ctxt).isSuccess()) {
1007 ATH_MSG_VERBOSE(
"Called read Event and read last event from input: " <<
i);
1011 delete ctxt; ctxt =
nullptr;
1012 return(StatusCode::FAILURE);
1017 delete ctxt; ctxt =
nullptr;
1022 ATH_MSG_VERBOSE(
"Called last readData, while marking last event in readEvent()");
1026 if (!
sc.isSuccess()) {
1027 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
1028 return(StatusCode::FAILURE);
1031 while (
sc.isSuccess() ||
sc.isRecoverable()) {
1034 ATH_MSG_DEBUG(
"Failed last readData -> Clients are stopped, after marking last event in readEvent()");
1036 return(StatusCode::SUCCESS);
1044 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
1057 ATH_MSG_DEBUG(
"Try item: \"" << *m_inputCollectionsIterator <<
"\" from the collection list.");
1059 *m_inputCollectionsIterator,
1064 if (!
status.isSuccess()) {
1066 delete pCollCnv; pCollCnv =
nullptr;
1067 if (!
status.isRecoverable()) {
1068 ATH_MSG_ERROR(
"Unable to initialize PoolCollectionConverter.");
1069 throw GaudiException(
"Unable to read: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1071 ATH_MSG_ERROR(
"Unable to open: " << *m_inputCollectionsIterator);
1072 throw GaudiException(
"Unable to open: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1075 if (!pCollCnv->
isValid().isSuccess()) {
1076 delete pCollCnv; pCollCnv =
nullptr;
1077 ATH_MSG_DEBUG(
"No events found in: " << *m_inputCollectionsIterator <<
" skipped!!!");
1079 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator);
1081 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"eventless " + *m_inputCollectionsIterator);
1084 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1085 ++m_inputCollectionsIterator;
1105 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
1107 return(StatusCode::FAILURE);
1109 return(StatusCode::SUCCESS);
1114 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1117 (*attrList)[iter.tokenName() +
suffix].data<std::string>() = iter->toString();
1118 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " << iter.tokenName() +
suffix <<
" = " << iter->toString() <<
".");
1121 std::string eventRef =
"eventRef";
1125 attrList->extend(eventRef,
"string");
1126 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1127 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " + eventRef +
" = " << m_headerIterator->eventRef().toString() <<
".");
1131 for (
const auto &attr : sourceAttrList) {
1132 attrList->extend(attr.specification().name() +
suffix, attr.specification().type());
1133 (*attrList)[attr.specification().name() +
suffix] = attr;
1137 return StatusCode::SUCCESS;
1142 if (m_poolCollectionConverter !=
nullptr) {
1143 m_poolCollectionConverter->disconnectDb().ignore();
1144 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1146 m_headerIterator =
nullptr;
1148 if (!iomgr.retrieve().isSuccess()) {
1150 return(StatusCode::FAILURE);
1152 if (!iomgr->io_hasitem(
this)) {
1153 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1154 return(StatusCode::FAILURE);
1161 std::set<std::size_t> updatedIndexes;
1163 if (updatedIndexes.find(
i) != updatedIndexes.end())
continue;
1166 if (!iomgr->io_contains(
this,
fname)) {
1168 return(StatusCode::FAILURE);
1170 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1172 return(StatusCode::FAILURE);
1174 if (savedName !=
fname) {
1178 updatedIndexes.insert(
i);
1179 for (std::size_t j =
i + 1; j <
imax; j++) {
1182 updatedIndexes.insert(j);
1194 if (m_poolCollectionConverter !=
nullptr) {
1195 m_poolCollectionConverter->disconnectDb().ignore();
1196 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1198 return(StatusCode::SUCCESS);
1210 if (inc.type() == IncidentType::BeginProcessing) {
1221 ATH_MSG_WARNING(
"could not read event source ID from incident event context");
1224 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1225 ATH_MSG_DEBUG(
"Incident handler ignoring unknown input FID: " << fid );
1228 ATH_MSG_DEBUG(
"** MN Incident handler " << inc.type() <<
" Event source ID=" << fid );
1229 if( inc.type() == IncidentType::BeginProcessing ) {
1231 m_activeEventsPerSource[fid]++;
1232 }
else if( inc.type() == IncidentType::EndProcessing ) {
1233 m_activeEventsPerSource[fid]--;
1238 for(
auto& source: m_activeEventsPerSource )
1239 msg(
MSG::DEBUG) <<
"SourceID: " << source.first <<
" active events: " << source.second <<
endmsg;
1250 if(
m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1251 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1256 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + fid, fid);
1259 ATH_MSG_INFO(
"Disconnecting input sourceID: " << fid );
1261 m_activeEventsPerSource.erase( fid );