ATLAS Offline Software
Public Types | Public Member Functions | Private Member Functions | Private Attributes | List of all members
AthenaOutputStreamTool Class Reference

This is the implementation of IAthenaOutputStreamTool. More...

#include <AthenaOutputStreamTool.h>

Inheritance diagram for AthenaOutputStreamTool:
Collaboration diagram for AthenaOutputStreamTool:

Public Types

typedef std::pair< std::string, std::string > TypeKeyPair
 Stream out objects. More...
 
typedef std::vector< TypeKeyPairTypeKeyPairs
 
typedef std::vector< DataObject * > DataObjectVec
 Stream out a vector of objects Must convert to DataObject, e.g. More...
 

Public Member Functions

 AthenaOutputStreamTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard AlgTool Constructor. More...
 
virtual StatusCode initialize () override
 AthAlgTool Interface method implementations: More...
 
StatusCode connectServices (const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord) override
 Specify which data store and conversion service to use and whether to extend provenance Only use if one wants to override jobOptions. More...
 
StatusCode connectOutput (const std::string &outputName="") override
 Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wants to override jobOptions. More...
 
StatusCode commitOutput (bool doCommit=false) override
 Commit the output stream after having streamed out objects Must commitOutput AFTER streaming. More...
 
StatusCode finalizeOutput () override
 Finalize the output stream after the last commit, e.g. More...
 
virtual StatusCode streamObjects (const TypeKeyPairs &typeKeys, const std::string &outputName="") override
 
virtual StatusCode streamObjects (const DataObjectVec &dataObjects, const std::string &outputName="") override
 
virtual StatusCode getInputItemList (SG::IFolder *m_p2BWrittenFromTool) override
 

Private Member Functions

StatusCode connectServices ()
 Do the real connection to services. More...
 
void propagateProvenance (const DataHeader &src_dh)
 copy provenance records when creating new DataHeaders More...
 

Private Attributes

StringProperty m_outputName { this, "OutputFile", "", "name of the output db name"}
 
StringProperty m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
 
StringProperty m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
 
StringProperty m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"}
 
StringProperty m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
 
StringProperty m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
 
StringProperty m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"}
 
StringProperty m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"}
 
StringProperty m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
 
BooleanProperty m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"}
 
std::string m_outputAttributes
 
std::string m_metaDataOutputAttributes
 
SG::ReadHandleKey< AthenaAttributeListm_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
 
std::string m_attrListWrite {""}
 
ServiceHandle< StoreGateSvcm_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
 
ServiceHandle< IConversionSvc > m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" }
 Keep reference to the data conversion service. More...
 
ServiceHandle< IClassIDSvc > m_clidSvc
 Ref to ClassIDSvc to convert type name to clid. More...
 
ServiceHandle< IDecisionSvcm_decSvc
 Ref to DecisionSvc. More...
 
DataHeaderm_dataHeader {nullptr}
 Current DataHeader for streamed objects. More...
 
bool m_connectionOpen {false}
 Flag to tell whether connectOutput has been called. More...
 
bool m_extendProvenanceRecord {false}
 Flag as to whether to extend provenance via the DataHeader. More...
 
std::string m_keepProvenancesStr
 RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property. More...
 
std::regex m_keepProvenancesRE
 RegEx pattern created from m_keepProvenancesStr. More...
 
std::map< std::string, bool > m_keepProvenanceMatch
 Cache provenance RegEx matching result in a map. More...
 
std::set< std::string > m_skippedItems
 set of skipped item keys, because of missing CLID More...
 

Detailed Description

This is the implementation of IAthenaOutputStreamTool.

Definition at line 33 of file AthenaOutputStreamTool.h.

Member Typedef Documentation

◆ DataObjectVec

typedef std::vector<DataObject*> AthenaOutputStreamTool::DataObjectVec

Stream out a vector of objects Must convert to DataObject, e.g.

#include "AthenaKernel/StorableConversions.h" T* obj = xxx; DataObject* dataObject = SG::asStorable(obj);

Definition at line 75 of file AthenaOutputStreamTool.h.

◆ TypeKeyPair

typedef std::pair<std::string, std::string> AthenaOutputStreamTool::TypeKeyPair

Stream out objects.

Provide vector of typeName/key pairs. If key is empty, assumes only one object and this will fail if there is more than one

