8 #include "GaudiKernel/GaudiException.h"
9 #include "GaudiKernel/IAlgManager.h"
10 #include "GaudiKernel/IIoComponentMgr.h"
11 #include "GaudiKernel/ISvcLocator.h"
12 #include "GaudiKernel/IOpaqueAddress.h"
13 #include "GaudiKernel/IProperty.h"
14 #include "GaudiKernel/IClassIDSvc.h"
15 #include "GaudiKernel/ClassID.h"
16 #include "GaudiKernel/MsgStream.h"
17 #include "GaudiKernel/AlgTool.h"
45 #include <boost/tokenizer.hpp>
51 using boost::tokenizer;
52 using boost::char_separator;
83 AltDataBucket (
void* ptr,
CLID clid,
const std::type_info& tinfo,
85 : m_proxy(this, makeTransientAddress(clid,
proxy).
release()),
86 m_ptr (ptr), m_clid (clid), m_tinfo (tinfo)
91 AltDataBucket(
void* ptr,
CLID clid,
const std::type_info& tinfo,
const std::string&
name) :
92 m_proxy(this,
new SG::TransientAddress(clid,
name) ),
93 m_ptr(ptr), m_clid(clid), m_tinfo(tinfo)
/// Return a reference to the class ID held in m_clid (set from the
/// constructor's clid argument).
98 virtual const CLID& clID()
const override {
return m_clid; }
/// Return the untyped object pointer held in m_ptr (set from the
/// constructor's ptr argument).
99 virtual void*
object()
override {
return m_ptr; }
/// Return the std::type_info reference held in m_tinfo (bound from the
/// constructor's tinfo argument).
100 virtual const std::type_info&
tinfo()
const override {
return m_tinfo; }
104 bool =
true)
override
106 virtual void*
cast (
const std::type_info& tinfo,
108 bool =
true)
override
109 {
if (tinfo == m_tinfo)
/// Intentionally empty override: calling lock() on this bucket does nothing.
114 virtual void lock()
override {}
119 std::unique_ptr<SG::TransientAddress>
125 const std::type_info& m_tinfo;
129 std::unique_ptr<SG::TransientAddress>
132 auto newTad = std::make_unique<SG::TransientAddress>
133 (clid, oldProxy.
name());
134 newTad->setAlias (oldProxy.
alias());
142 newTad->setTransientID (tclid);
157 m_dataStore(
"StoreGateSvc",
name),
158 m_metadataStore(
"MetaDataStore",
name),
159 m_currentStore(&m_dataStore),
160 m_itemSvc(
"ItemListSvc",
name),
161 m_metaDataSvc(
"MetaDataSvc",
name),
162 m_dictLoader(
"AthDictLoaderSvc",
name),
163 m_tpCnvSvc(
"AthTPCnvSvc",
name),
164 m_outputAttributes(),
165 m_pCLIDSvc(
"ClassIDSvc",
name),
166 m_outSeqSvc(
"OutputStreamSequencerSvc",
name),
167 m_p2BWritten(string(
"SG::Folder/") +
name + string(
"_TopFolder"), this),
168 m_decoder(string(
"SG::Folder/") +
name + string(
"_excluded"), this),
169 m_compressionDecoderHigh(string(
"SG::Folder/") +
name + string(
"_compressed_high"), this),
170 m_compressionDecoderLow(string(
"SG::Folder/") +
name + string(
"_compressed_low"), this),
171 m_transient(string(
"SG::Folder/") +
name + string(
"_transient"), this),
173 m_streamer(string(
"AthenaOutputStreamTool/") +
name + string(
"Tool"), this),
265 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
268 return StatusCode::FAILURE;
276 const std::string&
k =
item.key();
277 if (
k.find(
'*') != std::string::npos)
continue;
278 if (
k.find(
'.') != std::string::npos)
continue;
280 if (titem.id() ==
item.id() && titem.key() ==
k) {
292 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
295 return StatusCode::FAILURE;
310 if (!incsvc.retrieve().isSuccess()) {
312 return(StatusCode::FAILURE);
315 incsvc->addListener(
this, IncidentType::BeginProcessing, 95);
316 incsvc->addListener(
this, IncidentType::EndProcessing, 95);
321 if(m_compressionBitsHigh < 5 || m_compressionBitsHigh > 23) {
322 ATH_MSG_INFO(
"Float compression mantissa bits for high compression " <<
327 if(m_compressionBitsLow < 5 || m_compressionBitsLow > 23) {
328 ATH_MSG_INFO(
"Float compression mantissa bits for low compression " <<
334 ATH_MSG_ERROR(
"Float compression mantissa bits for low compression " <<
337 return StatusCode::FAILURE;
340 ATH_MSG_VERBOSE(
"Both high and low float compression lists are empty. Float compression will NOT be applied.");
342 ATH_MSG_INFO(
"Either high or low (or both) float compression lists are defined. Float compression will be applied.");
360 return StatusCode::SUCCESS;
366 return StatusCode::SUCCESS;
374 std::unique_lock<mutex_t> lock(
m_mutex);
376 if( inc.type() ==
"MetaDataStop" ) {
397 EventContext::ContextID_t slot = inc.context().slot();
398 if( slot == EventContext::INVALID_CONTEXT_ID ) {
399 throw GaudiException(
"Received Incident with invalid slot in ES mode",
name(), StatusCode::FAILURE);
401 auto count_events_in_range = [&](
const std::string&
range) {
403 [&](
auto&
el){return el.second == range;} );
405 if( inc.type() == IncidentType::BeginProcessing ) {
410 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
411 ATH_MSG_INFO(
"Slot range change: '" << rangeFN <<
"' -> '" << newRangeFN <<
"'");
416 if( count_events_in_range(rangeFN) == 1 ) {
420 ATH_MSG_INFO(
"slot " << slot <<
" processing event in range: " << newRangeFN);
425 else if( inc.type() == IncidentType::EndProcessing ) {
434 if( count_events_in_range(rangeFN) == 1 ) {
441 ATH_MSG_DEBUG(
"Leaving incident handler for " << inc.type());
455 ATH_MSG_INFO(
"Finished writing Event Sequence to " << rangeFN);
457 strm_iter->second->finalizeOutput().ignore();
458 strm_iter->second->finalize().ignore();
475 if (!
tool->preFinalize().isSuccess()) {
476 throw GaudiException(
"Cannot finalize helper tool",
name(), StatusCode::FAILURE);
480 throw GaudiException(
"Failed on MetaDataSvc prepareOutput",
name(), StatusCode::FAILURE);
487 if (
write().isFailure()) {
488 throw GaudiException(
"Cannot write on finalize",
name(), StatusCode::FAILURE);
497 throw GaudiException(
"Unable to connect metadata services",
name(), StatusCode::FAILURE);
500 m_outputAttributes =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
502 IProperty *pAsIProp(
nullptr);
504 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
506 throw GaudiException(
"Folder property [metadataItemList] not found",
name(), StatusCode::FAILURE);
508 if (
write().isFailure()) {
509 throw GaudiException(
"Cannot write metadata",
name(), StatusCode::FAILURE);
515 throw GaudiException(
"Unable to re-connect services",
name(), StatusCode::FAILURE);
518 if ((pAsIProp->setProperty(
m_itemList)).isFailure()) {
519 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
531 if (!
m_streamer->finalizeOutput().isSuccess()) {
542 return(StatusCode::FAILURE);
548 return(StatusCode::SUCCESS);
555 if (!
tool->preExecute().isSuccess()) {
561 if (
write().isFailure()) {
566 if(!
tool->postExecute().isSuccess()) {
579 return(StatusCode::FAILURE);
581 return(StatusCode::SUCCESS);
590 std::unique_lock<mutex_t> lock(
m_mutex);
603 IProperty *mstreamer_props =
dynamic_cast<IProperty*
> (&*
m_streamer);
604 IProperty *streamer_props =
dynamic_cast<IProperty*
> (&*streamer);
605 for (
const auto& prop : mstreamer_props->getProperties() ) {
606 ATH_CHECK( streamer_props->setProperty( *prop ) );
608 if( !streamer or streamer->initialize().isFailure()
611 return StatusCode::FAILURE;
626 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move(
m_ownedObjects );
630 bool checkCountError =
false;
633 unsigned int lastCount = 0;
636 bool isError =
false;
638 lastCount = (*cit).second;
640 }
else if (lastCount != (*cit).second) {
643 checkCountError =
true;
647 << (*cit).first <<
", " << (*cit).second <<
" should be: " << lastCount);
649 ATH_MSG_DEBUG(
" Object/count: " << (*cit).first <<
", " << (*cit).second);
652 if (checkCountError) {
653 ATH_MSG_FATAL(
"Check number of writes failed. See messages above "
654 "to identify which container is not always written");
655 return(StatusCode::FAILURE);
671 return StatusCode::FAILURE;
676 if (!currentStatus.isSuccess()) {
677 if (!currentStatus.isRecoverable()) {
684 bool doCommit =
false;
690 return(StatusCode::FAILURE);
693 return(StatusCode::SUCCESS);
706 ATH_MSG_WARNING(
"collectAllObjects() could not get ItemList from Tool.");
710 auto vetoes = std::make_unique<SG::SelectionVetoes>();
711 auto compInfo = std::make_unique<SG::CompressionInfo>();
714 std::vector<CLID> folderclids;
719 folderclids.push_back(
i->id());
724 IDataSelector prunedList;
726 if (
std::find(folderclids.begin(),folderclids.end(),(*it)->clID())!=folderclids.end()) {
729 (*it)->name() <<
" ignored");
732 prunedList.push_back(*
it);
736 ATH_MSG_DEBUG(
"Object " << (*it)->clID() <<
","<< (*it)->name() <<
" found that was not in itemlist");
740 for (
auto it = prunedList.begin();
it != prunedList.end(); ++
it) {
741 if ((*it)->name().length() > 4 && (*it)->name().substr((*it)->name().length() - 4) ==
"Aux.") {
745 for (
auto it = prunedList.begin();
it != prunedList.end(); ++
it) {
746 if ((*it)->name().length() <= 4 || (*it)->name().substr((*it)->name().length() - 4) !=
"Aux.") {
752 if (!vetoes->empty()) {
757 if (!compInfo->empty()) {
761 return StatusCode::SUCCESS;
770 size_t dotpos =
item.key().find(
'.');
771 string item_key, aux_attr;
772 if( dotpos != string::npos ) {
773 item_key =
item.key().substr(0, dotpos+1);
774 aux_attr =
item.key().substr(dotpos+1);
776 item_key =
item.key();
779 ATH_MSG_DEBUG(
"addItemObjects(" << item_id <<
",\"" << item_key <<
"\") called");
781 if( aux_attr.size() ) {
784 static const std::string wildCard =
"*";
785 std::set<std::string> clidKeys;
787 iter != iterEnd; ++iter) {
788 if (iter->id() == item_id) {
789 clidKeys.insert(iter->key());
796 std::map< unsigned int, std::set< std::string > > comp_attr_map;
801 for(
const auto&
it : comp_attr_map ) {
802 ATH_MSG_DEBUG(
" Comp Attr " <<
it.second.size() <<
" with " <<
it.first <<
" mantissa bits.");
803 if (
it.second.size() > 0 ) {
804 for(
const auto& attr :
it.second ) {
814 bool gotProxies =
false;
817 if (
match !=
nullptr) {
818 map.insert({item_key,
match});
824 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter,
end)).isSuccess()) {
828 bool added =
false, removed =
false;
831 std::vector<std::string> keyTokens;
832 keyTokens.reserve(2);
833 std::vector<std::string> xkeyTokens;
834 xkeyTokens.reserve(2);
835 ATH_MSG_VERBOSE(
"Calling tokenizeAtStep( " << keyTokens <<
", " << item_key <<
", " << wildCard <<
")" );
837 ATH_MSG_VERBOSE(
"Done calling tokenizeAtStep( " << keyTokens <<
", " << item_key <<
", " << wildCard <<
")" );
839 for (; iter !=
end; ++iter) {
841 string proxyName = itemProxy->
name();
848 bool keyMatch = ( item_key ==
"*" ||
849 item_key == proxyName ||
852 ATH_MSG_VERBOSE(
"Calling matchKey( " << keyTokens <<
", " << proxyName <<
")" );
853 keyMatch =
matchKey(keyTokens, proxyName);
854 ATH_MSG_VERBOSE(
"Done calling matchKey( " << keyTokens <<
", " << proxyName <<
") with result: " << keyMatch );
858 bool xkeyMatch =
false;
859 for (std::set<std::string>::const_iterator c2k_it = clidKeys.begin(), c2k_itEnd = clidKeys.end();
860 keyMatch && c2k_it != c2k_itEnd; ++c2k_it) {
861 if (*c2k_it == wildCard) {
865 std::string::size_type xsep = c2k_it->find(wildCard);
867 if (xsep == std::string::npos) {
868 if (*c2k_it == proxyName) {
874 xkeyMatch =
matchKey(xkeyTokens, proxyName);
884 if (keyMatch && !xkeyMatch) {
887 ATH_MSG_ERROR(
" Could not get data object for id " << remapped_item_id <<
",\"" << proxyName);
890 if (
nullptr != itemProxy->object()) {
894 if( item_id != remapped_item_id ) {
901 auto altbucket = std::make_unique<AltDataBucket>(
907 ATH_MSG_ERROR(
"Failed to retrieve object from MetaCont with key=" << item_key <<
", for EventRangeID=" <<
m_outSeqSvc->currentRangeID() );
908 return StatusCode::FAILURE;
910 }
else if (
item.exact()) {
914 if (!dbb) std::abort();
915 void* ptr = dbb->
cast (item_id);
921 std::make_unique<AltDataBucket>
930 m_objects.push_back(itemProxy->object());
931 ATH_MSG_DEBUG(
" Added object " << item_id <<
",\"" << proxyName <<
"\"");
936 std::stringstream tns;
937 if (!
m_pCLIDSvc->getTypeNameOfID(item_id, tn).isSuccess()) {
939 << item_id <<
",\"" << proxyName);
940 tns << item_id <<
'_' << proxyName;
942 tn +=
'_' + proxyName;
957 << itemProxy->
clID() <<
" to SG::IConstAuxStore*" );
971 std::string
key = item_key;
972 key.erase (
key.size()-4, 4);
979 for(
const auto&
it :
compression.getCompressedAuxIDs( allVars ) ) {
980 if(
it.second.size() > 0 ) {
981 compInfo[
key ][
it.first ] =
it.second;
983 " variables that'll be lossy float compressed"
984 " with " <<
it.first <<
" mantissa bits" );
1003 if (
m_itemSvc->addStreamItem(this->name(),tns.str()).isFailure()) {
1007 }
else if (keyMatch && xkeyMatch) {
1011 if (!added && !removed) {
1012 ATH_MSG_DEBUG(
" No object matching " << item_id <<
",\"" << item_key <<
"\" found");
1013 }
else if (removed) {
1014 ATH_MSG_DEBUG(
" Object being excluded based on property setting "
1015 << item_id <<
",\"" << item_key <<
"\". Skipping");
1018 ATH_MSG_DEBUG(
" Failed to receive proxy iterators from StoreGate for "
1019 << item_id <<
",\"" << item_key <<
"\". Skipping");
1021 return StatusCode::SUCCESS;
1029 std::set<std::string>
1031 const CLID& item_id,
1032 const std::string& item_key)
const
1035 std::set<std::string>
result;
1038 if(item_key.find(
"Aux.") == string::npos) {
1044 iter != iterEnd; ++iter) {
1046 if (iter->id() != item_id) {
1050 size_t seppos = iter->key().find(
'.');
1051 string comp_item_key{
""}, comp_str{
""};
1052 if(seppos != string::npos) {
1053 comp_item_key = iter->key().substr(0, seppos+1);
1054 comp_str = iter->key().substr(seppos+1);
1056 comp_item_key = iter->key();
1060 if (!comp_str.empty() && comp_item_key == item_key) {
1061 std::stringstream
ss(comp_str);
1063 while( std::getline(
ss, attr,
'.') ) {
1075 const std::string& tns,
1076 const std::string& aux_attr,
1081 if( aux_attr.size() ) {
1082 std::stringstream
ss(aux_attr);
1084 while( std::getline(
ss, attr,
'.') ) {
1086 std::stringstream temp;
1087 temp << tns << attr;
1088 if (
m_itemSvc->addStreamItem(this->name(),temp.str()).isFailure()) {
1099 std::string
key = itemProxy.
name();
1100 if (
key.size() >= 4 &&
key.compare (
key.size()-4, 4,
"Aux.")==0)
1102 key.erase (
key.size()-4, 4);
1118 if ( !selected.
test( auxid ) ) {
1127 IProperty *pAsIProp(
nullptr);
1129 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
1130 (pAsIProp->setProperty(
m_itemList)).isFailure()) {
1131 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
1136 IProperty *pAsIProp(
nullptr);
1137 if ((
m_decoder.retrieve()).isFailure() ||
1138 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_decoder)) ||
1139 (pAsIProp->setProperty(
"ItemList",
m_excludeList.toString())).isFailure()) {
1140 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
1145 IProperty *pAsIProp(
nullptr);
1149 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
1154 IProperty *pAsIProp(
nullptr);
1158 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
1163 const std::string& portia,
1164 const std::string& sepstr )
const {
1167 if (portia.starts_with( sepstr )) {
1168 subStrings.push_back(
"");
1170 boost::char_separator<char> csep(sepstr.c_str());
1171 boost::tokenizer<char_separator<char>>
tokens(portia, csep);
1172 for (
const std::string&
t :
tokens) {
1174 subStrings.push_back(
t);
1177 if ( portia.size() >= sepstr.size() &&
1178 portia.compare( portia.size() - sepstr.size(), sepstr.size(), sepstr) == 0 ) {
1179 subStrings.push_back(
"");
1185 const string& proxyName)
const {
1186 bool keyMatch =
true;
1189 std::vector<std::string>::const_iterator itrEnd =
key.cend();
1190 std::vector<std::string>::const_iterator itr =
key.cbegin();
1194 std::string::size_type proxyNamePos=0;
1195 while ( itr != itrEnd &&
1196 std::string::npos != ( proxyNamePos = proxyName.find(*itr, proxyNamePos) )
1199 ATH_MSG_VERBOSE(
"If we are at the begin iterator and the first element is Not an empty string");
1200 if ( !(
key.front().empty()) && itr ==
key.cbegin() && proxyNamePos != 0 ) {
1202 ATH_MSG_VERBOSE(
"We had to match a precise name at the beginning, but didn't find it at the beginning");
1206 if ( !(
key.back().empty()) && itr == --(
key.cend()) && (proxyNamePos+itr->size()!=proxyName.size()) ) {
1208 ATH_MSG_VERBOSE(
"We had to match a precise name at the end, but didn't find it at the end");
1211 ATH_MSG_VERBOSE(
"Found a match of subkey: " << *itr <<
" in string: " << proxyName
1212 <<
" at position: " << proxyNamePos );
1214 proxyNamePos += itr->size();
1218 if ( itr != itrEnd ) {
1220 ATH_MSG_VERBOSE(
"Couldn't match every sub-string... return: " << keyMatch);
1222 else {
ATH_MSG_VERBOSE(
"Did match every sub-string... return: " << keyMatch); }
1234 if (!incSvc.retrieve().isSuccess()) {
1236 return StatusCode::FAILURE;
1238 incSvc->removeListener(
this,
"MetaDataStop");
1239 incSvc->addListener(
this,
"MetaDataStop", 50);
1241 if (!
tool->postInitialize().isSuccess()) {
1245 return StatusCode::SUCCESS;
1252 if (!
tool->preFinalize().isSuccess()) {
1256 const Incident metaDataStopIncident(
name(),
"MetaDataStop");
1257 this->
handle(metaDataStopIncident);
1259 if (!incSvc.retrieve().isSuccess()) {
1261 return StatusCode::FAILURE;
1263 incSvc->removeListener(
this,
"MetaDataStop");
1267 return StatusCode::SUCCESS;
1281 std::unique_ptr<ITPCnvBase> tpcnv =
m_tpCnvSvc->t2p_cnv_unique (clid);