26 #include "GaudiKernel/ClassID.h"
27 #include "GaudiKernel/FileIncident.h"
28 #include "GaudiKernel/IIncidentSvc.h"
29 #include "GaudiKernel/IIoComponentMgr.h"
30 #include "GaudiKernel/GaudiException.h"
31 #include "GaudiKernel/GenericAddress.h"
32 #include "GaudiKernel/StatusCode.h"
40 #include <boost/tokenizer.hpp>
50 size_t nbytes,
unsigned int status) {
59 base_class(
name, pSvcLocator)
61 declareProperty(
"HelperTools", m_helperTools);
73 m_inputCollectionsChanged =
false;
77 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
78 m_inputCollectionsChanged =
true;
91 m_autoRetrieveTools =
false;
92 m_checkToolDeps =
false;
102 return(StatusCode::FAILURE);
106 ATH_MSG_FATAL(
"Use the property: EventSelector.InputCollections = "
107 <<
"[ \"<collectionName>\" ] (list of collections)");
108 return(StatusCode::FAILURE);
110 boost::char_separator<char> sep_coma(
","), sep_hyph(
"-");
112 for(
const std::string&
r:
ranges ) {
113 boost::tokenizer fromto(
r, sep_hyph);
114 auto from_iter = fromto.begin();
115 std::stringstream strstr1( *from_iter );
118 if( ++from_iter != fromto.end() ) {
119 std::stringstream strstr2( *from_iter );
124 m_skipEventRanges.push_back( std::pair(from,
to) );
128 m_skipEventRanges.push_back( std::pair(
v,
v) );
130 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
132 std::stringstream skip_ranges_ss;
133 for(
auto&
r: m_skipEventRanges ) {
134 if( not skip_ranges_ss.str().empty() ) skip_ranges_ss <<
", ";
135 skip_ranges_ss <<
r.first;
136 if(
r.first !=
r.second) skip_ranges_ss <<
"-" <<
r.second;
138 if( not skip_ranges_ss.str().empty() )
143 ATH_MSG_FATAL(
"EventSelector.CollectionType must be one of: ExplicitROOT, ImplicitROOT (default)");
144 return(StatusCode::FAILURE);
149 return(StatusCode::FAILURE);
153 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 0);
154 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 0);
160 return(StatusCode::FAILURE);
165 return(StatusCode::FAILURE);
168 if (!m_helperTools.retrieve().isSuccess()) {
170 return(StatusCode::FAILURE);
175 return(StatusCode::FAILURE);
177 std::string dummyStr;
179 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
180 return(StatusCode::FAILURE);
186 std::vector<std::string>
propVal;
188 ATH_MSG_FATAL(
"Cannot get EventPersistencySvc Property for CnvServices");
189 return(StatusCode::FAILURE);
191 bool foundCnvSvc =
false;
192 for (
const auto& property :
propVal) {
198 ATH_MSG_FATAL(
"Cannot set EventPersistencySvc Property for CnvServices");
199 return(StatusCode::FAILURE);
205 if (!iomgr.retrieve().isSuccess()) {
207 return(StatusCode::FAILURE);
209 if (!iomgr->io_register(
this).isSuccess()) {
210 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
211 return(StatusCode::FAILURE);
218 if (incol[
icol].substr(0, 4) ==
"LFN:" || incol[
icol].substr(0, 4) ==
"FID:") {
223 if (
fileName.substr(0, 4) ==
"PFN:") {
234 return(StatusCode::FAILURE);
240 return(StatusCode::FAILURE);
252 for(
auto&
el : m_numEvt )
el = -1;
254 for(
auto&
el : m_firstEvt )
el = -1;
259 if (!m_firstEvt.empty()) {
262 m_inputCollectionsChanged =
false;
264 m_headerIterator = 0;
266 ATH_MSG_INFO(
"Done reinitialization for shared reader client");
267 return(StatusCode::SUCCESS);
269 bool retError =
false;
270 for (
auto&
tool : m_helperTools) {
271 if (!
tool->postInitialize().isSuccess()) {
278 return(StatusCode::FAILURE);
283 if (m_poolCollectionConverter ==
nullptr) {
284 ATH_MSG_INFO(
"No Events found in any Input Collections");
290 FileIncident firstInputFileIncident(
name(),
"FirstInputFile", *m_inputCollectionsIterator);
295 return(StatusCode::SUCCESS);
299 m_headerIterator = &m_poolCollectionConverter->selectAll();
301 ATH_MSG_FATAL(
"Cannot open implicit collection - check data/software version.");
303 return(StatusCode::FAILURE);
305 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
306 if (m_poolCollectionConverter !=
nullptr) {
307 m_poolCollectionConverter->disconnectDb().ignore();
308 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
310 ++m_inputCollectionsIterator;
312 if (m_poolCollectionConverter !=
nullptr) {
313 m_headerIterator = &m_poolCollectionConverter->selectAll();
318 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
322 if (m_poolCollectionConverter ==
nullptr) {
323 return(StatusCode::SUCCESS);
325 m_headerIterator = &m_poolCollectionConverter->selectAll();
326 while (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
327 if (m_poolCollectionConverter !=
nullptr) {
328 m_poolCollectionConverter->disconnectDb().ignore();
329 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
331 ++m_inputCollectionsIterator;
333 if (m_poolCollectionConverter !=
nullptr) {
334 m_headerIterator = &m_poolCollectionConverter->selectAll();
340 if (m_poolCollectionConverter ==
nullptr || m_headerIterator ==
nullptr) {
341 return(StatusCode::SUCCESS);
343 const Token& headRef = m_headerIterator->eventRef();
350 FileIncident firstInputFileIncident(
name(),
"FirstInputFile",
"FID:" + fid, fid);
354 return(StatusCode::SUCCESS);
358 if (m_poolCollectionConverter !=
nullptr) {
360 m_poolCollectionConverter->disconnectDb().ignore();
361 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
366 return(StatusCode::SUCCESS);
369 if (m_poolCollectionConverter ==
nullptr) {
370 ATH_MSG_INFO(
"No Events found in any Input Collections");
373 --m_inputCollectionsIterator;
376 m_headerIterator = &m_poolCollectionConverter->selectAll();
382 return(StatusCode::SUCCESS);
387 return(StatusCode::SUCCESS);
389 IEvtSelector::Context* ctxt(
nullptr);
393 return(StatusCode::SUCCESS);
403 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
419 for (
auto&
tool : m_helperTools) {
420 if (!
tool->preFinalize().isSuccess()) {
426 m_headerIterator =
nullptr;
427 if (m_poolCollectionConverter !=
nullptr) {
428 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
439 if (!m_helperTools.release().isSuccess()) {
457 return(StatusCode::SUCCESS);
461 std::lock_guard<CallMutex> lockGuard(
m_callLock);
465 while (
sc.isRecoverable()) {
472 void* tokenStr =
nullptr;
475 if (
sc.isRecoverable()) {
476 delete [] (
char*)tokenStr; tokenStr =
nullptr;
480 return(StatusCode::FAILURE);
482 if (
sc.isFailure()) {
483 ATH_MSG_FATAL(
"Cannot get NextEvent from AthenaSharedMemoryTool");
484 delete [] (
char*)tokenStr; tokenStr =
nullptr;
485 return(StatusCode::FAILURE);
491 athAttrList->extend(
"eventRef",
"string");
492 (*athAttrList)[
"eventRef"].data<std::string>() = std::string((
char*)tokenStr);
494 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
495 delete [] (
char*)tokenStr; tokenStr =
nullptr;
497 return(StatusCode::FAILURE);
500 token.
fromString(std::string((
char*)tokenStr));
501 delete [] (
char*)tokenStr; tokenStr =
nullptr;
506 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
510 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
513 return(StatusCode::SUCCESS);
515 for (
const auto&
tool : m_helperTools) {
516 if (!
tool->preNext().isSuccess()) {
523 if (
sc.isRecoverable()) {
526 if (
sc.isFailure()) {
527 return StatusCode::FAILURE;
535 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
538 std::string token = m_headerIterator->eventRef().toString();
542 token.length() + 1, 0)).isRecoverable() ) {
544 ATH_MSG_VERBOSE(
"Called last readData, while putting next event in next()");
548 if (!
sc.isSuccess()) {
550 return(StatusCode::FAILURE);
554 if (!
eventStore()->clearStore().isSuccess()) {
559 return(StatusCode::FAILURE);
564 for (
const auto&
tool : m_helperTools) {
566 if (toolStatus.isRecoverable()) {
569 status = StatusCode::RECOVERABLE;
571 }
else if (toolStatus.isFailure()) {
573 status = StatusCode::FAILURE;
576 if (
status.isRecoverable()) {
578 }
else if (
status.isFailure()) {
587 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
588 m_skipEventRanges.erase(m_skipEventRanges.begin());
593 return(StatusCode::SUCCESS);
598 for (
int i = 0;
i < jump;
i++) {
601 return(StatusCode::SUCCESS);
603 return(StatusCode::FAILURE);
608 if( m_inputCollectionsChanged ) {
610 if( rc != StatusCode::SUCCESS )
return rc;
614 if (m_headerIterator ==
nullptr || m_headerIterator->next() == 0) {
615 m_headerIterator =
nullptr;
617 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
625 if( m_inputCollectionsChanged ) {
627 if( rc != StatusCode::SUCCESS )
return rc;
630 ++m_inputCollectionsIterator;
633 if (m_poolCollectionConverter ==
nullptr) {
637 return StatusCode::FAILURE;
640 m_headerIterator = &m_poolCollectionConverter->selectAll();
643 return StatusCode::RECOVERABLE;
647 const Token& headRef = m_headerIterator->eventRef();
652 if (
guid != m_guid) {
661 m_activeEventsPerSource[
guid.toString()] = 0;
666 if (!
m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
668 return(StatusCode::FAILURE);
671 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
677 FileIncident beginInputFileIncident(
name(),
"BeginInputFile",
"FID:" + m_guid.toString(), m_guid.toString());
682 return StatusCode::SUCCESS;
691 if (
sc.isRecoverable()) {
694 if (
sc.isFailure()) {
695 return StatusCode::FAILURE;
705 && (m_skipEventRanges.empty() ||
m_evtCount < m_skipEventRanges.front().first))
707 return StatusCode::SUCCESS;
709 while( !m_skipEventRanges.empty() &&
m_evtCount >= m_skipEventRanges.front().second ) {
710 m_skipEventRanges.erase(m_skipEventRanges.begin());
720 return StatusCode::SUCCESS;
725 return(StatusCode::FAILURE);
730 for (
int i = 0;
i < jump;
i++) {
733 return(StatusCode::SUCCESS);
735 return(StatusCode::FAILURE);
741 return(StatusCode::SUCCESS);
743 return(StatusCode::FAILURE);
749 return(StatusCode::SUCCESS);
753 IOpaqueAddress*& iop)
const {
754 std::string tokenStr;
758 tokenStr = (*attrList)[
"eventRef"].data<std::string>();
759 ATH_MSG_DEBUG(
"found AthenaAttribute, name = eventRef = " << tokenStr);
762 return(StatusCode::FAILURE);
766 tokenStr = m_poolCollectionConverter->retrieveToken(m_headerIterator,
"");
768 auto token = std::make_unique<Token>();
769 token->fromString(tokenStr);
771 return(StatusCode::SUCCESS);
775 return(StatusCode::SUCCESS);
779 IEvtSelector::Context& )
const {
780 return(StatusCode::SUCCESS);
785 if( m_inputCollectionsChanged ) {
787 if( rc != StatusCode::SUCCESS )
return rc;
795 m_headerIterator =
nullptr;
798 return(StatusCode::RECOVERABLE);
802 m_poolCollectionConverter->disconnectDb().ignore();
804 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
809 <<
"\" from the collection list.");
817 if (!m_poolCollectionConverter->initialize().isSuccess()) {
818 m_headerIterator =
nullptr;
819 ATH_MSG_ERROR(
"seek: Unable to initialize PoolCollectionConverter.");
820 return(StatusCode::FAILURE);
823 m_headerIterator = &m_poolCollectionConverter->selectAll();
826 next(*beginIter).ignore();
827 ATH_MSG_DEBUG(
"Token " << m_headerIterator->eventRef().toString());
829 m_headerIterator =
nullptr;
831 return(StatusCode::FAILURE);
838 return(StatusCode::FAILURE);
841 m_headerIterator =
nullptr;
843 return(StatusCode::FAILURE);
847 return(StatusCode::SUCCESS);
859 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
860 if (m_numEvt[
i] == -1) {
868 int collection_size = 0;
875 collection_size = cs->
size();
878 m_firstEvt[
i] = m_firstEvt[
i - 1] + m_numEvt[
i - 1];
882 m_numEvt[
i] = collection_size;
884 if (evtNum >= m_firstEvt[
i] && evtNum < m_firstEvt[
i] + m_numEvt[
i]) {
895 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
897 return(StatusCode::SUCCESS);
900 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
901 return(StatusCode::FAILURE);
904 return(StatusCode::SUCCESS);
914 ATH_MSG_ERROR(
"Failed to switch AthenaPoolCnvSvc to DataStreaming client");
915 return(StatusCode::FAILURE);
918 return(StatusCode::SUCCESS);
921 std::string dummyStr;
929 while (
sc.isRecoverable()) {
934 if (
sc.isFailure()) {
936 return(StatusCode::FAILURE);
939 while (
sc.isRecoverable() ||
sc.isFailure()) {
945 return(StatusCode::FAILURE);
950 return(StatusCode::FAILURE);
956 ATH_MSG_ERROR(
"No AthenaSharedMemoryTool configured for readEvent()");
957 return(StatusCode::FAILURE);
961 for (
int i = 0;
i < maxevt || maxevt == -1; ++
i) {
962 if (!
next(*ctxt).isSuccess()) {
968 delete ctxt; ctxt =
nullptr;
969 return(StatusCode::FAILURE);
974 delete ctxt; ctxt =
nullptr;
979 ATH_MSG_VERBOSE(
"Called last readData, while marking last event in readEvent()");
983 if (!
sc.isSuccess()) {
984 ATH_MSG_ERROR(
"Cannot put last Event marker to AthenaSharedMemoryTool");
985 return(StatusCode::FAILURE);
988 while (
sc.isSuccess() ||
sc.isRecoverable()) {
991 ATH_MSG_DEBUG(
"Failed last readData -> Clients are stopped, after marking last event in readEvent()");
993 return(StatusCode::SUCCESS);
1001 for (std::size_t
i = 0,
imax = m_numEvt.size();
i <
imax;
i++) {
1014 ATH_MSG_DEBUG(
"Try item: \"" << *m_inputCollectionsIterator <<
"\" from the collection list.");
1016 *m_inputCollectionsIterator,
1020 if (!
status.isSuccess()) {
1022 delete pCollCnv; pCollCnv =
nullptr;
1023 if (!
status.isRecoverable()) {
1024 ATH_MSG_ERROR(
"Unable to initialize PoolCollectionConverter.");
1025 throw GaudiException(
"Unable to read: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1027 ATH_MSG_ERROR(
"Unable to open: " << *m_inputCollectionsIterator);
1028 throw GaudiException(
"Unable to open: " + *m_inputCollectionsIterator,
name(), StatusCode::FAILURE);
1031 if (!pCollCnv->
isValid().isSuccess()) {
1032 delete pCollCnv; pCollCnv =
nullptr;
1033 ATH_MSG_DEBUG(
"No events found in: " << *m_inputCollectionsIterator <<
" skipped!!!");
1035 FileIncident beginInputFileIncident(
name(),
"BeginInputFile", *m_inputCollectionsIterator);
1037 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"eventless " + *m_inputCollectionsIterator);
1040 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1041 ++m_inputCollectionsIterator;
1061 if (!
wh.record(std::move(athAttrList)).isSuccess()) {
1063 return(StatusCode::FAILURE);
1065 return(StatusCode::SUCCESS);
1070 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1073 (*attrList)[iter.tokenName() +
suffix].data<std::string>() = iter->toString();
1074 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " << iter.tokenName() +
suffix <<
" = " << iter->toString() <<
".");
1077 std::string eventRef =
"eventRef";
1081 attrList->extend(eventRef,
"string");
1082 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1083 ATH_MSG_DEBUG(
"record AthenaAttribute, name = " + eventRef +
" = " << m_headerIterator->eventRef().toString() <<
".");
1087 for (
const auto &attr : sourceAttrList) {
1088 attrList->extend(attr.specification().name() +
suffix, attr.specification().type());
1089 (*attrList)[attr.specification().name() +
suffix] = attr;
1093 return StatusCode::SUCCESS;
1098 if (m_poolCollectionConverter !=
nullptr) {
1099 m_poolCollectionConverter->disconnectDb().ignore();
1100 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1102 m_headerIterator =
nullptr;
1104 if (!iomgr.retrieve().isSuccess()) {
1106 return(StatusCode::FAILURE);
1108 if (!iomgr->io_hasitem(
this)) {
1109 ATH_MSG_FATAL(
"IoComponentMgr does not know about myself !");
1110 return(StatusCode::FAILURE);
1117 std::set<std::size_t> updatedIndexes;
1119 if (updatedIndexes.find(
i) != updatedIndexes.end())
continue;
1122 if (!iomgr->io_contains(
this,
fname)) {
1124 return(StatusCode::FAILURE);
1126 if (!iomgr->io_retrieve(
this,
fname).isSuccess()) {
1128 return(StatusCode::FAILURE);
1130 if (savedName !=
fname) {
1134 updatedIndexes.insert(
i);
1135 for (std::size_t j =
i + 1; j <
imax; j++) {
1138 updatedIndexes.insert(j);
1150 if (m_poolCollectionConverter !=
nullptr) {
1151 m_poolCollectionConverter->disconnectDb().ignore();
1152 delete m_poolCollectionConverter; m_poolCollectionConverter =
nullptr;
1154 return(StatusCode::SUCCESS);
1166 if (inc.type() == IncidentType::BeginProcessing) {
1177 ATH_MSG_WARNING(
"could not read event source ID from incident event context");
1180 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1181 ATH_MSG_DEBUG(
"Incident handler ignoring unknown input FID: " << fid );
1184 ATH_MSG_DEBUG(
"** MN Incident handler " << inc.type() <<
" Event source ID=" << fid );
1185 if( inc.type() == IncidentType::BeginProcessing ) {
1187 m_activeEventsPerSource[fid]++;
1188 }
else if( inc.type() == IncidentType::EndProcessing ) {
1189 m_activeEventsPerSource[fid]--;
1194 for(
auto&
source: m_activeEventsPerSource )
1206 if(
m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1207 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1212 FileIncident endInputFileIncident(
name(),
"EndInputFile",
"FID:" + fid, fid);
1215 ATH_MSG_INFO(
"Disconnecting input sourceID: " << fid );
1217 m_activeEventsPerSource.erase( fid );