Definition at line 66 of file AthenaOutputStreamTool.h.

◆ TypeKeyPairs

Definition at line 67 of file AthenaOutputStreamTool.h.

Constructor & Destructor Documentation

◆ AthenaOutputStreamTool()

AthenaOutputStreamTool::AthenaOutputStreamTool ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Standard AlgTool Constructor.

Constructor.

Definition at line 43 of file AthenaOutputStreamTool.cxx.

45  : base_class(type, name, parent),
46  m_clidSvc("ClassIDSvc", name),
47  m_decSvc("DecisionSvc/DecisionSvc", name) {
48 }

Member Function Documentation

◆ commitOutput()

StatusCode AthenaOutputStreamTool::commitOutput ( bool  doCommit = false)
override

Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.

Definition at line 338 of file AthenaOutputStreamTool.cxx.

338  {
339  ATH_MSG_DEBUG("In commitOutput");
340  // Connect the output file to the service
341  if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
342  ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
343  return(StatusCode::FAILURE);
344  }
345  // Set flag that connection is closed
346  m_connectionOpen = false;
347  return(StatusCode::SUCCESS);
348 }

◆ connectOutput()

StatusCode AthenaOutputStreamTool::connectOutput ( const std::string &  outputName = "")
override

Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wants to override jobOptions.

Definition at line 159 of file AthenaOutputStreamTool.cxx.

159  {
160  ATH_MSG_DEBUG("In connectOutput " << outputName);
161 
162  // Use arg if not empty, save the output name
163  if (!outputName.empty()) {
164  m_outputName.setValue(outputName);
165  }
166  if (m_outputName.value().empty()) {
167  ATH_MSG_ERROR("No OutputName provided");
168  return(StatusCode::FAILURE);
169  }
170  // Connect services if not already available
171  if (m_store == 0 || m_conversionSvc == 0) {
173  }
174  // Connect the output file to the service
175  if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
176  ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
177  return(StatusCode::FAILURE);
178  } else {
179  ATH_MSG_DEBUG("Connected to " << m_outputName.value());
180  }
181 
182  // Remove DataHeader with same key if it exists
183  if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
184  const DataHeader* preDh = nullptr;
185  if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
186  if (m_store->removeDataAndProxy(preDh).isFailure()) {
187  ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
188  return(StatusCode::FAILURE);
189  }
190  ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
191  }
192  }
193 
194  // Create new DataHeader
195  m_dataHeader = new DataHeader();
197 
198  // Retrieve all existing DataHeaders from StoreGate
199  const DataHeader* dh = nullptr;
200  std::vector<std::string> dhKeys;
201  m_store->keys<DataHeader>(dhKeys);
202  for (const std::string& dhKey : dhKeys) {
203  bool primaryDH = false;
204  if (!m_store->transientContains<DataHeader>(dhKey)) {
205  if (dhKey == "EventSelector") primaryDH = true;
206  ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
207  }
208  if (m_store->retrieve(dh, dhKey).isFailure()) {
209  ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
210  }
211  if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
213  }
214  }
215 
216  // Attach the attribute list to the DataHeader if requested
217  if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
218  auto attrListHandle = SG::makeHandle(m_attrListKey);
219  if (!attrListHandle.isValid()) {
220  ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
221  } else {
222  m_dataHeader->setAttributeList(attrListHandle.cptr());
223  if (m_extend) { // Add streaming decisions
224  ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
225  // Look for attribute list created for mini-EventInfo
226  const AthenaAttributeList* attlist(attrListHandle.cptr());
227 
228  // Build new attribute list for modification
229  AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
230  newone->copyData(*attlist);
231 
232  // Now loop over stream definitions and add decisions
233  auto streams = m_decSvc->getStreams();
234  for (auto it = streams.begin();
235  it != streams.end(); ++it) {
236  newone->extend(*it,"bool");
237  (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
238  ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
239  }
240  // record new attribute list with old key + suffix
241  const AthenaAttributeList* attrList2 = nullptr;
242  if (!m_store->contains<AthenaAttributeList>(m_attrListWrite)) {
243  if (m_store->record(newone,m_attrListWrite).isFailure()) {
244  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
245  }
246  } else {
247  ATH_MSG_DEBUG("Decisions already added by a different stream");
248  }
249  if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
250  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
251  } else {
252  m_dataHeader->setAttributeList(attrList2);
253  }
254 /*
255  SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
256  std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
257  if ( attrWrite.record(std::move(uptr)).isFailure() ) {
258  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
259  } else {
260  ATH_MSG_DEBUG("Decisions already added by a different stream");
261  }
262 */
263  //m_dataHeader->setAttributeList(newone);
264  } // list extend check
265  } // list retrieve check
266  } // list property check
267 
268  // Record DataHeader in StoreGate
270  if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
271  ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
272  return(StatusCode::FAILURE);
273  } else {
274  ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
275  }
277  // Set flag that connection is open
278  m_connectionOpen = true;
279  return(StatusCode::SUCCESS);
280 }

