24 #include "GaudiKernel/ClassID.h"
25 #include "GaudiKernel/FileIncident.h"
26 #include "GaudiKernel/IIncidentSvc.h"
27 #include "GaudiKernel/IIoComponentMgr.h"
28 #include "GaudiKernel/GaudiException.h"
29 #include "GaudiKernel/GenericAddress.h"
30 #include "GaudiKernel/StatusCode.h"
38 #include <boost/tokenizer.hpp>
48 size_t nbytes,
unsigned int status) {
57 base_class(
name, pSvcLocator)
59 declareProperty(
"HelperTools", m_helperTools);
71 m_inputCollectionsChanged =
false;
75 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
76 m_inputCollectionsChanged =
true;
89 m_autoRetrieveTools =
false;
90 m_checkToolDeps =
false;
100 return(StatusCode::FAILURE);
104 ATH_MSG_FATAL(
"Use the property: EventSelector.InputCollections = "
105 <<
"[ \"<collectionName>\" ] (list of collections)");
106 return(StatusCode::FAILURE);
108 boost::char_separator<char> sep_coma(
","), sep_hyph(
"-");
110 for(
const std::string&
r:
ranges ) {
111 boost::tokenizer fromto(
r, sep_hyph);
112 auto from_iter = fromto.begin();
113 std::stringstream strstr1( *from_iter );
116 if( ++from_iter != fromto.end() ) {
117 std::stringstream strstr2( *from_iter );
122 m_skipEventRanges.push_back( std::pair(from,
to) );
126 m_skipEventRanges.push_back( std::pair(
v,
v) );
128 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
130 std::stringstream skip_ranges_ss;
131 for(
auto&
r: m_skipEventRanges ) {
132 if( not skip_ranges_ss.str().empty() ) skip_ranges_ss <<
", ";
133 skip_ranges_ss <<
r.first;
134 if(
r.first !=
r.second) skip_ranges_ss <<
"-" <<
r.second;
136 if( not skip_ranges_ss.str().empty() )
141 ATH_MSG_FATAL(
"EventSelector.CollectionType must be one of: RootCollection, ImplicitCollection (default)");
142 return(StatusCode::FAILURE);
147 return(StatusCode::FAILURE);
151 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 0);
152 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 0);
158 return(StatusCode::FAILURE);
163 return(StatusCode::FAILURE);
166 if (!m_helperTools.retrieve().isSuccess()) {
168 return(StatusCode::FAILURE);
173 return(StatusCode::FAILURE);
175 std::string dummyStr;
177 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
178 return(StatusCode::FAILURE);
184 std::vector<std::string>
propVal;
186 ATH_MSG_FATAL(
"Cannot get EventPersistencySvc Property for CnvServices");
187 return(StatusCode::FAILURE);
189 bool foundCnvSvc =
false;
190 for (
const auto& property :
propVal) {
196 ATH_MSG_FATAL(
"Cannot set EventPersistencySvc Property for CnvServices");
197 return(StatusCode::FAILURE);
203 if (!iomgr.retrieve().isSuccess()) {
205 return(StatusCode::FAILURE);
207 if (!iomgr->io_register(
this).isSuccess()) {
208 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
209 return(StatusCode::FAILURE);
216 if (incol[
icol].substr(0, 4) ==
"LFN:" || incol[
icol].substr(0, 4) ==
"FID:") {
221 if (
fileName.substr(0, 4) ==
"PFN:") {
232 return(StatusCode::FAILURE);
238 return(StatusCode::FAILURE);
250 for(
auto&
el : m_numEvt )
el = -1;
252 for(
auto&
el : m_firstEvt )
el = -1;
257 if (!m_firstEvt.empty()) {
260 m_inputCollectionsChanged =
false;
262 m_headerIterator = 0;
264 ATH_MSG_INFO(
"Done reinitialization for shared reader client");
265 return(StatusCode::SUCCESS);
267 bool retError =
false;
268 for (
auto&
tool : m_helperTools) {
269 if (!
tool->postInitialize().isSuccess()) {
276 return(StatusCode::FAILURE);
281 if (m_poolCollectionConverter ==
nullptr) {
282 ATH_MSG_INFO(
"No Events found in any Input Collections");
288 FileIncident firstInputFileIncident(
name(),
"FirstInputFile", *m_inputCollectionsIterator);
293 return(StatusCode::SUCCESS);
297 m_headerIterator = &m_poolCollectionConverter->selectAll();
299 ATH_MSG_FATAL(
"Cannot open implicit collection - check data/software version.");
301 return(StatusCode::FAILURE);
303 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
304 if (m_poolCollectionConverter !=
nullptr) {
305 m_poolCollectionConverter->disconnectDb().ignore();
306 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
308 ++m_inputCollectionsIterator;
310 if (m_poolCollectionConverter !=
nullptr) {
311 m_headerIterator = &m_poolCollectionConverter->selectAll();
316 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
320 if (m_poolCollectionConverter ==
nullptr) {
321 return(StatusCode::SUCCESS);
323 m_headerIterator = &m_poolCollectionConverter->selectAll();
324 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
325 if (m_poolCollectionConverter !=
nullptr) {
326 m_poolCollectionConverter->disconnectDb().ignore();
327 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
329 ++m_inputCollectionsIterator;
331 if (m_poolCollectionConverter !=
nullptr) {
332 m_headerIterator = &m_poolCollectionConverter->selectAll();
338 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
339 return(StatusCode::SUCCESS);
341 const Token& headRef = m_headerIterator->eventRef();
348 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"FID:" + fid, fid);
352 return(StatusCode::SUCCESS);
356 if (m_poolCollectionConverter !=
nullptr) {
358 m_poolCollectionConverter->disconnectDb().ignore();
359 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
364 return(StatusCode::SUCCESS);
367 if (m_poolCollectionConverter ==
nullptr) {
368 ATH_MSG_INFO(
"No Events found in any Input Collections");
371 --m_inputCollectionsIterator;
374 m_headerIterator = &m_poolCollectionConverter->selectAll();
380 return(StatusCode::SUCCESS);
385 return(StatusCode::SUCCESS);
387 IEvtSelector::Context* ctxt(
nullptr);
391 return(StatusCode::SUCCESS);
401 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
417 for (
auto&
tool : m_helperTools) {
418 if (!
tool->preFinalize().isSuccess()) {
424 m_headerIterator =
nullptr;
425 if (m_poolCollectionConverter !=
nullptr) {
426 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
437 if (!m_helperTools.release().isSuccess()) {
455 return(StatusCode::SUCCESS);
459 std::lock_guard<CallMutex> lockGuard(
m_callLock);
463 while (
sc.isRecoverable()) {
470 void* tokenStr =
nullptr;
473 if (
sc.isRecoverable()) {
474 delete [] (
char*)tokenStr; tokenStr =
nullptr;
478 return(StatusCode::FAILURE);
480 if (
sc.isFailure()) {
481 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
482 delete [] (
char*)tokenStr; tokenStr =
nullptr;
483 return(StatusCode::FAILURE);
489 athAttrList->extend(
"eventRef",
"string");
490 (*athAttrList)[
"eventRef"].data<std::string>() = std::string((
char*)tokenStr);
492 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
493 delete [] (
char*)tokenStr; tokenStr =
nullptr;
495 return(StatusCode::FAILURE);
498 token.
fromString(std::string((
char*)tokenStr));
499 delete [] (
char*)tokenStr; tokenStr =
nullptr;
504 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
508 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
511 return(StatusCode::SUCCESS);
513 for (
const auto&
tool : m_helperTools) {
514 if (!
tool->preNext().isSuccess()) {
521 if (
sc.isRecoverable()) {
524 if (
sc.isFailure()) {
525 return StatusCode::FAILURE;
533 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
539 return(StatusCode::FAILURE);
541 std::string token = m_headerIterator->eventRef().toString();
545 token.length() + 1, 0)).isRecoverable() ) {
546 while (
ds->readData().isSuccess()) {
547 ATH_MSG_VERBOSE(
"Called last readData, while putting next event in next()");
551 if (!
sc.isSuccess()) {
553 return(StatusCode::FAILURE);
557 if (!
eventStore()->clearStore().isSuccess()) {
562 return(StatusCode::FAILURE);
567 for (
const auto&
tool : m_helperTools) {
569 if (toolStatus.isRecoverable()) {
572 status = StatusCode::RECOVERABLE;
574 }
else if (toolStatus.isFailure()) {
576 status = StatusCode::FAILURE;
579 if (
status.isRecoverable()) {
581 }
else if (
status.isFailure()) {
590 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
591 m_skipEventRanges.erase(m_skipEventRanges.begin());
596 return(StatusCode::SUCCESS);
601 for (
int i = 0;
i < jump;
i++) {
604 return(StatusCode::SUCCESS);
606 return(StatusCode::FAILURE);
611 if( m_inputCollectionsChanged ) {
613 if( rc != StatusCode::SUCCESS )
return rc;
617 if (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
618 m_headerIterator =
nullptr;
620 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
628 if( m_inputCollectionsChanged ) {
630 if( rc != StatusCode::SUCCESS )
return rc;
633 ++m_inputCollectionsIterator;
636 if (m_poolCollectionConverter ==
nullptr) {
640 return StatusCode::FAILURE;
643 m_headerIterator = &m_poolCollectionConverter->selectAll();
646 return StatusCode::RECOVERABLE;
650 const Token& headRef = m_headerIterator->eventRef();
655 if (
guid != m_guid) {
664 m_activeEventsPerSource[
guid.toString()] = 0;
669 if (!
m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
671 return(StatusCode::FAILURE);
674 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
680 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
685 return StatusCode::SUCCESS;
694 if (
sc.isRecoverable()) {
697 if (
sc.isFailure()) {
698 return StatusCode::FAILURE;
708 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
710 return StatusCode::SUCCESS;
712 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
713 m_skipEventRanges.erase(m_skipEventRanges.begin());
723 return StatusCode::SUCCESS;
728 return(StatusCode::FAILURE);
733 for (
int i = 0;
i < jump;
i++) {
736 return(StatusCode::SUCCESS);
738 return(StatusCode::FAILURE);
744 return(StatusCode::SUCCESS);
746 return(StatusCode::FAILURE);
752 return(StatusCode::SUCCESS);
756 IOpaqueAddress*& iop)
const {
757 std::string tokenStr;
761 tokenStr = (*attrList)[
"eventRef"].data<std::string>();
762 ATH_MSG_DEBUG(
"found AthenaAttribute, name = eventRef = " << tokenStr);
765 return(StatusCode::FAILURE);
769 tokenStr = m_headerIterator->eventRef().toString();
771 auto token = std::make_unique<Token>();
772 token->fromString(tokenStr);
774 return(StatusCode::SUCCESS);
778 return(StatusCode::SUCCESS);
782 IEvtSelector::Context& )
const {
783 return(StatusCode::SUCCESS);
788 if( m_inputCollectionsChanged ) {
790 if( rc != StatusCode::SUCCESS )
return rc;
798 m_headerIterator =
nullptr;
801 return(StatusCode::RECOVERABLE);
805 m_poolCollectionConverter->disconnectDb().ignore();
807 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
812 <<
"\" from the collection list.");
820 if (!m_poolCollectionConverter->initialize().isSuccess()) {
821 m_headerIterator =
nullptr;
822 ATH_MSG_ERROR(
"seek: Unable to initialize PoolCollectionConverter.");
823 return(StatusCode::FAILURE);
826 m_headerIterator = &m_poolCollectionConverter->selectAll();
829 next(*beginIter).ignore();
830 ATH_MSG_DEBUG(
"Token " << m_headerIterator->eventRef().toString());
832 m_headerIterator =
nullptr;
834 return(StatusCode::FAILURE);
838 if (m_headerIterator->seek(evtNum - m_firstEvt[
m_curCollection]) == 0) {
839 m_headerIterator =
nullptr;
841 return(StatusCode::FAILURE);
845 return(StatusCode::SUCCESS);
857 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
858 if (m_numEvt[
i] == -1) {
866 int collection_size = 0;
869 collection_size = hi->
size();
872 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
876 m_numEvt[
i] = collection_size;
878 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
890 return(StatusCode::FAILURE);
893 if (
ds->makeServer(
num - 1).isFailure()) {
894 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
896 return(StatusCode::SUCCESS);
898 if (
ds->makeServer(
num + 1).isFailure()) {
899 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
900 return(StatusCode::FAILURE);
903 return(StatusCode::SUCCESS);
915 return(StatusCode::FAILURE);
917 if (
ds->makeClient(
num + 1).isFailure()) {
918 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to DataStreaming client");
919 return(StatusCode::FAILURE);
922 return(StatusCode::SUCCESS);
925 std::string dummyStr;
934 return(StatusCode::FAILURE);
938 while (
sc.isRecoverable()) {
943 if (
sc.isFailure()) {
944 if (
ds->makeClient(0).isFailure()) {
945 return(StatusCode::FAILURE);
948 while (
sc.isRecoverable() ||
sc.isFailure()) {
953 if (
ds->makeClient(1).isFailure()) {
954 return(StatusCode::FAILURE);
959 return(StatusCode::FAILURE);
967 return(StatusCode::FAILURE);
970 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
971 return(StatusCode::FAILURE);
975 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
976 if (!
next(*ctxt).isSuccess()) {
982 delete ctxt; ctxt =
nullptr;
983 return(StatusCode::FAILURE);
988 delete ctxt; ctxt =
nullptr;
992 while (
ds->readData().isSuccess()) {
993 ATH_MSG_VERBOSE(
"Called last readData, while marking last event in readEvent()");
997 if (!
sc.isSuccess()) {
998 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
999 return(StatusCode::FAILURE);
1001 sc =
ds->readData();
1002 while (
sc.isSuccess() ||
sc.isRecoverable()) {
1003 sc =
ds->readData();
1005 ATH_MSG_DEBUG(
"Failed last readData -> Clients are stopped, after marking last event in readEvent()");
1007 return(StatusCode::SUCCESS);
1015 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
1028 ATH_MSG_DEBUG(
"Try item: \"" << *m_inputCollectionsIterator <<
"\" from the collection list.");
1030 *m_inputCollectionsIterator,
1034 if (!
status.isSuccess()) {
1036 delete pCollCnv; pCollCnv =
nullptr;
1037 if (!
status.isRecoverable()) {
1038 ATH_MSG_ERROR(
"Unable to initialize PoolCollectionConverter.");
1039 throw GaudiException(
"Unable to read: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1041 ATH_MSG_ERROR(
"Unable to open: " << *m_inputCollectionsIterator);
1042 throw GaudiException(
"Unable to open: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1045 if (!pCollCnv->
isValid().isSuccess()) {
1046 delete pCollCnv; pCollCnv =
nullptr;
1047 ATH_MSG_DEBUG(
"No events found in: " << *m_inputCollectionsIterator <<
" skipped!!!");
1049 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator);
1051 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"eventless " + *m_inputCollectionsIterator);
1054 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1055 ++m_inputCollectionsIterator;
1075 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
1077 return(StatusCode::FAILURE);
1079 return(StatusCode::SUCCESS);
1084 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1087 (*attrList)[
iter.tokenName() +
suffix].data<std::string>() =
iter->toString();
1091 std::string eventRef =
"eventRef";
1095 attrList->extend(eventRef,
"string");
1096 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1097 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " + eventRef +
" = " << m_headerIterator->eventRef().toString() <<
".");
1101 for (
const auto &attr : sourceAttrList) {
1102 attrList->extend(attr.specification().name() +
suffix, attr.specification().type());
1103 (*attrList)[attr.specification().name() +
suffix] = attr;
1107 return StatusCode::SUCCESS;
1112 if (m_poolCollectionConverter !=
nullptr) {
1113 m_poolCollectionConverter->disconnectDb().ignore();
1114 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1116 m_headerIterator =
nullptr;
1118 if (!iomgr.retrieve().isSuccess()) {
1120 return(StatusCode::FAILURE);
1122 if (!iomgr->io_hasitem(
this)) {
1123 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1124 return(StatusCode::FAILURE);
1131 std::set<std::size_t> updatedIndexes;
1133 if (updatedIndexes.find(
i) != updatedIndexes.end())
continue;
1136 if (!iomgr->io_contains(
this,
fname)) {
1138 return(StatusCode::FAILURE);
1140 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1142 return(StatusCode::FAILURE);
1144 if (savedName !=
fname) {
1148 updatedIndexes.insert(
i);
1149 for (std::size_t j =
i + 1; j <
imax; j++) {
1152 updatedIndexes.insert(j);
1164 if (m_poolCollectionConverter !=
nullptr) {
1165 m_poolCollectionConverter->disconnectDb().ignore();
1166 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1168 return(StatusCode::SUCCESS);
1180 if (inc.type() == IncidentType::BeginProcessing) {
1191 ATH_MSG_WARNING(
"could not read event source ID from incident event context");
1194 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1195 ATH_MSG_DEBUG(
"Incident handler ignoring unknown input FID: " << fid );
1198 ATH_MSG_DEBUG(
"** MN Incident handler " << inc.type() <<
" Event source ID=" << fid );
1199 if( inc.type() == IncidentType::BeginProcessing ) {
1201 m_activeEventsPerSource[fid]++;
1202 }
else if( inc.type() == IncidentType::EndProcessing ) {
1203 m_activeEventsPerSource[fid]--;
1208 for(
auto&
source: m_activeEventsPerSource )
1220 if(
m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1221 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1226 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + fid, fid);
1229 ATH_MSG_INFO(
"Disconnecting input sourceID: " << fid );
1231 m_activeEventsPerSource.erase( fid );