8 #include "GaudiKernel/GaudiException.h"
9 #include "GaudiKernel/IAlgManager.h"
10 #include "GaudiKernel/IIoComponentMgr.h"
11 #include "GaudiKernel/ISvcLocator.h"
12 #include "GaudiKernel/IOpaqueAddress.h"
13 #include "GaudiKernel/IProperty.h"
14 #include "GaudiKernel/IClassIDSvc.h"
15 #include "GaudiKernel/ClassID.h"
16 #include "GaudiKernel/MsgStream.h"
17 #include "GaudiKernel/AlgTool.h"
46 #include <boost/tokenizer.hpp>
52 using boost::tokenizer;
53 using boost::char_separator;
84 AltDataBucket (
void*
ptr,
CLID clid,
const std::type_info& tinfo,
86 : m_proxy(this, makeTransientAddress(clid,
proxy).
release()),
87 m_ptr (
ptr), m_clid (clid), m_tinfo (tinfo)
92 AltDataBucket(
void*
ptr,
CLID clid,
const std::type_info& tinfo,
const std::string&
name) :
93 m_proxy(this,
new SG::TransientAddress(clid,
name) ),
94 m_ptr(
ptr), m_clid(clid), m_tinfo(tinfo)
99 virtual const CLID& clID()
const override {
return m_clid; }
100 virtual void*
object()
override {
return m_ptr; }
101 virtual const std::type_info&
tinfo()
const override {
return m_tinfo; }
105 bool =
true)
override
107 virtual void*
cast (
const std::type_info& tinfo,
109 bool =
true)
override
110 {
if (tinfo == m_tinfo)
115 virtual void lock()
override {}
120 std::unique_ptr<SG::TransientAddress>
126 const std::type_info& m_tinfo;
130 std::unique_ptr<SG::TransientAddress>
133 auto newTad = std::make_unique<SG::TransientAddress>
134 (clid, oldProxy.
name());
135 newTad->setAlias (oldProxy.
alias());
143 newTad->setTransientID (tclid);
157 : base_class(
name, pSvcLocator),
158 m_dataStore(
"StoreGateSvc",
name),
159 m_metadataStore(
"MetaDataStore",
name),
160 m_currentStore(&m_dataStore),
161 m_itemSvc(
"ItemListSvc",
name),
162 m_metaDataSvc(
"MetaDataSvc",
name),
163 m_dictLoader(
"AthDictLoaderSvc",
name),
164 m_tpCnvSvc(
"AthTPCnvSvc",
name),
165 m_incidentSvc(
"IncidentSvc",
name),
166 m_outputAttributes(),
167 m_pCLIDSvc(
"ClassIDSvc",
name),
168 m_outSeqSvc(
"OutputStreamSequencerSvc",
name),
169 m_p2BWritten(string(
"SG::Folder/") +
name + string(
"_TopFolder"), this),
170 m_decoder(string(
"SG::Folder/") +
name + string(
"_excluded"), this),
171 m_compressionDecoderHigh(string(
"SG::Folder/") +
name + string(
"_compressed_high"), this),
172 m_compressionDecoderLow(string(
"SG::Folder/") +
name + string(
"_compressed_low"), this),
173 m_transient(string(
"SG::Folder/") +
name + string(
"_transient"), this),
175 m_streamer(string(
"AthenaOutputStreamTool/") +
name + string(
"Tool"), this),
180 declareProperty(
"OutputFile",
m_outputName=
"DidNotNameOutput.root");
181 declareProperty(
"EvtConversionSvc",
m_persName=
"EventPersistencySvc");
267 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
270 return StatusCode::FAILURE;
278 const std::string&
k =
item.key();
279 if (
k.find(
'*') != std::string::npos)
continue;
280 if (
k.find(
'.') != std::string::npos)
continue;
282 if (titem.id() ==
item.id() && titem.key() ==
k) {
294 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
297 return StatusCode::FAILURE;
313 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 95);
314 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 95);
319 if(m_compressionBitsHigh < 5 || m_compressionBitsHigh > 23) {
320 ATH_MSG_INFO(
"Float compression mantissa bits for high compression " <<
325 if(m_compressionBitsLow < 5 || m_compressionBitsLow > 23) {
326 ATH_MSG_INFO(
"Float compression mantissa bits for low compression " <<
332 ATH_MSG_ERROR(
"Float compression mantissa bits for low compression " <<
335 return StatusCode::FAILURE;
338 ATH_MSG_VERBOSE(
"Both high and low float compression lists are empty. Float compression will NOT be applied.");
340 ATH_MSG_INFO(
"Either high or low (or both) float compression lists are defined. Float compression will be applied.");
358 return StatusCode::SUCCESS;
364 return StatusCode::SUCCESS;
372 std::unique_lock<mutex_t> lock(
m_mutex);
374 if( inc.type() ==
"MetaDataStop" ) {
395 EventContext::ContextID_t slot = inc.context().slot();
396 if( slot == EventContext::INVALID_CONTEXT_ID ) {
397 throw GaudiException(
"Received Incident with invalid slot in ES mode",
name(), StatusCode::FAILURE);
399 auto count_events_in_range = [&](
const std::string&
range) {
401 [&](
auto&
el){return el.second == range;} );
403 if( inc.type() == IncidentType::BeginProcessing ) {
408 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
409 ATH_MSG_INFO(
"Slot range change: '" << rangeFN <<
"' -> '" << newRangeFN <<
"'");
414 if( count_events_in_range(rangeFN) == 1 ) {
418 ATH_MSG_INFO(
"slot " << slot <<
" processing event in range: " << newRangeFN);
423 else if( inc.type() == IncidentType::EndProcessing ) {
432 if( count_events_in_range(rangeFN) == 1 ) {
439 ATH_MSG_DEBUG(
"Leaving incident handler for " << inc.type());
453 ATH_MSG_INFO(
"Finished writing Event Sequence to " << rangeFN);
455 strm_iter->second->finalizeOutput().ignore();
456 strm_iter->second->finalize().ignore();
471 if (!
tool->preFinalize().isSuccess()) {
472 throw GaudiException(
"Cannot finalize helper tool",
name(), StatusCode::FAILURE);
476 throw GaudiException(
"Failed on MetaDataSvc prepareOutput",
name(), StatusCode::FAILURE);
483 if (
write().isFailure()) {
484 throw GaudiException(
"Cannot write on finalize",
name(), StatusCode::FAILURE);
491 size_t pos = DHFWriteIncidentfileName.find(
':');
492 if(
pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(
pos+1);
493 FileIncident incident(
name(),
"WriteDataHeaderForms", DHFWriteIncidentfileName);
501 throw GaudiException(
"Unable to connect metadata services",
name(), StatusCode::FAILURE);
504 m_outputAttributes =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
506 IProperty *pAsIProp(
nullptr);
508 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
510 throw GaudiException(
"Folder property [metadataItemList] not found",
name(), StatusCode::FAILURE);
512 if (
write().isFailure()) {
513 throw GaudiException(
"Cannot write metadata",
name(), StatusCode::FAILURE);
522 throw GaudiException(
"Unable to re-connect services",
name(), StatusCode::FAILURE);
525 if ((pAsIProp->setProperty(
m_itemList)).isFailure()) {
526 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
538 if (!
m_streamer->finalizeOutput().isSuccess()) {
549 return(StatusCode::FAILURE);
555 return(StatusCode::SUCCESS);
562 if (!
tool->preExecute().isSuccess()) {
568 if (
write().isFailure()) {
573 if(!
tool->postExecute().isSuccess()) {
586 return(StatusCode::FAILURE);
588 return(StatusCode::SUCCESS);
597 std::unique_lock<mutex_t> lock(
m_mutex);
609 IProperty *mstreamer_props =
dynamic_cast<IProperty*
> (&*
m_streamer);
610 IProperty *streamer_props =
dynamic_cast<IProperty*
> (&*streamer);
611 for (
const auto& prop : mstreamer_props->getProperties() ) {
612 ATH_CHECK( streamer_props->setProperty( *prop ) );
614 if( !streamer or streamer->initialize().isFailure()
617 return StatusCode::FAILURE;
630 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move(
m_ownedObjects );
634 bool checkCountError =
false;
637 unsigned int lastCount = 0;
640 bool isError =
false;
642 lastCount = (*cit).second;
644 }
else if (lastCount != (*cit).second) {
647 checkCountError =
true;
651 << (*cit).first <<
", " << (*cit).second <<
" should be: " << lastCount);
653 ATH_MSG_DEBUG(
" Object/count: " << (*cit).first <<
", " << (*cit).second);
656 if (checkCountError) {
657 ATH_MSG_FATAL(
"Check number of writes failed. See messages above "
658 "to identify which container is not always written");
659 return(StatusCode::FAILURE);
675 return StatusCode::FAILURE;
680 if (!currentStatus.isSuccess()) {
681 if (!currentStatus.isRecoverable()) {
688 bool doCommit =
false;
694 return(StatusCode::FAILURE);
697 return(StatusCode::SUCCESS);
710 ATH_MSG_WARNING(
"collectAllObjects() could not get ItemList from Tool.");
714 auto vetoes = std::make_unique<SG::SelectionVetoes>();
715 auto compInfo = std::make_unique<SG::CompressionInfo>();
718 std::vector<CLID> folderclids;
723 folderclids.push_back(
i->id());
728 IDataSelector prunedList;
730 if (
std::find(folderclids.begin(),folderclids.end(),(*it)->clID())!=folderclids.end()) {
733 (*it)->name() <<
" ignored");
736 prunedList.push_back(*
it);
740 ATH_MSG_DEBUG(
"Object " << (*it)->clID() <<
","<< (*it)->name() <<
" found that was not in itemlist");
744 for (
auto it = prunedList.begin();
it != prunedList.end(); ++
it) {
745 if ((*it)->name().length() > 4 && (*it)->name().substr((*it)->name().length() - 4) ==
"Aux.") {
749 for (
auto it = prunedList.begin();
it != prunedList.end(); ++
it) {
750 if ((*it)->name().length() <= 4 || (*it)->name().substr((*it)->name().length() - 4) !=
"Aux.") {
756 if (!vetoes->empty()) {
761 if (!compInfo->empty()) {
765 return StatusCode::SUCCESS;
774 size_t dotpos =
item.key().find(
'.');
775 string item_key, aux_attr;
776 if( dotpos != string::npos ) {
777 item_key =
item.key().substr(0, dotpos+1);
778 aux_attr =
item.key().substr(dotpos+1);
780 item_key =
item.key();
783 ATH_MSG_DEBUG(
"addItemObjects(" << item_id <<
",\"" << item_key <<
"\") called");
785 if( aux_attr.size() ) {
788 static const std::string wildCard =
"*";
789 std::set<std::string> clidKeys;
791 iter != iterEnd; ++iter) {
792 if (iter->id() == item_id) {
793 clidKeys.insert(iter->key());
800 std::map< unsigned int, std::set< std::string > > comp_attr_map;
805 for(
const auto&
it : comp_attr_map ) {
806 ATH_MSG_DEBUG(
" Comp Attr " <<
it.second.size() <<
" with " <<
it.first <<
" mantissa bits.");
807 if (
it.second.size() > 0 ) {
808 for(
const auto& attr :
it.second ) {
818 bool gotProxies =
false;
821 if (
match !=
nullptr) {
822 map.insert({item_key,
match});
828 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter,
end)).isSuccess()) {
832 bool added =
false, removed =
false;
835 std::vector<std::string> keyTokens;
836 keyTokens.reserve(2);
837 std::vector<std::string> xkeyTokens;
838 xkeyTokens.reserve(2);
839 ATH_MSG_VERBOSE(
"Calling tokenizeAtStep( " << keyTokens <<
", " << item_key <<
", " << wildCard <<
")" );
841 ATH_MSG_VERBOSE(
"Done calling tokenizeAtStep( " << keyTokens <<
", " << item_key <<
", " << wildCard <<
")" );
843 for (; iter !=
end; ++iter) {
845 string proxyName = itemProxy->
name();
852 bool keyMatch = ( item_key ==
"*" ||
853 item_key == proxyName ||
856 ATH_MSG_VERBOSE(
"Calling matchKey( " << keyTokens <<
", " << proxyName <<
")" );
857 keyMatch =
matchKey(keyTokens, proxyName);
858 ATH_MSG_VERBOSE(
"Done calling matchKey( " << keyTokens <<
", " << proxyName <<
") with result: " << keyMatch );
862 bool xkeyMatch =
false;
863 for (std::set<std::string>::const_iterator c2k_it = clidKeys.begin(), c2k_itEnd = clidKeys.end();
864 keyMatch && c2k_it != c2k_itEnd; ++c2k_it) {
865 if (*c2k_it == wildCard) {
869 std::string::size_type xsep = c2k_it->find(wildCard);
871 if (xsep == std::string::npos) {
872 if (*c2k_it == proxyName) {
878 xkeyMatch =
matchKey(xkeyTokens, proxyName);
888 if (keyMatch && !xkeyMatch) {
891 ATH_MSG_ERROR(
" Could not get data object for id " << remapped_item_id <<
",\"" << proxyName);
894 if (
nullptr != itemProxy->object()) {
898 if( item_id != remapped_item_id ) {
905 auto altbucket = std::make_unique<AltDataBucket>(
911 ATH_MSG_ERROR(
"Failed to retrieve object from MetaCont with key=" << item_key <<
", for EventRangeID=" <<
m_outSeqSvc->currentRangeID() );
912 return StatusCode::FAILURE;
914 }
else if (
item.exact()) {
918 if (!dbb) std::abort();
919 void*
ptr = dbb->
cast (item_id);
925 std::make_unique<AltDataBucket>
934 m_objects.push_back(itemProxy->object());
935 ATH_MSG_DEBUG(
" Added object " << item_id <<
",\"" << proxyName <<
"\"");
940 std::stringstream tns;
941 if (!
m_pCLIDSvc->getTypeNameOfID(item_id, tn).isSuccess()) {
943 << item_id <<
",\"" << proxyName);
944 tns << item_id <<
'_' << proxyName;
946 tn +=
'_' + proxyName;
961 << itemProxy->
clID() <<
" to SG::IConstAuxStore*" );
975 std::string
key = item_key;
976 key.erase (
key.size()-4, 4);
983 for(
const auto&
it :
compression.getCompressedAuxIDs( allVars ) ) {
984 if(
it.second.size() > 0 ) {
985 compInfo[
key ][
it.first ] =
it.second;
987 " variables that'll be lossy float compressed"
988 " with " <<
it.first <<
" mantissa bits" );
1007 if (
m_itemSvc->addStreamItem(this->name(),tns.str()).isFailure()) {
1011 }
else if (keyMatch && xkeyMatch) {
1015 if (!added && !removed) {
1016 ATH_MSG_DEBUG(
" No object matching " << item_id <<
",\"" << item_key <<
"\" found");
1017 }
else if (removed) {
1018 ATH_MSG_DEBUG(
" Object being excluded based on property setting "
1019 << item_id <<
",\"" << item_key <<
"\". Skipping");
1022 ATH_MSG_DEBUG(
" Failed to receive proxy iterators from StoreGate for "
1023 << item_id <<
",\"" << item_key <<
"\". Skipping");
1025 return StatusCode::SUCCESS;
1033 std::set<std::string>
1035 const CLID& item_id,
1036 const std::string& item_key)
const
1039 std::set<std::string>
result;
1042 if(item_key.find(
"Aux.") == string::npos) {
1048 iter != iterEnd; ++iter) {
1050 if (iter->id() != item_id) {
1054 size_t seppos = iter->key().find(
'.');
1055 string comp_item_key{
""}, comp_str{
""};
1056 if(seppos != string::npos) {
1057 comp_item_key = iter->key().substr(0, seppos+1);
1058 comp_str = iter->key().substr(seppos+1);
1060 comp_item_key = iter->key();
1064 if (!comp_str.empty() && comp_item_key == item_key) {
1065 std::stringstream
ss(comp_str);
1067 while( std::getline(
ss, attr,
'.') ) {
1079 const std::string& tns,
1080 const std::string& aux_attr,
1085 if( aux_attr.size() ) {
1086 std::stringstream
ss(aux_attr);
1088 while( std::getline(
ss, attr,
'.') ) {
1090 std::stringstream temp;
1091 temp << tns << attr;
1092 if (
m_itemSvc->addStreamItem(this->name(),temp.str()).isFailure()) {
1103 std::string
key = itemProxy.
name();
1104 if (
key.size() >= 4 &&
key.compare (
key.size()-4, 4,
"Aux.")==0)
1106 key.erase (
key.size()-4, 4);
1122 if ( !selected.
test( auxid ) ) {
1131 IProperty *pAsIProp(
nullptr);
1133 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
1134 (pAsIProp->setProperty(
m_itemList)).isFailure()) {
1135 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
1140 IProperty *pAsIProp(
nullptr);
1141 if ((
m_decoder.retrieve()).isFailure() ||
1142 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_decoder)) ||
1143 (pAsIProp->setProperty(
"ItemList",
m_excludeList.toString())).isFailure()) {
1144 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
1149 IProperty *pAsIProp(
nullptr);
1153 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
1158 IProperty *pAsIProp(
nullptr);
1162 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
1167 const std::string& portia,
1168 const std::string& sepstr )
const {
1171 if (portia.starts_with( sepstr )) {
1172 subStrings.push_back(
"");
1174 boost::char_separator<char> csep(sepstr.c_str());
1175 boost::tokenizer<char_separator<char>>
tokens(portia, csep);
1176 for (
const std::string&
t :
tokens) {
1178 subStrings.push_back(
t);
1181 if ( portia.size() >= sepstr.size() &&
1182 portia.compare( portia.size() - sepstr.size(), sepstr.size(), sepstr) == 0 ) {
1183 subStrings.push_back(
"");
1189 const string& proxyName)
const {
1190 bool keyMatch =
true;
1193 std::vector<std::string>::const_iterator itrEnd =
key.cend();
1194 std::vector<std::string>::const_iterator itr =
key.cbegin();
1198 std::string::size_type proxyNamePos=0;
1199 while ( itr != itrEnd &&
1200 std::string::npos != ( proxyNamePos = proxyName.find(*itr, proxyNamePos) )
1203 ATH_MSG_VERBOSE(
"If we are at the begin iterator and the first element is Not an empty string");
1204 if ( !(
key.front().empty()) && itr ==
key.cbegin() && proxyNamePos != 0 ) {
1206 ATH_MSG_VERBOSE(
"We had to match a precise name at the beginning, but didn't find it at the beginning");
1210 if ( !(
key.back().empty()) && itr == --(
key.cend()) && (proxyNamePos+itr->size()!=proxyName.size()) ) {
1212 ATH_MSG_VERBOSE(
"We had to match a precise name at the end, but didn't find it at the end");
1215 ATH_MSG_VERBOSE(
"Found a match of subkey: " << *itr <<
" in string: " << proxyName
1216 <<
" at position: " << proxyNamePos );
1218 proxyNamePos += itr->size();
1222 if ( itr != itrEnd ) {
1224 ATH_MSG_VERBOSE(
"Couldn't match every sub-string... return: " << keyMatch);
1226 else {
ATH_MSG_VERBOSE(
"Did match every sub-string... return: " << keyMatch); }
1240 if (!
tool->postInitialize().isSuccess()) {
1244 return StatusCode::SUCCESS;
1251 if (!
tool->preFinalize().isSuccess()) {
1255 const Incident metaDataStopIncident(
name(),
"MetaDataStop");
1256 this->
handle(metaDataStopIncident);
1261 return StatusCode::SUCCESS;
1275 std::unique_ptr<ITPCnvBase> tpcnv =
m_tpCnvSvc->t2p_cnv_unique (clid);