◆ connectServices() [1/2]

StatusCode AthenaOutputStreamTool::connectServices ( )
private

Do the real connection to services.

Definition at line 150 of file AthenaOutputStreamTool.cxx.

150  {
151  // Find the data store
152  if (m_store.retrieve().isFailure() || m_store == 0) {
153  ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
154  return(StatusCode::FAILURE);
155  }
156  return(StatusCode::SUCCESS);
157 }

◆ connectServices() [2/2]

StatusCode AthenaOutputStreamTool::connectServices ( const std::string &  dataStore,
const std::string &  cnvSvc,
bool  extendProvenenceRecord 
)
override

Specify which data store and conversion service to use and whether to extend provenance Only use if one wants to override jobOptions.

Definition at line 116 of file AthenaOutputStreamTool.cxx.

118  {
119  // Release old data store
120  if (m_store.isValid()) {
121  if (m_store.release().isFailure()) {
122  ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
123  }
124  }
125  m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
126  if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
127  if (m_conversionSvc.release().isFailure()) {
128  ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
129  }
131  if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
132  ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
133  return(StatusCode::FAILURE);
134  }
135  }
136  m_extendProvenanceRecord = extendProvenenceRecord;
137  auto pprop = dynamic_cast<const IProperty*>(parent());
138  if (not pprop){
139  ATH_MSG_ERROR("'parent' could not be cast to IProperty");
140  return(StatusCode::FAILURE);
141  }
142  auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
143  m_keepProvenancesStr = keep.value();
144  // create RegEx pattern from the property value, specify extended grammar
145  m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
146 
147  return(connectServices());
148 }

◆ finalizeOutput()

StatusCode AthenaOutputStreamTool::finalizeOutput ( )
override

Finalize the output stream after the last commit, e.g.

in finalize

Definition at line 350 of file AthenaOutputStreamTool.cxx.

350  {
351  AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
352  if (athConversionSvc != 0) {
353  if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
354  ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
355  return(StatusCode::FAILURE);
356  }
357  }
358  return(StatusCode::SUCCESS);
359 }

◆ getInputItemList()

StatusCode AthenaOutputStreamTool::getInputItemList ( SG::IFolder m_p2BWrittenFromTool)
overridevirtual

Definition at line 533 of file AthenaOutputStreamTool.cxx.

533  {
534  const std::string hltKey = "HLTAutoKey";
537  if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
538  ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
539  } else {
540  for ( ; beg != ending; ++beg) {
541  if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
542  for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
543  it != itLast; ++it) {
544  // Only insert the primary clid, not the ones for the symlinks!
545  CLID clid = it->getPrimaryClassID();
546  if (clid != ClassID_traits<DataHeader>::ID()) {
547  //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
548  //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
549  std::string typeName;
550  if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
551  if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
552  ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
553  m_skippedItems.insert(it->getKey());
554  }
555  continue;
556  }
557  ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
558  const std::string keyName = it->getKey();
559  if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
560  p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
561  } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
562  p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
563  } else {
564  p2BWrittenFromTool->add(clid, keyName).ignore();
565  }
566  }
567  }
568  }
569  }
570  }
571  ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
572  return(StatusCode::SUCCESS);
573 }

◆ initialize()

StatusCode AthenaOutputStreamTool::initialize ( )
overridevirtual

AthAlgTool Interface method implementations:

Definition at line 50 of file AthenaOutputStreamTool.cxx.

50  {
51  ATH_MSG_INFO("Initializing " << name());
52 
53  ATH_CHECK( m_clidSvc.retrieve() );
54  ATH_CHECK( m_conversionSvc.retrieve() );
55 
56  // Autoconfigure
57  if (m_dataHeaderKey.empty()) {
58  m_dataHeaderKey.setValue(name());
59  // Remove "ToolSvc." from m_dataHeaderKey.
60  if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
61  m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
62  // Remove "Tool" from m_dataHeaderKey.
63  if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
64  m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
65  }
66  } else {
67  const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
68  if (parentAlg != 0) {
69  m_dataHeaderKey.setValue(parentAlg->name());
70  }
71  }
72  }
73  if (m_processTag.empty()) {
74  m_processTag.setValue(m_dataHeaderKey);
75  }
76 
77  { // handle the AttrKey overwrite
78  const std::string keyword = "[AttributeListKey=";
79  std::string::size_type pos = m_outputName.value().find(keyword);
80  if( (pos != std::string::npos) ) {
81  ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
82  const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
83  m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
84  m_attrListKey = attrListKey;
85  }
86  }
87  if ( ! m_attrListKey.key().empty() ) {
89  m_attrListWrite = m_attrListKey.key() + "Decisions";
90  //ATH_CHECK(m_attrListWrite.initialize());
91  }
92 
93  if (!m_outputCollection.value().empty()) {
94  m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
95  }
96  if (!m_containerPrefix.value().empty()) {
97  m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
98  }
99  if (m_containerNameHint.value() != "0") {
100  m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
101  }
102  if (m_branchNameHint.value() != "0") {
103  m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
104  }
105  if (!m_metaDataOutputCollection.value().empty()) {
106  m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
107  }
108  if (!m_metaDataContainerPrefix.value().empty()) {
109  m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
110  }
111 
112  if (m_extend) ATH_CHECK(m_decSvc.retrieve());
113  return(StatusCode::SUCCESS);
114 }

◆ propagateProvenance()

void AthenaOutputStreamTool::propagateProvenance ( const DataHeader src_dh)
private

copy provenance records when creating new DataHeaders

Definition at line 283 of file AthenaOutputStreamTool.cxx.

284 {
285  // keep track of provenance entries inserted into the new DataHeader
286  std::set<std::string> insertedTags{};
287  // Add DataHeader token to the new DataHeader
289  std::string pTag;
290  std::unique_ptr<SG::TransientAddress> dhTransAddr;
291  for (const DataHeaderElement& dhe : src_dh) {
292  if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
293  pTag = dhe.getKey();
294  dhTransAddr.reset( dhe.getAddress(0) );
295  }
296  }
297  // Update dhTransAddr to handle fast merged files.
298  if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
299  DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
301  insertedTags.insert(std::move(pTag));
302  }
303  else if( dhTransAddr ) {
304  DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
306  insertedTags.insert(std::move(pTag));
307  }
308  }
309 
310  // empty regexpr means do not keep any provenance
311  if( !m_keepProvenancesStr.empty() ) {
312  // Each stream tag is written only once in the provenance record
313  // In files where there are multiple entries per stream tag
314  // the record is in reverse, i.e., the latest appears first.
315  // Therefore, only keep the first entry if there are multiple
316  // matches so that we retain the latest one.
317  for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
318  const auto & currentKey = (*iter).getKey();
319  if( insertedTags.insert(currentKey).second ) {
320  // first prov with that tag. Now check if we want to keep that tag
321  bool keep = false;
322  auto it = m_keepProvenanceMatch.find( currentKey );
323  if( it != m_keepProvenanceMatch.end() ) {
324  keep = it->second;
325  } else {
326  keep = std::regex_search(currentKey, m_keepProvenancesRE);
327  m_keepProvenanceMatch[currentKey] = keep;
328  }
329  if( keep ) {
331  }
332  }
333  }
334  }
335 }

◆ streamObjects() [1/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const DataObjectVec dataObjects,
const std::string &  outputName = "" 
)
overridevirtual

Definition at line 422 of file AthenaOutputStreamTool.cxx.

422  {
423  // Check that a connection has been opened
424  if (!m_connectionOpen) {
425  ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
426  return(StatusCode::FAILURE);
427  }
428  // Connect the output file to the service
429  std::string outputConnectionString = outputName;
430  const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
431  if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
432  // If we're in here we're writing MetaData
433  // Now let's see if we should be overwriting the MetaData attributes
434  // For the time-being this happens when we're writing MetaData in the augmentation mode
435  if (!m_metaDataOutputAttributes.empty()) {
436  // Note: This won't work quite right if only one attribute is set though!
437  outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
438  }
439  }
440  for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
441  if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
442  outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
443  }
444  }
445 
446  // Check that the DataHeader is still valid
447  DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
448  std::map<DataObject*, IOpaqueAddress*> written;
449  for (DataObject* dobj : dataObjects) {
450  // Do not write the DataHeader via the explicit list
451  if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
452  ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
453  // Do not stream out same object twice
454  } else if (written.find(dobj) != written.end()) {
455  // Print warning and skip
456  ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
457  ATH_MSG_DEBUG(" Skipping this one.");
458  } else {
459  // Write object
460  IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
461  addr->addRef();
462  if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
463  written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
464  } else {
465  ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
466  return(StatusCode::FAILURE);
467  }
468  }
469  }
470  // End of loop over DataObjects, write DataHeader
471  if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
472  IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
473  addr->addRef();
474  if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
475  written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
476  } else {
477  ATH_MSG_ERROR("Could not create Rep for DataHeader");
478  return(StatusCode::FAILURE);
479  }
480  }
481  for (DataObject* dobj : dataObjects) {
482  // call fillRepRefs of persistency service
483  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
484  if (proxy != nullptr && written.find(dobj) != written.end()) {
485  IOpaqueAddress* addr(written.find(dobj)->second);
486  if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
487  if (dobj->clID() != 1 || addr->par()[0] != "\n") {
488  if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
489  m_dataHeader->insert(proxy, addr);
490  } else {
492  }
493  if (proxy->address() == nullptr) {
494  proxy->setAddress(addr);
495  }
496  addr->release();
497  }
498  } else {
499  ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
500  return(StatusCode::FAILURE);
501  }
502  } else {
503  ATH_MSG_WARNING("Could cast DataObject " << dobj->clID() << " " << dobj->name());
504  }
505  }
507  if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
508  // End of DataObjects, fill refs for DataHeader
509  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
510  if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
511  IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
512  if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
513  if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
514  if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
515  m_dataHeader->insert(proxy, addr);
516  } else {
518  }
519  addr->release();
520  }
521  } else {
522  ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
523  return(StatusCode::FAILURE);
524  }
525  } else {
526  ATH_MSG_ERROR("Could not cast DataHeader");
527  return(StatusCode::FAILURE);
528  }
529  }
530  return(StatusCode::SUCCESS);
531 }

◆ streamObjects() [2/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const TypeKeyPairs typeKeys,
const std::string &  outputName = "" 
)
overridevirtual

Definition at line 361 of file AthenaOutputStreamTool.cxx.

361  {
362  ATH_MSG_DEBUG("In streamObjects");
363  // Check that a connection has been opened
364  if (!m_connectionOpen) {
365  ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
366  return(StatusCode::FAILURE);
367  }
368  // Use arg if not empty, save the output name
369  if (!outputName.empty()) {
370  m_outputName.setValue(outputName);
371  }
372  if (m_outputName.value().empty()) {
373  ATH_MSG_ERROR("No OutputName provided");
374  return(StatusCode::FAILURE);
375  }
376  // Now iterate over the type/key pairs and stream out each object
377  std::vector<DataObject*> dataObjects;
378  for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
379  first != last; ++first) {
380  const std::string& type = (*first).first;
381  const std::string& key = (*first).second;
382  // Find the clid for type name from the CLIDSvc
383  CLID clid;
384  if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
385  ATH_MSG_ERROR("Could not get clid for typeName " << type);
386  return(StatusCode::FAILURE);
387  }
388  DataObject* dObj = 0;
389  // Two options: no key or explicit key
390  if (key.empty()) {
391  ATH_MSG_DEBUG("Get data object with no key");
392  // Get DataObject without key
393  dObj = m_store->accessData(clid);
394  } else {
395  ATH_MSG_DEBUG("Get data object with key");
396  // Get DataObjects with key
397  dObj = m_store->accessData(clid, key);
398  }
399  if (dObj == 0) {
400  // No object - print warning and return
401  ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
402  return(StatusCode::SUCCESS);
403  } else {
404  ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
405  }
406  // Save the dObj
407  dataObjects.push_back(dObj);
408  }
409  // Stream out objects
410  if (dataObjects.size() == 0) {
411  ATH_MSG_DEBUG("No data objects found");
412  return(StatusCode::SUCCESS);
413  }
414  StatusCode status = streamObjects(dataObjects, m_outputName.value());
415  if (!status.isSuccess()) {
416  ATH_MSG_ERROR("Could not stream out objects");
417  return(status);
418  }
419  return(StatusCode::SUCCESS);
420 }

Member Data Documentation

◆ m_attrListKey

SG::ReadHandleKey<AthenaAttributeList> AthenaOutputStreamTool::m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
private

Definition at line 100 of file AthenaOutputStreamTool.h.

◆ m_attrListWrite

std::string AthenaOutputStreamTool::m_attrListWrite {""}
private

Definition at line 102 of file AthenaOutputStreamTool.h.

◆ m_branchNameHint

StringProperty AthenaOutputStreamTool::m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
private

Definition at line 95 of file AthenaOutputStreamTool.h.

◆ m_clidSvc

ServiceHandle<IClassIDSvc> AthenaOutputStreamTool::m_clidSvc
private

Ref to ClassIDSvc to convert type name to clid.

Definition at line 108 of file AthenaOutputStreamTool.h.

◆ m_connectionOpen

bool AthenaOutputStreamTool::m_connectionOpen {false}
private

Flag to tell whether connectOutput has been called.

Definition at line 114 of file AthenaOutputStreamTool.h.

◆ m_containerNameHint

StringProperty AthenaOutputStreamTool::m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
private

Definition at line 92 of file AthenaOutputStreamTool.h.

◆ m_containerPrefix

StringProperty AthenaOutputStreamTool::m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
private

Definition at line 91 of file AthenaOutputStreamTool.h.

◆ m_conversionSvc

ServiceHandle<IConversionSvc> AthenaOutputStreamTool::m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" }
private

Keep reference to the data conversion service.

Definition at line 106 of file AthenaOutputStreamTool.h.

◆ m_dataHeader

DataHeader* AthenaOutputStreamTool::m_dataHeader {nullptr}
private

Current DataHeader for streamed objects.

Definition at line 112 of file AthenaOutputStreamTool.h.

◆ m_dataHeaderKey

StringProperty AthenaOutputStreamTool::m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
private

Definition at line 88 of file AthenaOutputStreamTool.h.

◆ m_decSvc

ServiceHandle<IDecisionSvc> AthenaOutputStreamTool::m_decSvc
private

Ref to DecisionSvc.

Definition at line 110 of file AthenaOutputStreamTool.h.

◆ m_extend

BooleanProperty AthenaOutputStreamTool::m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"}
private

Definition at line 96 of file AthenaOutputStreamTool.h.

◆ m_extendProvenanceRecord

bool AthenaOutputStreamTool::m_extendProvenanceRecord {false}
private

Flag as to whether to extend provenance via the DataHeader.

Definition at line 117 of file AthenaOutputStreamTool.h.

◆ m_keepProvenanceMatch

std::map<std::string, bool> AthenaOutputStreamTool::m_keepProvenanceMatch
private

Cache provenance RegEx matching result in a map.

Definition at line 123 of file AthenaOutputStreamTool.h.

◆ m_keepProvenancesRE

std::regex AthenaOutputStreamTool::m_keepProvenancesRE
private

RegEx pattern created from m_keepProvenancesStr.

Definition at line 121 of file AthenaOutputStreamTool.h.

◆ m_keepProvenancesStr

std::string AthenaOutputStreamTool::m_keepProvenancesStr
private

RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property.

Definition at line 119 of file AthenaOutputStreamTool.h.

◆ m_metaDataContainerPrefix

StringProperty AthenaOutputStreamTool::m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"}
private

Definition at line 94 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputAttributes

std::string AthenaOutputStreamTool::m_metaDataOutputAttributes
private

Definition at line 99 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputCollection

StringProperty AthenaOutputStreamTool::m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"}
private

Definition at line 93 of file AthenaOutputStreamTool.h.

◆ m_outputAttributes

std::string AthenaOutputStreamTool::m_outputAttributes
private

Definition at line 98 of file AthenaOutputStreamTool.h.

◆ m_outputCollection

StringProperty AthenaOutputStreamTool::m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"}
private

Definition at line 90 of file AthenaOutputStreamTool.h.

◆ m_outputName

StringProperty AthenaOutputStreamTool::m_outputName { this, "OutputFile", "", "name of the output db name"}
private

