ATLAS Offline Software
AthenaOutputStreamTool Class Reference

This is the implementation of IAthenaOutputStreamTool.

#include <AthenaOutputStreamTool.h>

Inheritance diagram for AthenaOutputStreamTool:

Public Types

typedef std::pair< std::string, std::string > TypeKeyPair
 Stream out objects.
typedef std::vector< TypeKeyPair > TypeKeyPairs
typedef std::vector< DataObject * > DataObjectVec
 Stream out a vector of objects. Each object must first be converted to a DataObject, e.g. with SG::asStorable.

Public Member Functions

 AthenaOutputStreamTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard AlgTool Constructor.
virtual StatusCode initialize () override
 AthAlgTool Interface method implementations:
StatusCode connectServices (const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord) override
 Specify which data store and conversion service to use, and whether to extend provenance. Only use this to override jobOptions.
StatusCode connectOutput (const std::string &outputName="") override
 Connect to the output stream. connectOutput must be called BEFORE streaming. Only specify "outputName" to override jobOptions.
StatusCode commitOutput (bool doCommit=false) override
 Commit the output stream after objects have been streamed out. commitOutput must be called AFTER streaming.
StatusCode finalizeOutput () override
 Finalize the output stream after the last commit, e.g. in finalize.
virtual StatusCode streamObjects (const TypeKeyPairs &typeKeys, const std::string &outputName="") override
virtual StatusCode streamObjects (const DataObjectVec &dataObjects, const std::string &outputName="") override
virtual StatusCode getInputItemList (SG::IFolder *m_p2BWrittenFromTool) override

Private Member Functions

StatusCode connectServices ()
 Do the real connection to services.
void propagateProvenance (const DataHeader &src_dh)
 Copy provenance records when creating new DataHeaders.

Private Attributes

StringProperty m_outputName { this, "OutputFile", "", "name of the output db name"}
StringProperty m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
StringProperty m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
StringProperty m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"}
StringProperty m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
StringProperty m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
StringProperty m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"}
StringProperty m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"}
StringProperty m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
BooleanProperty m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"}
std::string m_outputAttributes
std::string m_metaDataOutputAttributes
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
std::string m_attrListWrite {""}
ServiceHandle< StoreGateSvc > m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
ServiceHandle< IConversionSvc > m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" }
 Keep reference to the data conversion service.
ServiceHandle< IClassIDSvc > m_clidSvc
 Ref to ClassIDSvc to convert type name to clid.
ServiceHandle< IDecisionSvc > m_decSvc
 Ref to DecisionSvc.
DataHeader * m_dataHeader {nullptr}
 Current DataHeader for streamed objects.
bool m_connectionOpen {false}
 Flag to tell whether connectOutput has been called.
bool m_extendProvenanceRecord {false}
 Flag as to whether to extend provenance via the DataHeader.
std::string m_keepProvenancesStr
 RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property.
std::regex m_keepProvenancesRE
 RegEx pattern created from m_keepProvenancesStr.
std::map< std::string, bool > m_keepProvenanceMatch
 Cache provenance RegEx matching result in a map.
std::set< std::string > m_skippedItems
 set of skipped item keys, because of missing CLID

Detailed Description

This is the implementation of IAthenaOutputStreamTool.

Definition at line 34 of file AthenaOutputStreamTool.h.
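
The member documentation requires a fixed call order: connectOutput before streaming, commitOutput after streaming, and finalizeOutput after the last commit. The following is a minimal standalone sketch of that ordering, not the tool itself. The class OutputStreamSketch and its members are hypothetical stand-ins that only model the m_connectionOpen flag, and the object keys are plain strings rather than DataObjects.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the tool: it only models the connection flag.
class OutputStreamSketch {
public:
  // Like connectOutput(): fails without an output name, otherwise opens.
  bool connectOutput(const std::string& outputName) {
    if (!outputName.empty()) m_outputName = outputName;
    if (m_outputName.empty()) return false;
    m_connectionOpen = true;
    return true;
  }
  // Like streamObjects(): refuses to run unless a connection is open.
  bool streamObjects(const std::vector<std::string>& keys) {
    if (!m_connectionOpen) return false;
    m_written.insert(m_written.end(), keys.begin(), keys.end());
    return true;
  }
  // Like commitOutput(): marks the connection as closed again.
  bool commitOutput() {
    m_connectionOpen = false;
    return true;
  }
  bool isOpen() const { return m_connectionOpen; }
  std::size_t nWritten() const { return m_written.size(); }

private:
  std::string m_outputName;
  bool m_connectionOpen{false};
  std::vector<std::string> m_written;
};
```

In this sketch a second streamObjects call after commitOutput fails until connectOutput is called again, which is the same behaviour the m_connectionOpen flag enforces in the listings on this page.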

Member Typedef Documentation

◆ DataObjectVec

typedef std::vector<DataObject*> AthenaOutputStreamTool::DataObjectVec

Stream out a vector of objects. Each object must first be converted to a DataObject, e.g.

#include "AthenaKernel/StorableConversions.h"
T* obj = xxx;
DataObject* dataObject = SG::asStorable(obj);

Definition at line 76 of file AthenaOutputStreamTool.h.

◆ TypeKeyPair

typedef std::pair<std::string, std::string> AthenaOutputStreamTool::TypeKeyPair

Stream out objects.

Provide a vector of typeName/key pairs. If a key is empty, exactly one object of that type is assumed, and the call fails if there is more than one.

Definition at line 67 of file AthenaOutputStreamTool.h.
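
A short sketch of how such a list of typeName/key pairs might be assembled. The typedefs are redeclared locally so the snippet compiles on its own, and the type names and keys ("ExampleTrackCollection", "Tracks", "ExampleEventInfo") are hypothetical, not real ATLAS classes.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Same shapes as the documented typedefs, redeclared for a standalone build.
typedef std::pair<std::string, std::string> TypeKeyPair;
typedef std::vector<TypeKeyPair> TypeKeyPairs;

// Build a request list; type names and keys are made-up examples.
TypeKeyPairs makeExampleItems() {
  TypeKeyPairs items;
  items.emplace_back("ExampleTrackCollection", "Tracks");  // explicit key
  items.emplace_back("ExampleEventInfo", "");  // empty key: a single object of this type is expected
  return items;
}

// Count the entries that rely on the empty-key rule.
std::size_t countKeyless(const TypeKeyPairs& items) {
  std::size_t n = 0;
  for (const TypeKeyPair& item : items) {
    if (item.second.empty()) ++n;
  }
  return n;
}
```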

◆ TypeKeyPairs

typedef std::vector<TypeKeyPair> AthenaOutputStreamTool::TypeKeyPairs

Definition at line 68 of file AthenaOutputStreamTool.h.

Constructor & Destructor Documentation

◆ AthenaOutputStreamTool()

AthenaOutputStreamTool::AthenaOutputStreamTool ( const std::string & type,
const std::string & name,
const IInterface * parent )

Standard AlgTool Constructor.

Definition at line 43 of file AthenaOutputStreamTool.cxx.

45 : base_class(type, name, parent),
46 m_clidSvc("ClassIDSvc", name),
47 m_decSvc("DecisionSvc/DecisionSvc", name) {
48}
ServiceHandle< IDecisionSvc > m_decSvc
Ref to DecisionSvc.
ServiceHandle< IClassIDSvc > m_clidSvc
Ref to ClassIDSvc to convert type name to clid.

Member Function Documentation

◆ commitOutput()

StatusCode AthenaOutputStreamTool::commitOutput ( bool doCommit = false)
override

Commit the output stream after objects have been streamed out. commitOutput must be called AFTER streaming.

Definition at line 340 of file AthenaOutputStreamTool.cxx.

340 {
341 ATH_MSG_DEBUG("In commitOutput");
342 // Connect the output file to the service
343 if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
344 ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
345 return(StatusCode::FAILURE);
346 }
347 // Set flag that connection is closed
348 m_connectionOpen = false;
349 return(StatusCode::SUCCESS);
350}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_DEBUG(x)
bool m_connectionOpen
Flag to tell whether connectOutput has been called.
ServiceHandle< IConversionSvc > m_conversionSvc
Keep reference to the data conversion service.

◆ connectOutput()

StatusCode AthenaOutputStreamTool::connectOutput ( const std::string & outputName = "")
override

Connect to the output stream. connectOutput must be called BEFORE streaming. Only specify "outputName" to override jobOptions.

Definition at line 159 of file AthenaOutputStreamTool.cxx.

159 {
160 ATH_MSG_DEBUG("In connectOutput " << outputName);
161
162 // Use arg if not empty, save the output name
163 if (!outputName.empty()) {
164 m_outputName.setValue(outputName);
165 }
166 if (m_outputName.value().empty()) {
167 ATH_MSG_ERROR("No OutputName provided");
168 return(StatusCode::FAILURE);
169 }
170 // Connect services if not already available
171 if (m_store == 0 || m_conversionSvc == 0) {
172 ATH_CHECK(connectServices());
173 }
174 // Connect the output file to the service
175 if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
176 ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
177 return(StatusCode::FAILURE);
178 } else {
179 ATH_MSG_DEBUG("Connected to " << m_outputName.value());
180 }
181
182 // Remove DataHeader with same key if it exists
183 if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
184 const DataHeader* preDh = nullptr;
185 if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
186 if (m_store->removeDataAndProxy(preDh).isFailure()) {
187 ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
188 return(StatusCode::FAILURE);
189 }
190 ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
191 }
192 }
193
194 // Create new DataHeader
195 m_dataHeader = new DataHeader();
196 m_dataHeader->setProcessTag(m_processTag);
197
198 // Retrieve all existing DataHeaders from StoreGate
199 const DataHeader* dh = nullptr;
200 std::vector<std::string> dhKeys;
201 m_store->keys<DataHeader>(dhKeys);
202 //construct string out of loop
203 const std::string boolTypeStr{"bool"};
204 for (const std::string& dhKey : dhKeys) {
205 bool primaryDH = false;
206 if (!m_store->transientContains<DataHeader>(dhKey)) {
207 if (dhKey == "EventSelector") primaryDH = true;
208 ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
209 }
210 if (m_store->retrieve(dh, dhKey).isFailure()) {
211 ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
212 }
213 if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
214 propagateProvenance( *dh );
215 }
216 }
217
218 // Attach the attribute list to the DataHeader if requested
219 if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
220 auto attrListHandle = SG::makeHandle(m_attrListKey);
221 if (!attrListHandle.isValid()) {
222 ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
223 } else {
224 m_dataHeader->setAttributeList(attrListHandle.cptr());
225 if (m_extend) { // Add streaming decisions
226 ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
227 // Look for attribute list created for mini-EventInfo
228 const AthenaAttributeList* attlist(attrListHandle.cptr());
229
230 // Build new attribute list for modification
231 AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
232 newone->copyData(*attlist);
233
234 // Now loop over stream definitions and add decisions
235 auto streams = m_decSvc->getStreams();
236 for (auto it = streams.begin();
237 it != streams.end(); ++it) {
238 newone->extend(*it,boolTypeStr);
239 (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
240 ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
241 }
242 // record new attribute list with old key + suffix
243 const AthenaAttributeList* attrList2 = nullptr;
244 if (!m_store->contains<AthenaAttributeList>(m_attrListWrite)) {
245 if (m_store->record(newone,m_attrListWrite).isFailure()) {
246 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
247 }
248 } else {
249 ATH_MSG_DEBUG("Decisions already added by a different stream");
250 }
251 if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
252 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
253 } else {
254 m_dataHeader->setAttributeList(attrList2);
255 }
256/*
257 SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
258 std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
259 if ( attrWrite.record(std::move(uptr)).isFailure() ) {
260 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
261 } else {
262 ATH_MSG_DEBUG("Decisions already added by a different stream");
263 }
264*/
265 //m_dataHeader->setAttributeList(newone);
266 } // list extend check
267 } // list retrieve check
268 } // list property check
269
270 // Record DataHeader in StoreGate
271 SG::WriteHandle<DataHeader> wh(m_dataHeaderKey, m_store->name());
272 if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
273 ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
274 return(StatusCode::FAILURE);
275 } else {
276 ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
277 }
279 // Set flag that connection is open
280 m_connectionOpen = true;
281 return(StatusCode::SUCCESS);
282}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
DataHeader * m_dataHeader
Current DataHeader for streamed objects.
StatusCode connectServices()
Do the real connection to services.
void propagateProvenance(const DataHeader &src_dh)
copy provenance records when creating new DataHeaders
ServiceHandle< StoreGateSvc > m_store
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey
bool isInput() const
Check whether StatusFlag is "Input".
@ EVENT_STORE
Definition StoreID.h:26
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())

◆ connectServices() [1/2]

StatusCode AthenaOutputStreamTool::connectServices ( )
private

Do the real connection to services.

Definition at line 150 of file AthenaOutputStreamTool.cxx.

150 {
151 // Find the data store
152 if (m_store.retrieve().isFailure() || m_store == 0) {
153 ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
154 return(StatusCode::FAILURE);
155 }
156 return(StatusCode::SUCCESS);
157}

◆ connectServices() [2/2]

StatusCode AthenaOutputStreamTool::connectServices ( const std::string & dataStore,
const std::string & cnvSvc,
bool extendProvenenceRecord )
override

Specify which data store and conversion service to use, and whether to extend provenance. Only use this to override jobOptions.

Definition at line 116 of file AthenaOutputStreamTool.cxx.

118 {
119 // Release old data store
120 if (m_store.isValid()) {
121 if (m_store.release().isFailure()) {
122 ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
123 }
124 }
125 m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
126 if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
127 if (m_conversionSvc.release().isFailure()) {
128 ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
129 }
130 m_conversionSvc = ServiceHandle<IConversionSvc>(cnvSvc, this->name());
131 if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
132 ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
133 return(StatusCode::FAILURE);
134 }
135 }
136 m_extendProvenanceRecord = extendProvenenceRecord;
137 auto pprop = dynamic_cast<const IProperty*>(parent());
138 if (not pprop){
139 ATH_MSG_ERROR("'parent' could not be cast to IProperty");
140 return(StatusCode::FAILURE);
141 }
142 auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
143 m_keepProvenancesStr = keep.value();
144 // create RegEx pattern from the property value, specify extended grammar
145 m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
146
147 return(connectServices());
148}
bool m_extendProvenanceRecord
Flag as to whether to extend provenance via the DataHeader.
std::regex m_keepProvenancesRE
RegEx pattern created from m_keepProvenancesStr.
std::string m_keepProvenancesStr
RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStrea...

◆ finalizeOutput()

StatusCode AthenaOutputStreamTool::finalizeOutput ( )
override

Finalize the output stream after the last commit, e.g. in finalize.

Definition at line 352 of file AthenaOutputStreamTool.cxx.

352 {
353 AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
354 if (athConversionSvc != 0) {
355 if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
356 ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
357 return(StatusCode::FAILURE);
358 }
359 }
360 return(StatusCode::SUCCESS);
361}
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.

◆ getInputItemList()

StatusCode AthenaOutputStreamTool::getInputItemList ( SG::IFolder * m_p2BWrittenFromTool)
override virtual

Definition at line 535 of file AthenaOutputStreamTool.cxx.

535 {
536 const std::string hltKey = "HLTAutoKey";
537 SG::ConstIterator<DataHeader> beg;
538 SG::ConstIterator<DataHeader> ending;
539 if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
540 ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
541 } else {
542 for ( ; beg != ending; ++beg) {
543 if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
544 for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
545 it != itLast; ++it) {
546 // Only insert the primary clid, not the ones for the symlinks!
547 CLID clid = it->getPrimaryClassID();
548 if (clid != ClassID_traits<DataHeader>::ID()) {
549 //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
550 //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
551 std::string typeName;
552 if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
553 if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
554 ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
555 m_skippedItems.insert(it->getKey());
556 }
557 continue;
558 }
559 ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
560 const std::string keyName = it->getKey();
561 if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
562 p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
563 } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
564 p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
565 } else {
566 p2BWrittenFromTool->add(clid, keyName).ignore();
567 }
568 }
569 }
570 }
571 }
572 }
573 ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
574 return(StatusCode::SUCCESS);
575}
uint32_t CLID
The Class ID type.
std::set< std::string > m_skippedItems
set of skipped item keys, because of missing CLID
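
The key handling at the end of the listing above collapses keys that start or end with "HLTAutoKey" into a wildcard pattern and passes every other key through unchanged. A standalone sketch of just that branch logic follows; the function name itemKeyPattern is hypothetical, and the CLID lookup and folder insertion are left out.

```cpp
#include <string>

// Returns the key pattern that would be added to the item list for a given
// key, following the three branches of the listing above.
std::string itemKeyPattern(const std::string& keyName) {
  const std::string hltKey = "HLTAutoKey";  // 10 characters
  const std::string::size_type n = hltKey.size();
  if (keyName.size() > n && keyName.compare(0, n, hltKey) == 0) {
    return hltKey + "*";  // key starts with HLTAutoKey
  }
  if (keyName.size() > n && keyName.compare(keyName.size() - n, n, hltKey) == 0) {
    return "*" + hltKey;  // key ends with HLTAutoKey
  }
  return keyName;  // any other key is kept verbatim
}
```

Note that a key equal to "HLTAutoKey" itself is not longer than 10 characters, so it falls through to the last branch and is kept as-is.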

◆ initialize()

StatusCode AthenaOutputStreamTool::initialize ( )
override virtual

AthAlgTool Interface method implementations:

Definition at line 50 of file AthenaOutputStreamTool.cxx.

50 {
51 ATH_MSG_INFO("Initializing " << name());
52
53 ATH_CHECK( m_clidSvc.retrieve() );
54 ATH_CHECK( m_conversionSvc.retrieve() );
55
56 // Autoconfigure
57 if (m_dataHeaderKey.empty()) {
58 m_dataHeaderKey.setValue(name());
59 // Remove "ToolSvc." from m_dataHeaderKey.
60 if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
61 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
62 // Remove "Tool" from m_dataHeaderKey.
63 if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
64 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
65 }
66 } else {
67 const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
68 if (parentAlg != 0) {
69 m_dataHeaderKey.setValue(parentAlg->name());
70 }
71 }
72 }
73 if (m_processTag.empty()) {
75 }
76
77 { // handle the AttrKey overwrite
78 const std::string keyword = "[AttributeListKey=";
79 std::string::size_type pos = m_outputName.value().find(keyword);
80 if( (pos != std::string::npos) ) {
81 ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
82 const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
83 m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
84 m_attrListKey = attrListKey;
85 }
86 }
87 if ( ! m_attrListKey.key().empty() ) {
88 ATH_CHECK(m_attrListKey.initialize());
89 m_attrListWrite = m_attrListKey.key() + "Decisions";
90 //ATH_CHECK(m_attrListWrite.initialize());
91 }
92
93 if (!m_outputCollection.value().empty()) {
94 m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
95 }
96 if (!m_containerPrefix.value().empty()) {
97 m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
98 }
99 if (m_containerNameHint.value() != "0") {
100 m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
101 }
102 if (m_branchNameHint.value() != "0") {
103 m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
104 }
105 if (!m_metaDataOutputCollection.value().empty()) {
106 m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
107 }
108 if (!m_metaDataContainerPrefix.value().empty()) {
109 m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
110 }
111
112 if (m_extend) ATH_CHECK(m_decSvc.retrieve());
113 return(StatusCode::SUCCESS);
114}
#define ATH_MSG_INFO(x)
StringProperty m_metaDataContainerPrefix
StringProperty m_metaDataOutputCollection
std::string keyword(const std::string &classname)
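
Two pieces of string handling in initialize() can be illustrated in isolation: deriving the default DataHeader key from a tool name, and pulling an "[AttributeListKey=...]" value out of the output name. The sketch below is standalone and uses hypothetical function names. It simplifies the key derivation in two ways: the "Tool" suffix is detected with a plain suffix check rather than the find() comparison used in the listing, and the fallback to the parent's name for tools outside ToolSvc is omitted.

```cpp
#include <string>

// Strip a leading "ToolSvc." and then a trailing "Tool" from a tool name.
// Names without the "ToolSvc." prefix are returned unchanged in this sketch.
std::string defaultDataHeaderKey(const std::string& toolName) {
  const std::string prefix = "ToolSvc.";
  const std::string suffix = "Tool";
  std::string key = toolName;
  if (key.compare(0, prefix.size(), prefix) == 0) {
    key = key.substr(prefix.size());
    if (key.size() >= suffix.size() &&
        key.compare(key.size() - suffix.size(), suffix.size(), suffix) == 0) {
      key.erase(key.size() - suffix.size());
    }
  }
  return key;
}

// Extract the value of "[AttributeListKey=...]" from an output name.
// Returns an empty string when the keyword is absent.
std::string extractAttributeListKey(const std::string& outputName) {
  const std::string keyword = "[AttributeListKey=";
  const std::string::size_type pos = outputName.find(keyword);
  if (pos == std::string::npos) return "";
  const std::string::size_type start = pos + keyword.size();
  const std::string::size_type end = outputName.find(']', start);
  // If no closing bracket exists, substr() takes the rest of the string.
  return outputName.substr(start, end - start);
}
```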

◆ propagateProvenance()

void AthenaOutputStreamTool::propagateProvenance ( const DataHeader & src_dh)
private

Copy provenance records when creating new DataHeaders.

Definition at line 285 of file AthenaOutputStreamTool.cxx.

286{
287 // keep track of provenance entries inserted into the new DataHeader
288 std::set<std::string> insertedTags{};
289 // Add DataHeader token to the new DataHeader
291 std::string pTag;
292 std::unique_ptr<SG::TransientAddress> dhTransAddr;
293 for (const DataHeaderElement& dhe : src_dh) {
294 if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
295 pTag = dhe.getKey();
296 dhTransAddr.reset( dhe.getAddress( m_conversionSvc->repSvcType() ) );
297 }
298 }
299 // Update dhTransAddr to handle fast merged files.
300 if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
301 DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
302 m_dataHeader->insertProvenance(dhe);
303 insertedTags.insert(std::move(pTag));
304 }
305 else if( dhTransAddr ) {
306 DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
307 m_dataHeader->insertProvenance(dhe);
308 insertedTags.insert(std::move(pTag));
309 }
310 }
311
312 // empty regexpr means do not keep any provenance
313 if( !m_keepProvenancesStr.empty() ) {
314 // Each stream tag is written only once in the provenance record
315 // In files where there are multiple entries per stream tag
316 // the record is in reverse, i.e., the latest appears first.
317 // Therefore, only keep the first entry if there are multiple
318 // matches so that we retain the latest one.
319 for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
320 const auto & currentKey = (*iter).getKey();
321 if( insertedTags.insert(currentKey).second ) {
322 // first prov with that tag. Now check if we want to keep that tag
323 bool keep = false;
324 auto it = m_keepProvenanceMatch.find( currentKey );
325 if( it != m_keepProvenanceMatch.end() ) {
326 keep = it->second;
327 } else {
328 keep = std::regex_search(currentKey, m_keepProvenancesRE);
329 m_keepProvenanceMatch[currentKey] = keep;
330 }
331 if( keep ) {
332 m_dataHeader->insertProvenance(*iter);
333 }
334 }
335 }
336 }
337}
std::map< std::string, bool > m_keepProvenanceMatch
Cache provenance RegEx matching result in a map.
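
The second half of propagateProvenance() keeps only the first provenance entry per tag, keeps nothing when the pattern is empty, and caches each regex decision in a map. The following standalone sketch reproduces that selection on plain strings. The class ProvenanceFilterSketch is hypothetical and works on tag names only, not on DataHeaderElement objects.

```cpp
#include <cstddef>
#include <map>
#include <regex>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper that mirrors the tag selection on plain strings.
class ProvenanceFilterSketch {
public:
  explicit ProvenanceFilterSketch(const std::string& pattern) : m_pattern(pattern) {
    // Extended grammar, as in the listing; skip compilation for an empty pattern.
    if (!m_pattern.empty()) m_regex = std::regex(m_pattern, std::regex::extended);
  }

  // tags: provenance tags in record order (latest first).
  // inserted: tags already present in the new header before this pass.
  std::vector<std::string> select(const std::vector<std::string>& tags,
                                  std::set<std::string> inserted = {}) {
    std::vector<std::string> kept;
    if (m_pattern.empty()) return kept;  // empty pattern: keep no provenance
    for (const std::string& tag : tags) {
      if (!inserted.insert(tag).second) continue;  // only the first entry per tag
      bool keep = false;
      auto it = m_cache.find(tag);
      if (it != m_cache.end()) {
        keep = it->second;  // reuse the cached decision
      } else {
        keep = std::regex_search(tag, m_regex);
        m_cache[tag] = keep;
      }
      if (keep) kept.push_back(tag);
    }
    return kept;
  }

  std::size_t cacheSize() const { return m_cache.size(); }

private:
  std::string m_pattern;
  std::regex m_regex;
  std::map<std::string, bool> m_cache;
};
```

Because the record lists the latest entry first, keeping the first match per tag retains the most recent provenance entry, as the comments in the listing explain.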

◆ streamObjects() [1/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const DataObjectVec & dataObjects,
const std::string & outputName = "" )
override virtual

Definition at line 424 of file AthenaOutputStreamTool.cxx.

424 {
425 // Check that a connection has been opened
426 if (!m_connectionOpen) {
427 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
428 return(StatusCode::FAILURE);
429 }
430 // Connect the output file to the service
431 std::string outputConnectionString = outputName;
432 const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
433 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
434 // If we're in here we're writing MetaData
435 // Now let's see if we should be overwriting the MetaData attributes
436 // For the time-being this happens when we're writing MetaData in the augmentation mode
437 if (!m_metaDataOutputAttributes.empty()) {
438 // Note: This won't work quite right if only one attribute is set though!
439 outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
440 }
441 }
442 for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
443 if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
444 outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
445 }
446 }
447
448 // Check that the DataHeader is still valid
449 DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
450 std::map<DataObject*, IOpaqueAddress*> written;
451 for (DataObject* dobj : dataObjects) {
452 // Do not write the DataHeader via the explicit list
453 if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
454 ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
455 // Do not stream out same object twice
456 } else if (written.find(dobj) != written.end()) {
457 // Print warning and skip
458 ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
459 ATH_MSG_DEBUG(" Skipping this one.");
460 } else {
461 // Write object
462 IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
463 addr->addRef();
464 if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
465 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
466 } else {
467 ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
468 return(StatusCode::FAILURE);
469 }
470 }
471 }
472 // End of loop over DataObjects, write DataHeader
473 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
474 IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
475 addr->addRef();
476 if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
477 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
478 } else {
479 ATH_MSG_ERROR("Could not create Rep for DataHeader");
480 return(StatusCode::FAILURE);
481 }
482 }
483 for (DataObject* dobj : dataObjects) {
484 // call fillRepRefs of persistency service
485 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
486 if (proxy != nullptr && written.find(dobj) != written.end()) {
487 IOpaqueAddress* addr(written.find(dobj)->second);
488 if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
489 if (dobj->clID() != 1 || addr->par()[0] != "\n") {
490 if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
491 m_dataHeader->insert(proxy, addr);
492 } else {
493 m_dataHeader->insert(proxy, addr, m_processTag);
494 }
495 if (proxy->address() == nullptr) {
496 proxy->setAddress(addr);
497 }
498 addr->release();
499 }
500 } else {
501 ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
502 return(StatusCode::FAILURE);
503 }
504 } else {
505 ATH_MSG_WARNING("Could cast DataObject " << dobj->clID() << " " << dobj->name());
506 }
507 }
508 m_dataHeader->addHash(&*m_store);
509 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
510 // End of DataObjects, fill refs for DataHeader
511 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
512 if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
513 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
514 if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
515 if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
516 if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
517 m_dataHeader->insert(proxy, addr);
518 } else {
519 m_dataHeader->insert(proxy, addr, m_processTag);
520 }
521 addr->release();
522 }
523 } else {
524 ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
525 return(StatusCode::FAILURE);
526 }
527 } else {
528 ATH_MSG_ERROR("Could not cast DataHeader");
529 return(StatusCode::FAILURE);
530 }
531 }
532 return(StatusCode::SUCCESS);
533}
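
The first part of this overload builds the output connection string: the default MetaData attributes are replaced when custom ones are configured, and each configured "[Name=value]" attribute is appended only if the connection string does not already carry an attribute with that name. A standalone sketch of that string handling follows; both function names are hypothetical, and the attribute strings are passed in as arguments instead of being read from tool members.

```cpp
#include <string>

// Replace the default MetaData attribute pair with custom attributes, if any.
std::string overrideMetaDataAttributes(std::string connection,
                                       const std::string& metaDataAttributes) {
  const std::string defaults =
      "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
  const std::string::size_type mpos = connection.find(defaults);
  if (mpos != std::string::npos && !metaDataAttributes.empty()) {
    connection.replace(mpos, defaults.length(), metaDataAttributes);
  }
  return connection;
}

// Append every "[Name=value]" from attributes whose "[Name=" is not yet present.
std::string mergeOutputAttributes(std::string connection,
                                  const std::string& attributes) {
  for (std::string::size_type pos = attributes.find('[');
       pos != std::string::npos; pos = attributes.find('[', pos + 1)) {
    // "[Name=" including the opening bracket and the equals sign
    const std::string name =
        attributes.substr(pos, attributes.find('=', pos) + 1 - pos);
    if (connection.find(name) == std::string::npos) {
      connection += attributes.substr(pos, attributes.find(']', pos) + 1 - pos);
    }
  }
  return connection;
}
```

An attribute that is already present in the connection string wins over the configured value, so per-call settings are not overwritten by the tool's properties.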

◆ streamObjects() [2/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const TypeKeyPairs & typeKeys,
const std::string & outputName = "" )
override virtual

Definition at line 363 of file AthenaOutputStreamTool.cxx.

363 {
364 ATH_MSG_DEBUG("In streamObjects");
365 // Check that a connection has been opened
366 if (!m_connectionOpen) {
367 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
368 return(StatusCode::FAILURE);
369 }
370 // Use arg if not empty, save the output name
371 if (!outputName.empty()) {
372 m_outputName.setValue(outputName);
373 }
374 if (m_outputName.value().empty()) {
375 ATH_MSG_ERROR("No OutputName provided");
376 return(StatusCode::FAILURE);
377 }
378 // Now iterate over the type/key pairs and stream out each object
379 std::vector<DataObject*> dataObjects;
380 for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
381 first != last; ++first) {
382 const std::string& type = (*first).first;
383 const std::string& key = (*first).second;
384 // Find the clid for type name from the CLIDSvc
385 CLID clid;
386 if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
387 ATH_MSG_ERROR("Could not get clid for typeName " << type);
388 return(StatusCode::FAILURE);
389 }
390 DataObject* dObj = 0;
391 // Two options: no key or explicit key
392 if (key.empty()) {
393 ATH_MSG_DEBUG("Get data object with no key");
394 // Get DataObject without key
395 dObj = m_store->accessData(clid);
396 } else {
397 ATH_MSG_DEBUG("Get data object with key");
398 // Get DataObjects with key
399 dObj = m_store->accessData(clid, key);
400 }
401 if (dObj == 0) {
402 // No object - print warning and return
403 ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
404 return(StatusCode::SUCCESS);
405 } else {
406 ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
407 }
408 // Save the dObj
409 dataObjects.push_back(dObj);
410 }
411 // Stream out objects
412 if (dataObjects.size() == 0) {
413 ATH_MSG_DEBUG("No data objects found");
414 return(StatusCode::SUCCESS);
415 }
416 StatusCode status = streamObjects(dataObjects, m_outputName.value());
417 if (!status.isSuccess()) {
418 ATH_MSG_ERROR("Could not stream out objects");
419 return(status);
420 }
421 return(StatusCode::SUCCESS);
422}
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="") override
::StatusCode StatusCode
StatusCode definition for legacy code.

Member Data Documentation

◆ m_attrListKey

SG::ReadHandleKey<AthenaAttributeList> AthenaOutputStreamTool::m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
private

Definition at line 101 of file AthenaOutputStreamTool.h.

101{this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""};

◆ m_attrListWrite

std::string AthenaOutputStreamTool::m_attrListWrite {""}
private

Definition at line 103 of file AthenaOutputStreamTool.h.

◆ m_branchNameHint

StringProperty AthenaOutputStreamTool::m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
private

Definition at line 96 of file AthenaOutputStreamTool.h.

◆ m_clidSvc

ServiceHandle<IClassIDSvc> AthenaOutputStreamTool::m_clidSvc
private

Ref to ClassIDSvc to convert type name to clid.

Definition at line 109 of file AthenaOutputStreamTool.h.

◆ m_connectionOpen

bool AthenaOutputStreamTool::m_connectionOpen {false}
private

Flag to tell whether connectOutput has been called.

Definition at line 115 of file AthenaOutputStreamTool.h.

◆ m_containerNameHint

StringProperty AthenaOutputStreamTool::m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
private

Definition at line 93 of file AthenaOutputStreamTool.h.

◆ m_containerPrefix

StringProperty AthenaOutputStreamTool::m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
private

Definition at line 92 of file AthenaOutputStreamTool.h.

◆ m_conversionSvc

ServiceHandle<IConversionSvc> AthenaOutputStreamTool::m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" }
private

Keep reference to the data conversion service.

Definition at line 107 of file AthenaOutputStreamTool.h.

◆ m_dataHeader

DataHeader* AthenaOutputStreamTool::m_dataHeader {nullptr}
private

Current DataHeader for streamed objects.

Definition at line 113 of file AthenaOutputStreamTool.h.

◆ m_dataHeaderKey

StringProperty AthenaOutputStreamTool::m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
private

Definition at line 89 of file AthenaOutputStreamTool.h.

◆ m_decSvc

ServiceHandle<IDecisionSvc> AthenaOutputStreamTool::m_decSvc
private

Ref to DecisionSvc.

Definition at line 111 of file AthenaOutputStreamTool.h.

◆ m_extend

BooleanProperty AthenaOutputStreamTool::m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"}
private

Definition at line 97 of file AthenaOutputStreamTool.h.

◆ m_extendProvenanceRecord

bool AthenaOutputStreamTool::m_extendProvenanceRecord {false}
private

Flag as to whether to extend provenance via the DataHeader.

Definition at line 118 of file AthenaOutputStreamTool.h.

◆ m_keepProvenanceMatch

std::map<std::string, bool> AthenaOutputStreamTool::m_keepProvenanceMatch
private

Cache provenance RegEx matching result in a map.

Definition at line 124 of file AthenaOutputStreamTool.h.

◆ m_keepProvenancesRE

std::regex AthenaOutputStreamTool::m_keepProvenancesRE
private

RegEx pattern created from m_keepProvenancesStr.

Definition at line 122 of file AthenaOutputStreamTool.h.

◆ m_keepProvenancesStr

std::string AthenaOutputStreamTool::m_keepProvenancesStr
private

RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property.

Definition at line 120 of file AthenaOutputStreamTool.h.
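
The three provenance members above (pattern string, compiled regex, and result cache) suggest a compile-once, cache-per-tag pattern. The following is a minimal sketch of that idea, not the tool's implementation: ProvenanceFilter, keep and cacheSize are hypothetical names, and the use of a full match via std::regex_match (rather than a partial search) is an assumption made for this example.

```cpp
#include <cstddef>
#include <map>
#include <regex>
#include <string>

// Hypothetical helper (not part of AthenaOutputStreamTool): compiles the
// pattern once and caches the match result for each provenance tag.
class ProvenanceFilter {
public:
   explicit ProvenanceFilter(const std::string& pattern) : m_re(pattern) {}

   // Returns true if the tag should be kept. The regex is evaluated only
   // the first time a given tag is seen; later calls read the cache.
   bool keep(const std::string& tag) {
      auto it = m_cache.find(tag);
      if (it != m_cache.end()) return it->second;
      // Assumption: the whole tag must match the pattern.
      const bool match = std::regex_match(tag, m_re);
      m_cache.emplace(tag, match);
      return match;
   }

   // Number of distinct tags evaluated so far.
   std::size_t cacheSize() const { return m_cache.size(); }

private:
   std::regex m_re;                      // compiled once from the pattern string
   std::map<std::string, bool> m_cache;  // tag -> cached match result
};
```

Caching pays off when the same small set of tags recurs for every event, since regex evaluation is comparatively expensive next to a map lookup.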

◆ m_metaDataContainerPrefix

StringProperty AthenaOutputStreamTool::m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"}
private

Definition at line 95 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputAttributes

std::string AthenaOutputStreamTool::m_metaDataOutputAttributes
private

Definition at line 100 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputCollection

StringProperty AthenaOutputStreamTool::m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"}
private

Definition at line 94 of file AthenaOutputStreamTool.h.

◆ m_outputAttributes

std::string AthenaOutputStreamTool::m_outputAttributes
private

Definition at line 99 of file AthenaOutputStreamTool.h.

◆ m_outputCollection

StringProperty AthenaOutputStreamTool::m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"}
private

Definition at line 91 of file AthenaOutputStreamTool.h.

◆ m_outputName

StringProperty AthenaOutputStreamTool::m_outputName { this, "OutputFile", "", "name of the output db name"}
private

Definition at line 88 of file AthenaOutputStreamTool.h.

◆ m_processTag

StringProperty AthenaOutputStreamTool::m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
private

Definition at line 90 of file AthenaOutputStreamTool.h.

◆ m_skippedItems

std::set<std::string> AthenaOutputStreamTool::m_skippedItems
private

Set of item keys that were skipped because of a missing CLID.

Definition at line 127 of file AthenaOutputStreamTool.h.

◆ m_store

ServiceHandle<StoreGateSvc> AthenaOutputStreamTool::m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
private

Definition at line 105 of file AthenaOutputStreamTool.h.

The documentation for this class was generated from the following files: