26 #include "GaudiKernel/ClassID.h"
27 #include "GaudiKernel/FileIncident.h"
28 #include "GaudiKernel/IIncidentSvc.h"
29 #include "GaudiKernel/IIoComponentMgr.h"
30 #include "GaudiKernel/GaudiException.h"
31 #include "GaudiKernel/GenericAddress.h"
32 #include "GaudiKernel/StatusCode.h"
40 #include <boost/tokenizer.hpp>
50 size_t nbytes,
unsigned int status) {
59 base_class(
name, pSvcLocator)
61 declareProperty(
"HelperTools", m_helperTools);
73 m_inputCollectionsChanged =
false;
77 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
78 m_inputCollectionsChanged =
true;
91 m_autoRetrieveTools =
false;
92 m_checkToolDeps =
false;
102 return(StatusCode::FAILURE);
106 ATH_MSG_FATAL(
"Use the property: EventSelector.InputCollections = "
107 <<
"[ \"<collectionName>\" ] (list of collections)");
108 return(StatusCode::FAILURE);
110 boost::char_separator<char> sep_coma(
","), sep_hyph(
"-");
112 for(
const std::string&
r:
ranges ) {
113 boost::tokenizer fromto(
r, sep_hyph);
114 auto from_iter = fromto.begin();
115 std::stringstream strstr1( *from_iter );
118 if( ++from_iter != fromto.end() ) {
119 std::stringstream strstr2( *from_iter );
124 m_skipEventRanges.push_back( std::pair(from,
to) );
128 m_skipEventRanges.push_back( std::pair(
v,
v) );
130 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
132 std::stringstream skip_ranges_ss;
133 for(
auto&
r: m_skipEventRanges ) {
134 if( not skip_ranges_ss.str().empty() ) skip_ranges_ss <<
", ";
135 skip_ranges_ss <<
r.first;
136 if(
r.first !=
r.second) skip_ranges_ss <<
"-" <<
r.second;
138 if( not skip_ranges_ss.str().empty() )
143 ATH_MSG_FATAL(
"EventSelector.CollectionType must be one of: ExplicitROOT, ImplicitROOT (default)");
144 return(StatusCode::FAILURE);
149 return(StatusCode::FAILURE);
153 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 0);
154 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 0);
160 return(StatusCode::FAILURE);
165 return(StatusCode::FAILURE);
168 if (!m_helperTools.retrieve().isSuccess()) {
170 return(StatusCode::FAILURE);
175 return(StatusCode::FAILURE);
177 std::string dummyStr;
179 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
180 return(StatusCode::FAILURE);
186 std::vector<std::string>
propVal;
188 ATH_MSG_FATAL(
"Cannot get EventPersistencySvc Property for CnvServices");
189 return(StatusCode::FAILURE);
191 bool foundCnvSvc =
false;
192 for (
const auto& property :
propVal) {
198 ATH_MSG_FATAL(
"Cannot set EventPersistencySvc Property for CnvServices");
199 return(StatusCode::FAILURE);
205 if (!iomgr.retrieve().isSuccess()) {
207 return(StatusCode::FAILURE);
209 if (!iomgr->io_register(
this).isSuccess()) {
210 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
211 return(StatusCode::FAILURE);
218 if (incol[
icol].substr(0, 4) ==
"LFN:" || incol[
icol].substr(0, 4) ==
"FID:") {
223 if (
fileName.substr(0, 4) ==
"PFN:") {
234 return(StatusCode::FAILURE);
240 return(StatusCode::FAILURE);
252 for(
auto&
el : m_numEvt )
el = -1;
254 for(
auto&
el : m_firstEvt )
el = -1;
259 if (!m_firstEvt.empty()) {
262 m_inputCollectionsChanged =
false;
264 m_headerIterator = 0;
266 ATH_MSG_INFO(
"Done reinitialization for shared reader client");
267 return(StatusCode::SUCCESS);
269 bool retError =
false;
270 for (
auto&
tool : m_helperTools) {
271 if (!
tool->postInitialize().isSuccess()) {
278 return(StatusCode::FAILURE);
284 if (m_poolCollectionConverter ==
nullptr) {
285 ATH_MSG_INFO(
"No Events found in any Input Collections");
291 FileIncident firstInputFileIncident(
name(),
"FirstInputFile", *m_inputCollectionsIterator);
296 return(StatusCode::SUCCESS);
310 m_headerIterator = &m_poolCollectionConverter->executeQuery();
312 ATH_MSG_FATAL(
"Cannot open implicit collection - check data/software version.");
314 return(StatusCode::FAILURE);
316 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
317 if (m_poolCollectionConverter !=
nullptr) {
318 m_poolCollectionConverter->disconnectDb().ignore();
319 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
321 ++m_inputCollectionsIterator;
323 if (m_poolCollectionConverter !=
nullptr) {
324 m_headerIterator = &m_poolCollectionConverter->executeQuery();
329 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
333 if (m_poolCollectionConverter ==
nullptr) {
334 return(StatusCode::SUCCESS);
336 m_headerIterator = &m_poolCollectionConverter->selectAll();
337 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
338 if (m_poolCollectionConverter !=
nullptr) {
339 m_poolCollectionConverter->disconnectDb().ignore();
340 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
342 ++m_inputCollectionsIterator;
344 if (m_poolCollectionConverter !=
nullptr) {
345 m_headerIterator = &m_poolCollectionConverter->selectAll();
351 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
352 return(StatusCode::SUCCESS);
355 m_headerIterator->eventRef()
356 : m_headerIterator->currentRow().tokenList()[
m_refName.value() +
"_ref"];
363 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"FID:" + fid, fid);
367 return(StatusCode::SUCCESS);
371 if (m_poolCollectionConverter !=
nullptr) {
373 m_poolCollectionConverter->disconnectDb().ignore();
374 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
379 return(StatusCode::SUCCESS);
382 if (m_poolCollectionConverter ==
nullptr) {
383 ATH_MSG_INFO(
"No Events found in any Input Collections");
386 --m_inputCollectionsIterator;
389 m_headerIterator = &m_poolCollectionConverter->executeQuery();
395 return(StatusCode::SUCCESS);
400 return(StatusCode::SUCCESS);
402 IEvtSelector::Context* ctxt(
nullptr);
406 return(StatusCode::SUCCESS);
416 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
432 for (
auto&
tool : m_helperTools) {
433 if (!
tool->preFinalize().isSuccess()) {
439 m_headerIterator =
nullptr;
440 if (m_poolCollectionConverter !=
nullptr) {
441 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
452 if (!m_helperTools.release().isSuccess()) {
470 return(StatusCode::SUCCESS);
474 std::lock_guard<CallMutex> lockGuard(
m_callLock);
478 while (
sc.isRecoverable()) {
485 void* tokenStr =
nullptr;
488 if (
sc.isRecoverable()) {
489 delete [] (
char*)tokenStr; tokenStr =
nullptr;
493 return(StatusCode::FAILURE);
495 if (
sc.isFailure()) {
496 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
497 delete [] (
char*)tokenStr; tokenStr =
nullptr;
498 return(StatusCode::FAILURE);
504 athAttrList->extend(
"eventRef",
"string");
505 (*athAttrList)[
"eventRef"].data<std::string>() = std::string((
char*)tokenStr);
507 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
508 delete [] (
char*)tokenStr; tokenStr =
nullptr;
510 return(StatusCode::FAILURE);
513 token.
fromString(std::string((
char*)tokenStr));
514 delete [] (
char*)tokenStr; tokenStr =
nullptr;
519 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
523 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
526 return(StatusCode::SUCCESS);
528 for (
const auto&
tool : m_helperTools) {
529 if (!
tool->preNext().isSuccess()) {
536 if (
sc.isRecoverable()) {
539 if (
sc.isFailure()) {
540 return StatusCode::FAILURE;
548 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
551 std::string token = m_headerIterator->eventRef().toString();
555 token.length() + 1, 0)).isRecoverable() ) {
557 ATH_MSG_VERBOSE(
"Called last readData, while putting next event in next()");
561 if (!
sc.isSuccess()) {
563 return(StatusCode::FAILURE);
567 if (!
eventStore()->clearStore().isSuccess()) {
572 return(StatusCode::FAILURE);
577 for (
const auto&
tool : m_helperTools) {
579 if (toolStatus.isRecoverable()) {
582 status = StatusCode::RECOVERABLE;
584 }
else if (toolStatus.isFailure()) {
586 status = StatusCode::FAILURE;
589 if (
status.isRecoverable()) {
591 }
else if (
status.isFailure()) {
600 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
601 m_skipEventRanges.erase(m_skipEventRanges.begin());
606 return(StatusCode::SUCCESS);
611 for (
int i = 0;
i < jump;
i++) {
614 return(StatusCode::SUCCESS);
616 return(StatusCode::FAILURE);
621 if( m_inputCollectionsChanged ) {
623 if( rc != StatusCode::SUCCESS )
return rc;
627 if (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
628 m_headerIterator =
nullptr;
630 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
638 if( m_inputCollectionsChanged ) {
640 if( rc != StatusCode::SUCCESS )
return rc;
643 ++m_inputCollectionsIterator;
646 if (m_poolCollectionConverter ==
nullptr) {
650 return StatusCode::FAILURE;
653 m_headerIterator = &m_poolCollectionConverter->executeQuery();
656 return StatusCode::RECOVERABLE;
662 m_headerIterator->eventRef()
663 : m_headerIterator->currentRow().tokenList()[
m_refName.value() +
"_ref"];
668 if (
guid != m_guid) {
677 m_activeEventsPerSource[
guid.toString()] = 0;
682 if (!
m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
684 return(StatusCode::FAILURE);
687 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
693 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
698 return StatusCode::SUCCESS;
707 if (
sc.isRecoverable()) {
710 if (
sc.isFailure()) {
711 return StatusCode::FAILURE;
721 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
723 return StatusCode::SUCCESS;
725 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
726 m_skipEventRanges.erase(m_skipEventRanges.begin());
736 return StatusCode::SUCCESS;
741 return(StatusCode::FAILURE);
746 for (
int i = 0;
i < jump;
i++) {
749 return(StatusCode::SUCCESS);
751 return(StatusCode::FAILURE);
757 return(StatusCode::SUCCESS);
759 return(StatusCode::FAILURE);
765 return(StatusCode::SUCCESS);
769 IOpaqueAddress*& iop)
const {
770 std::string tokenStr;
775 tokenStr = (*attrList)[
"eventRef"].data<std::string>();
776 ATH_MSG_DEBUG(
"found AthenaAttribute, name = eventRef = " << tokenStr);
778 tokenStr = (*attrList)[
m_refName.value() +
"_ref"].data<std::string>();
783 return(StatusCode::FAILURE);
787 tokenStr = m_poolCollectionConverter->retrieveToken(m_headerIterator,
m_refName.value());
792 return(StatusCode::SUCCESS);
796 return(StatusCode::SUCCESS);
800 IEvtSelector::Context& )
const {
801 return(StatusCode::SUCCESS);
806 if( m_inputCollectionsChanged ) {
808 if( rc != StatusCode::SUCCESS )
return rc;
816 m_headerIterator =
nullptr;
819 return(StatusCode::RECOVERABLE);
823 m_poolCollectionConverter->disconnectDb().ignore();
825 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
830 <<
"\" from the collection list.");
839 if (!m_poolCollectionConverter->initialize().isSuccess()) {
840 m_headerIterator =
nullptr;
841 ATH_MSG_ERROR(
"seek: Unable to initialize PoolCollectionConverter.");
842 return(StatusCode::FAILURE);
845 m_headerIterator = &m_poolCollectionConverter->executeQuery();
848 next(*beginIter).ignore();
849 ATH_MSG_DEBUG(
"Token " << m_headerIterator->eventRef().toString());
851 m_headerIterator =
nullptr;
853 return(StatusCode::FAILURE);
860 return(StatusCode::FAILURE);
863 m_headerIterator =
nullptr;
865 return(StatusCode::FAILURE);
869 return(StatusCode::SUCCESS);
881 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
882 if (m_numEvt[
i] == -1) {
891 int collection_size = 0;
898 collection_size = cs->
size();
901 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
905 m_numEvt[
i] = collection_size;
907 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
918 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
920 return(StatusCode::SUCCESS);
923 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
924 return(StatusCode::FAILURE);
927 return(StatusCode::SUCCESS);
937 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to DataStreaming client");
938 return(StatusCode::FAILURE);
941 return(StatusCode::SUCCESS);
944 std::string dummyStr;
952 while (
sc.isRecoverable()) {
957 if (
sc.isFailure()) {
959 return(StatusCode::FAILURE);
962 while (
sc.isRecoverable() ||
sc.isFailure()) {
968 return(StatusCode::FAILURE);
973 return(StatusCode::FAILURE);
979 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
980 return(StatusCode::FAILURE);
984 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
985 if (!
next(*ctxt).isSuccess()) {
991 delete ctxt; ctxt =
nullptr;
992 return(StatusCode::FAILURE);
997 delete ctxt; ctxt =
nullptr;
1002 ATH_MSG_VERBOSE(
"Called last readData, while marking last event in readEvent()");
1006 if (!
sc.isSuccess()) {
1007 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
1008 return(StatusCode::FAILURE);
1011 while (
sc.isSuccess() ||
sc.isRecoverable()) {
1014 ATH_MSG_DEBUG(
"Failed last readData -> Clients are stopped, after marking last event in readEvent()");
1016 return(StatusCode::SUCCESS);
1024 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
1037 ATH_MSG_DEBUG(
"Try item: \"" << *m_inputCollectionsIterator <<
"\" from the collection list.");
1039 *m_inputCollectionsIterator,
1044 if (!
status.isSuccess()) {
1046 delete pCollCnv; pCollCnv =
nullptr;
1047 if (!
status.isRecoverable()) {
1048 ATH_MSG_ERROR(
"Unable to initialize PoolCollectionConverter.");
1049 throw GaudiException(
"Unable to read: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1051 ATH_MSG_ERROR(
"Unable to open: " << *m_inputCollectionsIterator);
1052 throw GaudiException(
"Unable to open: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1055 if (!pCollCnv->
isValid().isSuccess()) {
1056 delete pCollCnv; pCollCnv =
nullptr;
1057 ATH_MSG_DEBUG(
"No events found in: " << *m_inputCollectionsIterator <<
" skipped!!!");
1059 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator);
1061 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"eventless " + *m_inputCollectionsIterator);
1064 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1065 ++m_inputCollectionsIterator;
1085 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
1087 return(StatusCode::FAILURE);
1089 return(StatusCode::SUCCESS);
1094 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1097 (*attrList)[iter.tokenName() +
suffix].data<std::string>() = iter->toString();
1098 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " << iter.tokenName() +
suffix <<
" = " << iter->toString() <<
".");
1101 std::string eventRef =
"eventRef";
1105 attrList->extend(eventRef,
"string");
1106 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1107 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " + eventRef +
" = " << m_headerIterator->eventRef().toString() <<
".");
1111 for (
const auto &attr : sourceAttrList) {
1112 attrList->extend(attr.specification().name() +
suffix, attr.specification().type());
1113 (*attrList)[attr.specification().name() +
suffix] = attr;
1117 return StatusCode::SUCCESS;
1122 if (m_poolCollectionConverter !=
nullptr) {
1123 m_poolCollectionConverter->disconnectDb().ignore();
1124 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1126 m_headerIterator =
nullptr;
1128 if (!iomgr.retrieve().isSuccess()) {
1130 return(StatusCode::FAILURE);
1132 if (!iomgr->io_hasitem(
this)) {
1133 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1134 return(StatusCode::FAILURE);
1141 std::set<std::size_t> updatedIndexes;
1143 if (updatedIndexes.find(
i) != updatedIndexes.end())
continue;
1146 if (!iomgr->io_contains(
this,
fname)) {
1148 return(StatusCode::FAILURE);
1150 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1152 return(StatusCode::FAILURE);
1154 if (savedName !=
fname) {
1158 updatedIndexes.insert(
i);
1159 for (std::size_t j =
i + 1; j <
imax; j++) {
1162 updatedIndexes.insert(j);
1174 if (m_poolCollectionConverter !=
nullptr) {
1175 m_poolCollectionConverter->disconnectDb().ignore();
1176 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1178 return(StatusCode::SUCCESS);
1190 if (inc.type() == IncidentType::BeginProcessing) {
1201 ATH_MSG_WARNING(
"could not read event source ID from incident event context");
1204 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1205 ATH_MSG_DEBUG(
"Incident handler ignoring unknown input FID: " << fid );
1208 ATH_MSG_DEBUG(
"** MN Incident handler " << inc.type() <<
" Event source ID=" << fid );
1209 if( inc.type() == IncidentType::BeginProcessing ) {
1211 m_activeEventsPerSource[fid]++;
1212 }
else if( inc.type() == IncidentType::EndProcessing ) {
1213 m_activeEventsPerSource[fid]--;
1218 for(
auto&
source: m_activeEventsPerSource )
1230 if(
m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1231 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1236 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + fid, fid);
1239 ATH_MSG_INFO(
"Disconnecting input sourceID: " << fid );
1241 m_activeEventsPerSource.erase( fid );