Definition at line 87 of file AthenaOutputStreamTool.h.

◆ m_processTag

StringProperty AthenaOutputStreamTool::m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
private

Definition at line 89 of file AthenaOutputStreamTool.h.

◆ m_skippedItems

std::set<std::string> AthenaOutputStreamTool::m_skippedItems
private

set of skipped item keys, because of missing CLID

Definition at line 126 of file AthenaOutputStreamTool.h.

◆ m_store

ServiceHandle<StoreGateSvc> AthenaOutputStreamTool::m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
private

Definition at line 104 of file AthenaOutputStreamTool.h.


The documentation for this class was generated from the following files:
AthenaOutputStreamTool::propagateProvenance
void propagateProvenance(const DataHeader &src_dh)
copy provenance records when creating new DataHeaders
Definition: AthenaOutputStreamTool.cxx:283
DataHeader::setProcessTag
void setProcessTag(const std::string &processTag)
Set ProcessTag for DataHeader.
Definition: DataHeader.cxx:242
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
StateLessPT_NewConfig.proxy
proxy
Definition: StateLessPT_NewConfig.py:407
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
python.outputTest_v2.streams
streams
Definition: outputTest_v2.py:55
AthenaOutputStreamTool::m_extend
BooleanProperty m_extend
Definition: AthenaOutputStreamTool.h:96
DataHeader::setAttributeList
void setAttributeList(const coral::AttributeList *attrList)
Definition: DataHeader.cxx:317
DataHeader::addHash
void addHash(IStringPool *pool)
Add new entry to hash map.
Definition: DataHeader.cxx:296
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthenaOutputStreamTool::m_dataHeader
DataHeader * m_dataHeader
Current DataHeader for streamed objects.
Definition: AthenaOutputStreamTool.h:112
AthenaOutputStreamTool::m_skippedItems
std::set< std::string > m_skippedItems
set of skipped item keys, because of missing CLID
Definition: AthenaOutputStreamTool.h:126
AthenaOutputStreamTool::m_metaDataContainerPrefix
StringProperty m_metaDataContainerPrefix
Definition: AthenaOutputStreamTool.h:94
AthenaOutputStreamTool::m_keepProvenancesStr
std::string m_keepProvenancesStr
RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStrea...
Definition: AthenaOutputStreamTool.h:119
PyPoolBrowser.dh
dh
Definition: PyPoolBrowser.py:102
AthenaOutputStreamTool::m_processTag
StringProperty m_processTag
Definition: AthenaOutputStreamTool.h:89
AthenaOutputStreamTool::m_keepProvenancesRE
std::regex m_keepProvenancesRE
RegEx pattern created from m_keepProvenancesStr.
Definition: AthenaOutputStreamTool.h:121
SG::VarHandleKey::key
const std::string & key() const
Return the StoreGate ID for the referenced object.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:141
AthenaOutputStreamTool::connectServices
StatusCode connectServices()
Do the real connection to services.
Definition: AthenaOutputStreamTool.cxx:150
DataHeader::insertProvenance
void insertProvenance(const DataHeaderElement &dhe)
Insert a new element into the "Provenance" vector.
Definition: DataHeader.cxx:292
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
PrepareReferenceFile.regex
regex
Definition: PrepareReferenceFile.py:43
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:23
SG::makeHandle
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
Definition: ReadCondHandle.h:274
AthenaOutputStreamTool::m_containerNameHint
StringProperty m_containerNameHint
Definition: AthenaOutputStreamTool.h:92
DataHeaderElement
This class provides a persistent form for the TransientAddress.
Definition: DataHeader.h:37
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
DataHeader
This class provides the layout for summary information stored for data written to POOL.
Definition: DataHeader.h:126
DataHeader::Output
@ Output
Definition: DataHeader.h:128
ReadCalibFromCool.keep
keep
Definition: ReadCalibFromCool.py:85
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaOutputStreamTool::m_dataHeaderKey
StringProperty m_dataHeaderKey
Definition: AthenaOutputStreamTool.h:88
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
test_pyathena.parent
parent
Definition: test_pyathena.py:15
parseDir.wh
wh
Definition: parseDir.py:45
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaAttributeList
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
Definition: PersistentDataModel/PersistentDataModel/AthenaAttributeList.h:45
plotmaker.keyName
keyName
Definition: plotmaker.py:145
SG::VarHandleKey::initialize
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:103
jobOptions.pTag
string pTag
Definition: jobOptions.py:28
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
AthenaOutputStreamTool::m_containerPrefix
StringProperty m_containerPrefix
Definition: AthenaOutputStreamTool.h:91
DataHeader::insert
void insert(const SG::TransientAddress *sgAddress, IOpaqueAddress *tokAddress=0, const std::string &pTag="")
Insert a new element into the "DataObject" vector.
Definition: DataHeader.cxx:266
AthenaOutputStreamTool::m_branchNameHint
StringProperty m_branchNameHint
Definition: AthenaOutputStreamTool.h:95
AthenaOutputStreamTool::m_outputCollection
StringProperty m_outputCollection
Definition: AthenaOutputStreamTool.h:90
DataHeader::setStatus
void setStatus(statusFlag status)
Set StatusFlag enum for DataHeader.
Definition: DataHeader.cxx:234
AthenaOutputStreamTool::m_conversionSvc
ServiceHandle< IConversionSvc > m_conversionSvc
Keep reference to the data conversion service.
Definition: AthenaOutputStreamTool.h:106
WriteBchToCool.beg
beg
Definition: WriteBchToCool.py:69
AthenaOutputStreamTool::m_metaDataOutputCollection
StringProperty m_metaDataOutputCollection
Definition: AthenaOutputStreamTool.h:93
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaOutputStreamTool::m_metaDataOutputAttributes
std::string m_metaDataOutputAttributes
Definition: AthenaOutputStreamTool.h:99
lumiFormat.outputName
string outputName
Definition: lumiFormat.py:65
AthenaOutputStreamTool::m_outputAttributes
std::string m_outputAttributes
Definition: AthenaOutputStreamTool.h:98
python.LumiBlobConversion.pos
pos
Definition: LumiBlobConversion.py:16
AthenaOutputStreamTool::m_attrListKey
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey
Definition: AthenaOutputStreamTool.h:100
SG::WriteHandle
Definition: StoreGate/StoreGate/WriteHandle.h:73
AthenaOutputStreamTool::m_extendProvenanceRecord
bool m_extendProvenanceRecord
Flag as to whether to extend provenance via the DataHeader.
Definition: AthenaOutputStreamTool.h:117
AthenaOutputStreamTool::m_store
ServiceHandle< StoreGateSvc > m_store
Definition: AthenaOutputStreamTool.h:104
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaOutputStreamTool::m_connectionOpen
bool m_connectionOpen
Flag to tell whether connectOutput has been called.
Definition: AthenaOutputStreamTool.h:114
DeMoScan.first
bool first
Definition: DeMoScan.py:534
ReadCalibFromCool.typeName
typeName
Definition: ReadCalibFromCool.py:477
AthCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
Definition: AthCnvSvc.cxx:408
merge.status
status
Definition: merge.py:16
StoreID::EVENT_STORE
@ EVENT_STORE
Definition: StoreID.h:26
AthenaOutputStreamTool::m_clidSvc
ServiceHandle< IClassIDSvc > m_clidSvc
Ref to ClassIDSvc to convert type name to clid.
Definition: AthenaOutputStreamTool.h:108
AthenaOutputStreamTool::m_decSvc
ServiceHandle< IDecisionSvc > m_decSvc
Ref to DecisionSvc.
Definition: AthenaOutputStreamTool.h:110
AthenaOutputStreamTool::m_keepProvenanceMatch
std::map< std::string, bool > m_keepProvenanceMatch
Cache provenance RegEx matching result in a map.
Definition: AthenaOutputStreamTool.h:123
SG::DataProxy
Definition: DataProxy.h:45
LArParamsProperties::keyword
std::string keyword(const std::string &classname)
Definition: LArParamsProperties.cxx:160
SG::ConstIterator
Definition: SGIterator.h:164
AthCnvSvc
Definition: AthCnvSvc.h:66
AthenaOutputStreamTool::m_attrListWrite
std::string m_attrListWrite
Definition: AthenaOutputStreamTool.h:102
AthenaOutputStreamTool::streamObjects
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="") override
Definition: AthenaOutputStreamTool.cxx:361
ServiceHandle< StoreGateSvc >
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
AthenaOutputStreamTool::m_outputName
StringProperty m_outputName
Definition: AthenaOutputStreamTool.h:87