ATLAS Offline Software
AthenaOutputStreamTool Class Reference

This is the implementation of IAthenaOutputStreamTool.

#include <AthenaOutputStreamTool.h>

Public Types

typedef std::pair< std::string, std::string > TypeKeyPair
 Stream out objects.
typedef std::vector< TypeKeyPair > TypeKeyPairs
typedef std::vector< DataObject * > DataObjectVec
 Stream out a vector of objects. Each object must first be converted to a DataObject, e.g. with SG::asStorable.

Public Member Functions

 AthenaOutputStreamTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard AlgTool Constructor.
virtual StatusCode initialize () override
 AthAlgTool Interface method implementations:
StatusCode connectServices (const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord) override
 Specify which data store and conversion service to use and whether to extend provenance. Only use if one wants to override jobOptions.
StatusCode connectOutput (const std::string &outputName="") override
 Connect to the output stream. Must call connectOutput BEFORE streaming. Only specify "outputName" if one wants to override jobOptions.
StatusCode commitOutput (bool doCommit=false) override
 Commit the output stream after having streamed out objects. Must call commitOutput AFTER streaming.
StatusCode finalizeOutput () override
 Finalize the output stream after the last commit, e.g. in finalize.
virtual StatusCode streamObjects (const TypeKeyPairs &typeKeys, const std::string &outputName="") override
virtual StatusCode streamObjects (const DataObjectVec &dataObjects, const std::string &outputName="") override
virtual StatusCode getInputItemList (SG::IFolder *m_p2BWrittenFromTool) override

Private Member Functions

StatusCode connectServices ()
 Do the real connection to services.
void propagateProvenance (const DataHeader &src_dh)
 copy provenance records when creating new DataHeaders

Private Attributes

StringProperty m_outputName { this, "OutputFile", "", "name of the output db name"}
StringProperty m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
StringProperty m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
StringProperty m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = \"\" (will result in \"POOLContainer_\")"}
StringProperty m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
StringProperty m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
StringProperty m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = \"\" (will result in \"MetaDataHdr\")"}
StringProperty m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = \"\" (will result in \"MetaData\")"}
StringProperty m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
BooleanProperty m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"}
std::string m_outputAttributes
std::string m_metaDataOutputAttributes
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
std::string m_attrListWrite {""}
ServiceHandle< StoreGateSvc > m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
ServiceHandle< IConversionSvc > m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" }
 Keep reference to the data conversion service.
ServiceHandle< IClassIDSvc > m_clidSvc
 Ref to ClassIDSvc to convert type name to clid.
ServiceHandle< IDecisionSvc > m_decSvc
 Ref to DecisionSvc.
DataHeader * m_dataHeader {nullptr}
 Current DataHeader for streamed objects.
bool m_connectionOpen {false}
 Flag to tell whether connectOutput has been called.
bool m_extendProvenanceRecord {false}
 Flag as to whether to extend provenance via the DataHeader.
std::string m_keepProvenancesStr
 RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property.
std::regex m_keepProvenancesRE
 RegEx pattern created from m_keepProvenancesStr.
std::map< std::string, bool > m_keepProvenanceMatch
 Cache provenance RegEx matching result in a map.
std::set< std::string > m_skippedItems
 set of skipped item keys, because of missing CLID

Detailed Description

This is the implementation of IAthenaOutputStreamTool.

Definition at line 34 of file AthenaOutputStreamTool.h.

Member Typedef Documentation

◆ DataObjectVec

typedef std::vector<DataObject*> AthenaOutputStreamTool::DataObjectVec

Stream out a vector of objects. Each object must first be converted to a DataObject, e.g.:

#include "AthenaKernel/StorableConversions.h"
T* obj = xxx;
DataObject* dataObject = SG::asStorable(obj);

Definition at line 76 of file AthenaOutputStreamTool.h.

◆ TypeKeyPair

typedef std::pair<std::string, std::string> AthenaOutputStreamTool::TypeKeyPair

Stream out objects.

Provide a vector of typeName/key pairs. If a key is empty, the store is assumed to hold exactly one object of that type, and the lookup fails if there is more than one.

Definition at line 67 of file AthenaOutputStreamTool.h.
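
The type/key list described above can be sketched as follows. This is a minimal standalone illustration, not code from the tool: the typedefs are local copies of the ones documented here, and the helper names (makeItemList, countKeyless) as well as the type names and keys are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Local copies of the documented typedefs so the sketch compiles on its own.
using TypeKeyPair  = std::pair<std::string, std::string>;
using TypeKeyPairs = std::vector<TypeKeyPair>;

// Hypothetical helper: build the list of objects to stream out.
TypeKeyPairs makeItemList() {
  TypeKeyPairs items;
  items.emplace_back("EventInfo", "McEventInfo");  // explicit key
  items.emplace_back("MyClusterContainer", "");    // empty key: exactly one such object expected
  return items;
}

// Hypothetical helper: count entries that rely on the key-less lookup.
std::size_t countKeyless(const TypeKeyPairs& items) {
  std::size_t n = 0;
  for (const TypeKeyPair& item : items) {
    if (item.second.empty()) ++n;
  }
  return n;
}
```

Entries with an empty key are the ones that can fail when the store holds several objects of the same type, so giving an explicit key is usually the safer choice.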

◆ TypeKeyPairs

typedef std::vector<TypeKeyPair> AthenaOutputStreamTool::TypeKeyPairs

Definition at line 68 of file AthenaOutputStreamTool.h.

Constructor & Destructor Documentation

◆ AthenaOutputStreamTool()

AthenaOutputStreamTool::AthenaOutputStreamTool ( const std::string & type,
const std::string & name,
const IInterface * parent )

Standard AlgTool Constructor.

Constructor.

Definition at line 43 of file AthenaOutputStreamTool.cxx.

45 : base_class(type, name, parent),
46 m_clidSvc("ClassIDSvc", name),
47 m_decSvc("DecisionSvc/DecisionSvc", name) {
48}

Member Function Documentation

◆ commitOutput()

StatusCode AthenaOutputStreamTool::commitOutput ( bool doCommit = false)
override

Commit the output stream after having streamed out objects. Must call commitOutput AFTER streaming.

Definition at line 338 of file AthenaOutputStreamTool.cxx.

338 {
339 ATH_MSG_DEBUG("In commitOutput");
340 // Connect the output file to the service
341 if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
342 ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
343 return(StatusCode::FAILURE);
344 }
345 // Set flag that connection is closed
346 m_connectionOpen = false;
347 return(StatusCode::SUCCESS);
348}
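
The call order implied by the listings (connectOutput opens the connection, streamObjects requires it to be open, commitOutput closes it again) can be sketched with a small stand-in class. This is an illustration under stated assumptions only: MockStreamTool, writeOneEvent and the file name used with them are hypothetical, and the mock models nothing but the m_connectionOpen flag and the output-name check.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the tool; it tracks only the connection flag.
class MockStreamTool {
public:
  bool connectOutput(const std::string& name = "") {
    if (!name.empty()) m_outputName = name;     // argument overrides the stored name
    if (m_outputName.empty()) return false;     // mirrors "No OutputName provided"
    m_connectionOpen = true;
    return true;
  }
  // Streaming fails unless connectOutput was called first.
  bool streamObjects() const { return m_connectionOpen; }
  // Committing marks the connection as closed again.
  bool commitOutput() {
    m_connectionOpen = false;
    return true;
  }
private:
  std::string m_outputName;
  bool m_connectionOpen = false;
};

// Hypothetical driver: one connect / stream / commit cycle.
bool writeOneEvent(MockStreamTool& tool, const std::string& file) {
  return tool.connectOutput(file) && tool.streamObjects() && tool.commitOutput();
}
```

Because commitOutput resets the flag, each cycle has to start with a fresh connectOutput call before anything is streamed.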

◆ connectOutput()

StatusCode AthenaOutputStreamTool::connectOutput ( const std::string & outputName = "")
override

Connect to the output stream. Must call connectOutput BEFORE streaming. Only specify "outputName" if one wants to override jobOptions.

Definition at line 159 of file AthenaOutputStreamTool.cxx.

159 {
160 ATH_MSG_DEBUG("In connectOutput " << outputName);
161
162 // Use arg if not empty, save the output name
163 if (!outputName.empty()) {
164 m_outputName.setValue(outputName);
165 }
166 if (m_outputName.value().empty()) {
167 ATH_MSG_ERROR("No OutputName provided");
168 return(StatusCode::FAILURE);
169 }
170 // Connect services if not already available
171 if (m_store == 0 || m_conversionSvc == 0) {
172 ATH_CHECK(connectServices());
173 }
174 // Connect the output file to the service
175 if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
176 ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
177 return(StatusCode::FAILURE);
178 } else {
179 ATH_MSG_DEBUG("Connected to " << m_outputName.value());
180 }
181
182 // Remove DataHeader with same key if it exists
183 if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
184 const DataHeader* preDh = nullptr;
185 if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
186 if (m_store->removeDataAndProxy(preDh).isFailure()) {
187 ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
188 return(StatusCode::FAILURE);
189 }
190 ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
191 }
192 }
193
194 // Create new DataHeader
195 m_dataHeader = new DataHeader();
196 m_dataHeader->setProcessTag(m_processTag);
197
198 // Retrieve all existing DataHeaders from StoreGate
199 const DataHeader* dh = nullptr;
200 std::vector<std::string> dhKeys;
201 m_store->keys<DataHeader>(dhKeys);
202 for (const std::string& dhKey : dhKeys) {
203 bool primaryDH = false;
204 if (!m_store->transientContains<DataHeader>(dhKey)) {
205 if (dhKey == "EventSelector") primaryDH = true;
206 ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
207 }
208 if (m_store->retrieve(dh, dhKey).isFailure()) {
209 ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
210 }
211 if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
212 propagateProvenance( *dh );
213 }
214 }
215
216 // Attach the attribute list to the DataHeader if requested
217 if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
218 auto attrListHandle = SG::makeHandle(m_attrListKey);
219 if (!attrListHandle.isValid()) {
220 ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
221 } else {
222 m_dataHeader->setAttributeList(attrListHandle.cptr());
223 if (m_extend) { // Add streaming decisions
224 ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
225 // Look for attribute list created for mini-EventInfo
226 const AthenaAttributeList* attlist(attrListHandle.cptr());
227
228 // Build new attribute list for modification
229 AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
230 newone->copyData(*attlist);
231
232 // Now loop over stream definitions and add decisions
233 auto streams = m_decSvc->getStreams();
234 for (auto it = streams.begin();
235 it != streams.end(); ++it) {
236 newone->extend(*it,"bool");
237 (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
238 ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
239 }
240 // record new attribute list with old key + suffix
241 const AthenaAttributeList* attrList2 = nullptr;
242 if (!m_store->contains<AthenaAttributeList>(m_attrListWrite)) {
243 if (m_store->record(newone,m_attrListWrite).isFailure()) {
244 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
245 }
246 } else {
247 ATH_MSG_DEBUG("Decisions already added by a different stream");
248 }
249 if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
250 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
251 } else {
252 m_dataHeader->setAttributeList(attrList2);
253 }
254/*
255 SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
256 std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
257 if ( attrWrite.record(std::move(uptr)).isFailure() ) {
258 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
259 } else {
260 ATH_MSG_DEBUG("Decisions already added by a different stream");
261 }
262*/
263 //m_dataHeader->setAttributeList(newone);
264 } // list extend check
265 } // list retrieve check
266 } // list property check
267
268 // Record DataHeader in StoreGate
269 SG::WriteHandle<DataHeader> wh(m_dataHeaderKey, m_store->name());
270 if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
271 ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
272 return(StatusCode::FAILURE);
273 } else {
274 ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
275 }
277 // Set flag that connection is open
278 m_connectionOpen = true;
279 return(StatusCode::SUCCESS);
280}

◆ connectServices() [1/2]

StatusCode AthenaOutputStreamTool::connectServices ( )
private

Do the real connection to services.

Definition at line 150 of file AthenaOutputStreamTool.cxx.

150 {
151 // Find the data store
152 if (m_store.retrieve().isFailure() || m_store == 0) {
153 ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
154 return(StatusCode::FAILURE);
155 }
156 return(StatusCode::SUCCESS);
157}

◆ connectServices() [2/2]

StatusCode AthenaOutputStreamTool::connectServices ( const std::string & dataStore,
const std::string & cnvSvc,
bool extendProvenenceRecord )
override

Specify which data store and conversion service to use and whether to extend provenance. Only use if one wants to override jobOptions.

Definition at line 116 of file AthenaOutputStreamTool.cxx.

118 {
119 // Release old data store
120 if (m_store.isValid()) {
121 if (m_store.release().isFailure()) {
122 ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
123 }
124 }
125 m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
126 if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
127 if (m_conversionSvc.release().isFailure()) {
128 ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
129 }
130 m_conversionSvc = ServiceHandle<IConversionSvc>(cnvSvc, this->name());
131 if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
132 ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
133 return(StatusCode::FAILURE);
134 }
135 }
136 m_extendProvenanceRecord = extendProvenenceRecord;
137 auto pprop = dynamic_cast<const IProperty*>(parent());
138 if (not pprop){
139 ATH_MSG_ERROR("'parent' could not be cast to IProperty");
140 return(StatusCode::FAILURE);
141 }
142 auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
143 m_keepProvenancesStr = keep.value();
144 // create RegEx pattern from the property value, specify extended grammar
145 m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
146
147 return(connectServices());
148}

◆ finalizeOutput()

StatusCode AthenaOutputStreamTool::finalizeOutput ( )
override

Finalize the output stream after the last commit, e.g. in finalize.

Definition at line 350 of file AthenaOutputStreamTool.cxx.

350 {
351 AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
352 if (athConversionSvc != 0) {
353 if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
354 ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
355 return(StatusCode::FAILURE);
356 }
357 }
358 return(StatusCode::SUCCESS);
359}
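
The listing above disconnects the output only when the conversion service turns out to be an AthCnvSvc, and otherwise succeeds without doing anything. A minimal standalone sketch of that pattern follows. The types IConversionSvcLike, AthCnvSvcLike and OtherCnvSvc are hypothetical stand-ins, not the real Gaudi or Athena interfaces, and bool replaces StatusCode.

```cpp
#include <cassert>
#include <string>

// Hypothetical minimal base interface (stand-in for IConversionSvc).
struct IConversionSvcLike {
  virtual ~IConversionSvcLike() = default;
};

// Hypothetical derived service that knows how to disconnect an output.
struct AthCnvSvcLike : IConversionSvcLike {
  int disconnected = 0;
  bool disconnectOutput(const std::string&) {
    ++disconnected;
    return true;
  }
};

// Hypothetical service without a disconnectOutput method.
struct OtherCnvSvc : IConversionSvcLike {};

// Disconnect only if the service is of the derived kind; otherwise succeed.
bool finalizeOutput(IConversionSvcLike* svc, const std::string& output) {
  auto* ath = dynamic_cast<AthCnvSvcLike*>(svc);
  if (ath != nullptr) {
    if (!ath->disconnectOutput(output)) return false;
  }
  return true;
}
```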

◆ getInputItemList()

StatusCode AthenaOutputStreamTool::getInputItemList ( SG::IFolder * m_p2BWrittenFromTool)
override virtual

Definition at line 533 of file AthenaOutputStreamTool.cxx.

533 {
534 const std::string hltKey = "HLTAutoKey";
535 SG::ConstIterator<DataHeader> beg;
536 SG::ConstIterator<DataHeader> ending;
537 if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
538 ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
539 } else {
540 for ( ; beg != ending; ++beg) {
541 if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
542 for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
543 it != itLast; ++it) {
544 // Only insert the primary clid, not the ones for the symlinks!
545 CLID clid = it->getPrimaryClassID();
546 if (clid != ClassID_traits<DataHeader>::ID()) {
547 //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
548 //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
549 std::string typeName;
550 if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
551 if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
552 ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
553 m_skippedItems.insert(it->getKey());
554 }
555 continue;
556 }
557 ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
558 const std::string keyName = it->getKey();
559 if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
560 p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
561 } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
562 p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
563 } else {
564 p2BWrittenFromTool->add(clid, keyName).ignore();
565 }
566 }
567 }
568 }
569 }
570 }
571 ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
572 return(StatusCode::SUCCESS);
573}
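
The key handling at the end of the loop above replaces keys that start or end with "HLTAutoKey" by a wildcard pattern. A standalone sketch of that mapping is given below. The function name itemListKey is hypothetical, and the real code passes the result to SG::IFolder::add rather than returning it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: map a StoreGate key to the key or pattern that is
// added to the item list.
std::string itemListKey(const std::string& keyName) {
  static const std::string hltKey = "HLTAutoKey";
  const std::size_t n = hltKey.size();  // 10 characters
  // Key longer than the marker and starting with it: wildcard suffix.
  if (keyName.size() > n && keyName.compare(0, n, hltKey) == 0) {
    return hltKey + "*";
  }
  // Key longer than the marker and ending with it: wildcard prefix.
  if (keyName.size() > n && keyName.compare(keyName.size() - n, n, hltKey) == 0) {
    return "*" + hltKey;
  }
  // Everything else is kept verbatim.
  return keyName;
}
```

A key that is exactly "HLTAutoKey" is kept as-is, because both branches require the key to be strictly longer than the marker.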

◆ initialize()

StatusCode AthenaOutputStreamTool::initialize ( )
override virtual

AthAlgTool Interface method implementations:

Definition at line 50 of file AthenaOutputStreamTool.cxx.

50 {
51 ATH_MSG_INFO("Initializing " << name());
52
53 ATH_CHECK( m_clidSvc.retrieve() );
54 ATH_CHECK( m_conversionSvc.retrieve() );
55
56 // Autoconfigure
57 if (m_dataHeaderKey.empty()) {
58 m_dataHeaderKey.setValue(name());
59 // Remove "ToolSvc." from m_dataHeaderKey.
60 if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
61 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
62 // Remove "Tool" from m_dataHeaderKey.
63 if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
64 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
65 }
66 } else {
67 const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
68 if (parentAlg != 0) {
69 m_dataHeaderKey.setValue(parentAlg->name());
70 }
71 }
72 }
73 if (m_processTag.empty()) {
74 m_processTag.setValue(m_dataHeaderKey);
75 }
76
77 { // handle the AttrKey overwrite
78 const std::string keyword = "[AttributeListKey=";
79 std::string::size_type pos = m_outputName.value().find(keyword);
80 if( (pos != std::string::npos) ) {
81 ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
82 const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
83 m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
84 m_attrListKey = attrListKey;
85 }
86 }
87 if ( ! m_attrListKey.key().empty() ) {
88 ATH_CHECK(m_attrListKey.initialize());
89 m_attrListWrite = m_attrListKey.key() + "Decisions";
90 //ATH_CHECK(m_attrListWrite.initialize());
91 }
92
93 if (!m_outputCollection.value().empty()) {
94 m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
95 }
96 if (!m_containerPrefix.value().empty()) {
97 m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
98 }
99 if (m_containerNameHint.value() != "0") {
100 m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
101 }
102 if (m_branchNameHint.value() != "0") {
103 m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
104 }
105 if (!m_metaDataOutputCollection.value().empty()) {
106 m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
107 }
108 if (!m_metaDataContainerPrefix.value().empty()) {
109 m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
110 }
111
112 if (m_extend) ATH_CHECK(m_decSvc.retrieve());
113 return(StatusCode::SUCCESS);
114}
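
The "[AttributeListKey=...]" override handled in initialize() can be sketched as a standalone string function. This is a minimal illustration: the function names attrListKeyFromOutput and decisionsKey are hypothetical, as are the example file name and key in the note below.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: extract the value of "[AttributeListKey=...]" from an
// output name, or return an empty string when the keyword is absent.
std::string attrListKeyFromOutput(const std::string& outputName) {
  const std::string keyword = "[AttributeListKey=";
  const std::string::size_type pos = outputName.find(keyword);
  if (pos == std::string::npos) return "";
  const std::string::size_type start = pos + keyword.size();
  const std::string::size_type end = outputName.find(']', start);
  // If no closing bracket is found, substr clamps to the end of the string.
  return outputName.substr(start, end - start);
}

// Hypothetical helper: name under which the extended attribute list is
// recorded (the source key with a "Decisions" suffix).
std::string decisionsKey(const std::string& attrListKey) {
  return attrListKey + "Decisions";
}
```

With an output name such as "out.pool.root[AttributeListKey=SimpleTag]", the extracted key would be "SimpleTag" and the derived write key "SimpleTagDecisions".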

◆ propagateProvenance()

void AthenaOutputStreamTool::propagateProvenance ( const DataHeader & src_dh)
private

copy provenance records when creating new DataHeaders

Definition at line 283 of file AthenaOutputStreamTool.cxx.

284{
285 // keep track of provenance entries inserted into the new DataHeader
286 std::set<std::string> insertedTags{};
287 // Add DataHeader token to the new DataHeader
288 if (m_extendProvenanceRecord) {
289 std::string pTag;
290 std::unique_ptr<SG::TransientAddress> dhTransAddr;
291 for (const DataHeaderElement& dhe : src_dh) {
292 if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
293 pTag = dhe.getKey();
294 dhTransAddr.reset( dhe.getAddress( m_conversionSvc->repSvcType() ) );
295 }
296 }
297 // Update dhTransAddr to handle fast merged files.
298 if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
299 DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
300 m_dataHeader->insertProvenance(dhe);
301 insertedTags.insert(std::move(pTag));
302 }
303 else if( dhTransAddr ) {
304 DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
305 m_dataHeader->insertProvenance(dhe);
306 insertedTags.insert(std::move(pTag));
307 }
308 }
309
310 // empty regexpr means do not keep any provenance
311 if( !m_keepProvenancesStr.empty() ) {
312 // Each stream tag is written only once in the provenance record
313 // In files where there are multiple entries per stream tag
314 // the record is in reverse, i.e., the latest appears first.
315 // Therefore, only keep the first entry if there are multiple
316 // matches so that we retain the latest one.
317 for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
318 const auto & currentKey = (*iter).getKey();
319 if( insertedTags.insert(currentKey).second ) {
320 // first prov with that tag. Now check if we want to keep that tag
321 bool keep = false;
322 auto it = m_keepProvenanceMatch.find( currentKey );
323 if( it != m_keepProvenanceMatch.end() ) {
324 keep = it->second;
325 } else {
326 keep = std::regex_search(currentKey, m_keepProvenancesRE);
327 m_keepProvenanceMatch[currentKey] = keep;
328 }
329 if( keep ) {
330 m_dataHeader->insertProvenance(*iter);
331 }
332 }
333 }
334 }
335}
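
The tag filtering in the second half of propagateProvenance() (first occurrence wins, regex result cached per tag, empty pattern keeps nothing) can be sketched with the standard library alone. The class ProvenanceFilter, the function keptTags and the stream tag names used with them are hypothetical. Unlike the original, the sketch skips constructing a regex from an empty pattern.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <regex>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper class: decides which provenance tags to keep.
class ProvenanceFilter {
public:
  explicit ProvenanceFilter(const std::string& pattern)
    : m_pattern(pattern),
      m_regex(pattern.empty() ? std::regex() : std::regex(pattern, std::regex::extended)) {}

  bool keep(const std::string& tag) {
    if (m_pattern.empty()) return false;              // empty pattern: keep no provenance
    const auto it = m_cache.find(tag);
    if (it != m_cache.end()) return it->second;       // cached decision
    const bool matched = std::regex_search(tag, m_regex);
    m_cache[tag] = matched;
    return matched;
  }
  std::size_t cacheSize() const { return m_cache.size(); }

private:
  std::string m_pattern;
  std::regex m_regex;
  std::map<std::string, bool> m_cache;
};

// Keep only the first entry per tag, and only if the filter accepts it.
std::vector<std::string> keptTags(const std::vector<std::string>& tags, ProvenanceFilter& filter) {
  std::set<std::string> seen;
  std::vector<std::string> out;
  for (const std::string& tag : tags) {
    if (seen.insert(tag).second && filter.keep(tag)) out.push_back(tag);
  }
  return out;
}
```

Since std::regex_search matches substrings, a pattern such as "StreamAOD" would also accept longer tags containing it. Anchors (^ and $) are needed for exact matches.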

◆ streamObjects() [1/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const DataObjectVec & dataObjects,
const std::string & outputName = "" )
override virtual

Definition at line 422 of file AthenaOutputStreamTool.cxx.

422 {
423 // Check that a connection has been opened
424 if (!m_connectionOpen) {
425 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
426 return(StatusCode::FAILURE);
427 }
428 // Connect the output file to the service
429 std::string outputConnectionString = outputName;
430 const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
431 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
432 // If we're in here we're writing MetaData
433 // Now let's see if we should be overwriting the MetaData attributes
434 // For the time-being this happens when we're writing MetaData in the augmentation mode
435 if (!m_metaDataOutputAttributes.empty()) {
436 // Note: This won't work quite right if only one attribute is set though!
437 outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
438 }
439 }
440 for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
441 if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
442 outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
443 }
444 }
445
446 // Check that the DataHeader is still valid
447 DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
448 std::map<DataObject*, IOpaqueAddress*> written;
449 for (DataObject* dobj : dataObjects) {
450 // Do not write the DataHeader via the explicit list
451 if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
452 ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
453 // Do not stream out same object twice
454 } else if (written.find(dobj) != written.end()) {
455 // Print warning and skip
456 ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
457 ATH_MSG_DEBUG(" Skipping this one.");
458 } else {
459 // Write object
460 IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
461 addr->addRef();
462 if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
463 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
464 } else {
465 ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
466 return(StatusCode::FAILURE);
467 }
468 }
469 }
470 // End of loop over DataObjects, write DataHeader
471 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
472 IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
473 addr->addRef();
474 if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
475 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
476 } else {
477 ATH_MSG_ERROR("Could not create Rep for DataHeader");
478 return(StatusCode::FAILURE);
479 }
480 }
481 for (DataObject* dobj : dataObjects) {
482 // call fillRepRefs of persistency service
483 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
484 if (proxy != nullptr && written.find(dobj) != written.end()) {
485 IOpaqueAddress* addr(written.find(dobj)->second);
486 if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
487 if (dobj->clID() != 1 || addr->par()[0] != "\n") {
488 if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
489 m_dataHeader->insert(proxy, addr);
490 } else {
491 m_dataHeader->insert(proxy, addr, m_processTag);
492 }
493 if (proxy->address() == nullptr) {
494 proxy->setAddress(addr);
495 }
496 addr->release();
497 }
498 } else {
499 ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
500 return(StatusCode::FAILURE);
501 }
502 } else {
503 ATH_MSG_WARNING("Could not cast DataObject " << dobj->clID() << " " << dobj->name());
504 }
505 }
506 m_dataHeader->addHash(&*m_store);
507 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
508 // End of DataObjects, fill refs for DataHeader
509 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
510 if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
511 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
512 if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
513 if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
514 if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
515 m_dataHeader->insert(proxy, addr);
516 } else {
517 m_dataHeader->insert(proxy, addr, m_processTag);
518 }
519 addr->release();
520 }
521 } else {
522 ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
523 return(StatusCode::FAILURE);
524 }
525 } else {
526 ATH_MSG_ERROR("Could not cast DataHeader");
527 return(StatusCode::FAILURE);
528 }
529 }
530 return(StatusCode::SUCCESS);
531}
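
The loop at lines 440 to 444 of the listing appends each "[Name=value]" attribute from m_outputAttributes to the connection string unless an attribute with the same name is already present. A standalone sketch of that merge is shown below. The function name mergeAttributes and the example strings are hypothetical.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: append each "[Name=value]" block from `attributes`
// to `connection` unless `connection` already contains "[Name=".
std::string mergeAttributes(std::string connection, const std::string& attributes) {
  for (std::string::size_type pos = attributes.find('[');
       pos != std::string::npos;
       pos = attributes.find('[', pos + 1)) {
    // "[Name=" including the opening bracket and the equals sign
    const std::string name = attributes.substr(pos, attributes.find('=', pos) + 1 - pos);
    if (connection.find(name) == std::string::npos) {
      // whole block up to and including the closing bracket
      connection += attributes.substr(pos, attributes.find(']', pos) + 1 - pos);
    }
  }
  return connection;
}
```

Attributes already given in the connection string therefore take precedence over the values configured through the tool properties.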

◆ streamObjects() [2/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const TypeKeyPairs & typeKeys,
const std::string & outputName = "" )
override virtual

Definition at line 361 of file AthenaOutputStreamTool.cxx.

361 {
362 ATH_MSG_DEBUG("In streamObjects");
363 // Check that a connection has been opened
364 if (!m_connectionOpen) {
365 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
366 return(StatusCode::FAILURE);
367 }
368 // Use arg if not empty, save the output name
369 if (!outputName.empty()) {
370 m_outputName.setValue(outputName);
371 }
372 if (m_outputName.value().empty()) {
373 ATH_MSG_ERROR("No OutputName provided");
374 return(StatusCode::FAILURE);
375 }
376 // Now iterate over the type/key pairs and stream out each object
377 std::vector<DataObject*> dataObjects;
378 for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
379 first != last; ++first) {
380 const std::string& type = (*first).first;
381 const std::string& key = (*first).second;
382 // Find the clid for type name from the CLIDSvc
383 CLID clid;
384 if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
385 ATH_MSG_ERROR("Could not get clid for typeName " << type);
386 return(StatusCode::FAILURE);
387 }
388 DataObject* dObj = 0;
389 // Two options: no key or explicit key
390 if (key.empty()) {
391 ATH_MSG_DEBUG("Get data object with no key");
392 // Get DataObject without key
393 dObj = m_store->accessData(clid);
394 } else {
395 ATH_MSG_DEBUG("Get data object with key");
396 // Get DataObjects with key
397 dObj = m_store->accessData(clid, key);
398 }
399 if (dObj == 0) {
400 // No object - print warning and return
401 ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
402 return(StatusCode::SUCCESS);
403 } else {
404 ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
405 }
406 // Save the dObj
407 dataObjects.push_back(dObj);
408 }
409 // Stream out objects
410 if (dataObjects.size() == 0) {
411 ATH_MSG_DEBUG("No data objects found");
412 return(StatusCode::SUCCESS);
413 }
414 StatusCode status = streamObjects(dataObjects, m_outputName.value());
415 if (!status.isSuccess()) {
416 ATH_MSG_ERROR("Could not stream out objects");
417 return(status);
418 }
419 return(StatusCode::SUCCESS);
420}

Member Data Documentation

◆ m_attrListKey

SG::ReadHandleKey<AthenaAttributeList> AthenaOutputStreamTool::m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
private

Definition at line 101 of file AthenaOutputStreamTool.h.

101{this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""};

◆ m_attrListWrite

std::string AthenaOutputStreamTool::m_attrListWrite {""}
private

Definition at line 103 of file AthenaOutputStreamTool.h.

103{""};

◆ m_branchNameHint

StringProperty AthenaOutputStreamTool::m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
private

Definition at line 96 of file AthenaOutputStreamTool.h.

◆ m_clidSvc

ServiceHandle<IClassIDSvc> AthenaOutputStreamTool::m_clidSvc
private

Reference to the ClassIDSvc, used to convert a type name to a CLID.

Definition at line 109 of file AthenaOutputStreamTool.h.

◆ m_connectionOpen

bool AthenaOutputStreamTool::m_connectionOpen {false}
private

Flag to tell whether connectOutput has been called.

Definition at line 115 of file AthenaOutputStreamTool.h.

◆ m_containerNameHint

StringProperty AthenaOutputStreamTool::m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
private

Definition at line 93 of file AthenaOutputStreamTool.h.

◆ m_containerPrefix

StringProperty AthenaOutputStreamTool::m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
private

Definition at line 92 of file AthenaOutputStreamTool.h.

◆ m_conversionSvc

ServiceHandle<IConversionSvc> AthenaOutputStreamTool::m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" }
private

Reference to the data conversion service.

Definition at line 107 of file AthenaOutputStreamTool.h.

◆ m_dataHeader

DataHeader* AthenaOutputStreamTool::m_dataHeader {nullptr}
private

Current DataHeader for streamed objects.

Definition at line 113 of file AthenaOutputStreamTool.h.

◆ m_dataHeaderKey

StringProperty AthenaOutputStreamTool::m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
private

Definition at line 89 of file AthenaOutputStreamTool.h.
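
The property's declared default is the empty string, while its description says it defaults to the tool name. One common way to get that behaviour is to resolve an empty value to a fallback at initialization time. A minimal sketch follows; `resolveOrDefault` is a hypothetical helper, not a function of this class, and the actual resolution logic in `AthenaOutputStreamTool` is not shown in this section.

```cpp
#include <string>

// Hypothetical helper: an empty configured value means "use the fallback",
// for example the tool name for a DataHeaderKey-like property.
std::string resolveOrDefault(const std::string& configured, const std::string& fallback) {
   return configured.empty() ? fallback : configured;
}
```

Under this assumption, leaving the property unset and setting it explicitly to the fallback value are indistinguishable to downstream code.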

◆ m_decSvc

ServiceHandle<IDecisionSvc> AthenaOutputStreamTool::m_decSvc
private

Reference to the DecisionSvc.

Definition at line 111 of file AthenaOutputStreamTool.h.

◆ m_extend

BooleanProperty AthenaOutputStreamTool::m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"}
private

Definition at line 97 of file AthenaOutputStreamTool.h.

◆ m_extendProvenanceRecord

bool AthenaOutputStreamTool::m_extendProvenanceRecord {false}
private

Flag indicating whether to extend provenance via the DataHeader.

Definition at line 118 of file AthenaOutputStreamTool.h.

◆ m_keepProvenanceMatch

std::map<std::string, bool> AthenaOutputStreamTool::m_keepProvenanceMatch
private

Map caching the results of provenance regex matching.

Definition at line 124 of file AthenaOutputStreamTool.h.
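
A `std::map<std::string, bool>` cache next to a `std::regex` suggests memoization: each distinct provenance tag is matched against the pattern once and the result is reused afterwards. The sketch below shows that idea in isolation. `ProvenanceFilter` and its members are hypothetical names, and the use of `std::regex_match` (whole-string match) rather than `std::regex_search` is an assumption, since the matching code is not part of this section.

```cpp
#include <cstddef>
#include <map>
#include <regex>
#include <string>

// Hypothetical illustration of caching regex results per tag;
// not part of AthenaOutputStreamTool.
class ProvenanceFilter {
public:
   explicit ProvenanceFilter(const std::string& pattern) : m_re(pattern) {}

   // Return whether the tag matches the pattern. The regex is evaluated
   // only the first time a given tag is seen.
   bool keep(const std::string& tag) {
      auto it = m_cache.find(tag);
      if (it != m_cache.end()) return it->second;
      const bool matched = std::regex_match(tag, m_re);  // assumption: whole-string match
      m_cache.emplace(tag, matched);
      ++m_evaluations;
      return matched;
   }

   // Number of regex evaluations actually performed (cache misses).
   std::size_t evaluations() const { return m_evaluations; }

private:
   std::regex m_re;
   std::map<std::string, bool> m_cache;
   std::size_t m_evaluations = 0;
};
```

Caching pays off when the same few tags recur for every event, because regex evaluation is comparatively expensive and the set of distinct tags is small.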

◆ m_keepProvenancesRE

std::regex AthenaOutputStreamTool::m_keepProvenancesRE
private

Regex pattern compiled from m_keepProvenancesStr.

Definition at line 122 of file AthenaOutputStreamTool.h.

◆ m_keepProvenancesStr

std::string AthenaOutputStreamTool::m_keepProvenancesStr
private

RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property.

Definition at line 120 of file AthenaOutputStreamTool.h.

◆ m_metaDataContainerPrefix

StringProperty AthenaOutputStreamTool::m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = \"\" (will result in \"MetaData\")"}
private

Definition at line 95 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputAttributes

std::string AthenaOutputStreamTool::m_metaDataOutputAttributes
private

Definition at line 100 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputCollection

StringProperty AthenaOutputStreamTool::m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = \"\" (will result in \"MetaDataHdr\")"}
private

Definition at line 94 of file AthenaOutputStreamTool.h.

◆ m_outputAttributes

std::string AthenaOutputStreamTool::m_outputAttributes
private

Definition at line 99 of file AthenaOutputStreamTool.h.

◆ m_outputCollection

StringProperty AthenaOutputStreamTool::m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = \"\" (will result in \"POOLContainer_\")"}
private

Definition at line 91 of file AthenaOutputStreamTool.h.

◆ m_outputName

StringProperty AthenaOutputStreamTool::m_outputName { this, "OutputFile", "", "name of the output db name"}
private

Definition at line 88 of file AthenaOutputStreamTool.h.

◆ m_processTag

StringProperty AthenaOutputStreamTool::m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
private

Definition at line 90 of file AthenaOutputStreamTool.h.

◆ m_skippedItems

std::set<std::string> AthenaOutputStreamTool::m_skippedItems
private

Set of item keys that were skipped because their CLID is missing.

Definition at line 127 of file AthenaOutputStreamTool.h.

◆ m_store

ServiceHandle<StoreGateSvc> AthenaOutputStreamTool::m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
private

Definition at line 105 of file AthenaOutputStreamTool.h.

The documentation for this class was generated from the following files: