ATLAS Offline Software
AthenaOutputStreamTool Class Reference

This is the implementation of IAthenaOutputStreamTool.

#include <AthenaOutputStreamTool.h>


Public Types

typedef std::pair< std::string, std::string > TypeKeyPair
 Stream out objects.
 
typedef std::vector< TypeKeyPair > TypeKeyPairs
 
typedef std::vector< DataObject * > DataObjectVec
 Stream out a vector of objects. Each object must first be converted to a DataObject.
 

Public Member Functions

 AthenaOutputStreamTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard AlgTool constructor.
 
virtual ~AthenaOutputStreamTool ()
 Destructor.
 
StatusCode initialize ()
 AthAlgTool interface method implementations.
 
StatusCode finalize ()
 
StatusCode connectServices (const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord)
 Specify which data store and conversion service to use and whether to extend provenance. Only use this to override jobOptions.
 
StatusCode connectOutput (const std::string &outputName="")
 Connect to the output stream. connectOutput must be called BEFORE streaming. Only specify "outputName" to override jobOptions.
 
StatusCode commitOutput (bool doCommit=false)
 Commit the output stream after having streamed out objects. commitOutput must be called AFTER streaming.
 
StatusCode finalizeOutput ()
 Finalize the output stream after the last commit, e.g. in finalize.
 
virtual StatusCode streamObjects (const TypeKeyPairs &typeKeys, const std::string &outputName="")
 
virtual StatusCode streamObjects (const DataObjectVec &dataObjects, const std::string &outputName="")
 
virtual StatusCode getInputItemList (SG::IFolder *m_p2BWrittenFromTool)
 

Private Member Functions

virtual StatusCode connectServices ()
 Do the real connection to services.
 

Private Attributes

StringProperty m_outputName { this, "OutputFile", "", "name of the output db name"}
 
StringProperty m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
 
StringProperty m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
 
StringProperty m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"}
 
StringProperty m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
 
StringProperty m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
 
StringProperty m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"}
 
StringProperty m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"}
 
StringProperty m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
 
std::string m_outputAttributes {""}
 
std::string m_metaDataOutputAttributes {""}
 
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
 
std::string m_attrListWrite {""}
 
ServiceHandle< StoreGateSvc > m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
 
ServiceHandle< IConversionSvc > m_conversionSvc
 Keep reference to the data conversion service.
 
ServiceHandle< IClassIDSvc > m_clidSvc
 Ref to ClassIDSvc to convert type name to clid.
 
ServiceHandle< IDecisionSvc > m_decSvc
 Ref to DecisionSvc.
 
DataHeader * m_dataHeader
 Current DataHeader for streamed objects.
 
bool m_connectionOpen
 Flag to tell whether connectOutput has been called.
 
bool m_extendProvenanceRecord
 Flag as to whether to extend provenance via the DataHeader.
 
bool m_extend
 Flag to extend attribute list with stream flags from DecisionSvc.
 
std::set< std::string > m_skippedItems
 Set of item keys skipped because of a missing CLID.
 

Detailed Description

This is the implementation of IAthenaOutputStreamTool.

Definition at line 31 of file AthenaOutputStreamTool.h.

Member Typedef Documentation

◆ DataObjectVec

typedef std::vector<DataObject*> AthenaOutputStreamTool::DataObjectVec

Stream out a vector of objects. Each object must first be converted to a DataObject, e.g.

#include "AthenaKernel/StorableConversions.h"
T* obj = xxx;
DataObject* dataObject = SG::asStorable(obj);

Definition at line 76 of file AthenaOutputStreamTool.h.

◆ TypeKeyPair

typedef std::pair<std::string, std::string> AthenaOutputStreamTool::TypeKeyPair

Stream out objects.

Provide a vector of typeName/key pairs. If a key is empty, the tool assumes there is only one object of that type, and the call fails if there is more than one.

Definition at line 67 of file AthenaOutputStreamTool.h.
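
As an illustration of the typeName/key convention described above, the following is a minimal sketch that builds such a list without the Athena headers. The typedefs are local copies of the documented ones; the helper names addItem and makeItemList, and the type names and keys, are hypothetical placeholders.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Local copies of the two typedefs documented here, so that the sketch
// compiles without the Athena headers.
typedef std::pair<std::string, std::string> TypeKeyPair;
typedef std::vector<TypeKeyPair> TypeKeyPairs;

// Hypothetical helper: append one typeName/key pair to an item list.
// The type name is always required; the key may be left empty.
void addItem(TypeKeyPairs& items, const std::string& typeName,
             const std::string& key = "") {
  assert(!typeName.empty());
  items.emplace_back(typeName, key);
}

// Hypothetical item list; the type names and keys are placeholders.
TypeKeyPairs makeItemList() {
  TypeKeyPairs items;
  addItem(items, "ExampleTrackCollection", "Tracks");  // explicit key
  addItem(items, "ExampleEventSummary");  // empty key: exactly one such object must exist
  return items;
}
```

A list built this way has the shape expected by the TypeKeyPairs overload of streamObjects.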

◆ TypeKeyPairs

typedef std::vector<TypeKeyPair> AthenaOutputStreamTool::TypeKeyPairs

Definition at line 68 of file AthenaOutputStreamTool.h.

Constructor & Destructor Documentation

◆ AthenaOutputStreamTool()

AthenaOutputStreamTool::AthenaOutputStreamTool ( const std::string &  type,
const std::string &  name,
const IInterface *  parent 
)

Standard AlgTool Constructor.


Definition at line 43 of file AthenaOutputStreamTool.cxx.

45  : base_class(type, name, parent),
46  m_store("DetectorStore", name),
47  m_conversionSvc("AthenaPoolCnvSvc", name),
48  m_clidSvc("ClassIDSvc", name),
49  m_decSvc("DecisionSvc/DecisionSvc", name),
50  m_dataHeader(nullptr),
51  m_connectionOpen(false),
53  // Declare IAthenaOutputStreamTool interface
54  declareInterface<IAthenaOutputStreamTool>(this);
55 
56  declareProperty("SaveDecisions", m_extend = false, "Set to true to add streaming decisions to an attributeList");
57 }

◆ ~AthenaOutputStreamTool()

AthenaOutputStreamTool::~AthenaOutputStreamTool ( )
virtual

Destructor.

Definition at line 59 of file AthenaOutputStreamTool.cxx.

59  {
60 }

Member Function Documentation

◆ commitOutput()

StatusCode AthenaOutputStreamTool::commitOutput ( bool  doCommit = false)

Commit the output stream after having streamed out objects. commitOutput must be called AFTER streaming.

Definition at line 325 of file AthenaOutputStreamTool.cxx.

325  {
326  ATH_MSG_DEBUG("In commitOutput");
327  // Connect the output file to the service
328  if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
329  ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
330  return(StatusCode::FAILURE);
331  }
332  // Set flag that connection is closed
333  m_connectionOpen = false;
334  return(StatusCode::SUCCESS);
335 }
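
The listings on this page imply a call order: connectOutput sets m_connectionOpen, streamObjects fails unless that flag is set, and commitOutput clears it again. The sketch below models only that flag and the output-name check. MockStreamTool, isOpen and writeOnce are hypothetical names, the methods return bool instead of StatusCode, and the file name is a placeholder; nothing is written anywhere.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the tool. It models only the m_connectionOpen
// flag and the output-name check; no services are contacted.
class MockStreamTool {
public:
  bool connectOutput(const std::string& outputName = "") {
    if (!outputName.empty()) m_outputName = outputName;  // argument overrides the stored name
    if (m_outputName.empty()) return false;              // "No OutputName provided"
    m_connectionOpen = true;
    return true;
  }
  bool streamObjects() const { return m_connectionOpen; }  // fails unless connected
  bool commitOutput() {
    m_connectionOpen = false;  // a new connectOutput is needed before streaming again
    return true;
  }
  bool isOpen() const { return m_connectionOpen; }

private:
  std::string m_outputName;
  bool m_connectionOpen = false;
};

// Expected call order for one write cycle.
bool writeOnce(MockStreamTool& tool) {
  assert(!tool.isOpen());  // each cycle starts from a closed connection
  if (!tool.connectOutput("example.pool.root")) return false;
  if (!tool.streamObjects()) return false;
  return tool.commitOutput();
}
```

In the real tool each step can also fail inside the conversion service, and finalizeOutput is called once after the last commit; the mock leaves both out.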

◆ connectOutput()

StatusCode AthenaOutputStreamTool::connectOutput ( const std::string &  outputName = "")

Connect to the output stream. connectOutput must be called BEFORE streaming. Only specify "outputName" to override jobOptions.

Definition at line 172 of file AthenaOutputStreamTool.cxx.

172  {
173  ATH_MSG_DEBUG("In connectOutput " << outputName);
174 
175  // Use arg if not empty, save the output name
176  if (!outputName.empty()) {
177  m_outputName.setValue(outputName);
178  }
179  if (m_outputName.value().empty()) {
180  ATH_MSG_ERROR("No OutputName provided");
181  return(StatusCode::FAILURE);
182  }
183  // Connect services if not already available
184  if (m_store == 0 || m_conversionSvc == 0) {
185  if (connectServices().isFailure()) {
186  ATH_MSG_ERROR("Unable to connect services");
187  return(StatusCode::FAILURE);
188  }
189  }
190  // Connect the output file to the service
191  if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
192  ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
193  return(StatusCode::FAILURE);
194  } else {
195  ATH_MSG_DEBUG("Connected to " << m_outputName.value());
196  }
197 
198  // Remove DataHeader with same key if it exists
199  if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
200  const DataHeader* preDh = nullptr;
201  if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
202  if (m_store->removeDataAndProxy(preDh).isFailure()) {
203  ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
204  return(StatusCode::FAILURE);
205  }
206  ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
207  }
208  }
209 
210  // Create new DataHeader
211  m_dataHeader = new DataHeader();
213 
214  // Retrieve all existing DataHeaders from StoreGate
215  const DataHeader* dh = nullptr;
216  std::vector<std::string> dhKeys;
217  m_store->keys<DataHeader>(dhKeys);
218  for (const std::string& dhKey : dhKeys) {
219  bool primaryDH = false;
220  if (!m_store->transientContains<DataHeader>(dhKey)) {
221  if (dhKey == "EventSelector") primaryDH = true;
222  ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
223  }
224  if (m_store->retrieve(dh, dhKey).isFailure()) {
225  ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
226  }
227  SG::DataProxy* dhProxy = m_store->proxy(dh);
228  if (dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) {
229  // Add DataHeader token to new DataHeader
231  std::string pTag;
232  SG::TransientAddress* dhTransAddr = 0;
233  for (const DataHeaderElement& dhe : *dh) {
234  if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
235  pTag = dhe.getKey();
236  delete dhTransAddr; dhTransAddr = dhe.getAddress(0);
237  }
238  }
239  // Update dhTransAddr to handle fast merged files.
240  if (dhProxy != 0 && dhProxy->address() != 0) {
241  delete dhTransAddr; dhTransAddr = 0;
243  dhProxy->address(),
244  pTag));
245  }
246  else if (dhTransAddr != nullptr) {
248  dhTransAddr->address(),
249  pTag));
250  delete dhTransAddr; dhTransAddr = 0;
251  }
252  }
253  for(auto iter=dh->beginProvenance(), iEnd=dh->endProvenance(); iter != iEnd; ++iter) {
255  }
256  }
257  }
258 
259  // Attach the attribute list to the DataHeader if requested
260  if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
261  auto attrListHandle = SG::makeHandle(m_attrListKey);
262  if (!attrListHandle.isValid()) {
263  ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
264  } else {
265  m_dataHeader->setAttributeList(attrListHandle.cptr());
266  if (m_extend) { // Add streaming decisions
267  ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
268  // Look for attribute list created for mini-EventInfo
269  const AthenaAttributeList* attlist(attrListHandle.cptr());
270 
271  // Build new attribute list for modification
272  AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
273  newone->copyData(*attlist);
274 
275  // Now loop over stream definitions and add decisions
276  auto streams = m_decSvc->getStreams();
277  for (auto it = streams.begin();
278  it != streams.end(); ++it) {
279  newone->extend(*it,"bool");
280  (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
281  ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
282  }
283  // record new attribute list with old key + suffix
284  const AthenaAttributeList* attrList2 = nullptr;
285  if (!m_store->contains<AthenaAttributeList>(m_attrListWrite)) {
286  if (m_store->record(newone,m_attrListWrite).isFailure()) {
287  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
288  }
289  } else {
290  ATH_MSG_DEBUG("Decisions already added by a different stream");
291  }
292  if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
293  ATH_MSG_ERROR("Unable to retrieve att list " << m_attrListWrite);
294  } else {
295  m_dataHeader->setAttributeList(attrList2);
296  }
297 /*
298  SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
299  std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
300  if ( attrWrite.record(std::move(uptr)).isFailure() ) {
301  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
302  } else {
303  ATH_MSG_DEBUG("Decisions already added by a different stream");
304  }
305 */
306  //m_dataHeader->setAttributeList(newone);
307  } // list extend check
308  } // list retrieve check
309  } // list property check
310 
311  // Record DataHeader in StoreGate
313  if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
314  ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
315  return(StatusCode::FAILURE);
316  } else {
317  ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
318  }
320  // Set flag that connection is open
321  m_connectionOpen = true;
322  return(StatusCode::SUCCESS);
323 }

◆ connectServices() [1/2]

StatusCode AthenaOutputStreamTool::connectServices ( )
private virtual

Do the real connection to services.

Definition at line 163 of file AthenaOutputStreamTool.cxx.

163  {
164  // Find the data store
165  if (m_store.retrieve().isFailure() || m_store == 0) {
166  ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
167  return(StatusCode::FAILURE);
168  }
169  return(StatusCode::SUCCESS);
170 }

◆ connectServices() [2/2]

StatusCode AthenaOutputStreamTool::connectServices ( const std::string &  dataStore,
const std::string &  cnvSvc,
bool  extendProvenenceRecord 
)

Specify which data store and conversion service to use and whether to extend provenance. Only use this to override jobOptions.

Definition at line 139 of file AthenaOutputStreamTool.cxx.

141  {
142  // Release old data store
143  if (m_store.isValid()) {
144  if (m_store.release().isFailure()) {
145  ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
146  }
147  }
148  m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
149  if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
150  if (m_conversionSvc.release().isFailure()) {
151  ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
152  }
154  if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
155  ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
156  return(StatusCode::FAILURE);
157  }
158  }
159  m_extendProvenanceRecord = extendProvenenceRecord;
160  return(connectServices());
161 }

◆ finalize()

StatusCode AthenaOutputStreamTool::finalize ( )

Definition at line 128 of file AthenaOutputStreamTool.cxx.

128  {
129  m_decSvc.release().ignore();
130  if (m_conversionSvc.release().isFailure()) {
131  ATH_MSG_WARNING("Cannot release AthenaPoolCnvSvc");
132  }
133  if (m_clidSvc.release().isFailure()) {
134  ATH_MSG_WARNING("Cannot release the CLIDSvc");
135  }
136  return(StatusCode::SUCCESS);
137 }

◆ finalizeOutput()

StatusCode AthenaOutputStreamTool::finalizeOutput ( )

Finalize the output stream after the last commit, e.g. in finalize.

Definition at line 337 of file AthenaOutputStreamTool.cxx.

337  {
338  AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
339  if (athConversionSvc != 0) {
340  if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
341  ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
342  return(StatusCode::FAILURE);
343  }
344  }
345  return(StatusCode::SUCCESS);
346 }

◆ getInputItemList()

StatusCode AthenaOutputStreamTool::getInputItemList ( SG::IFolder *  m_p2BWrittenFromTool)
virtual

Definition at line 520 of file AthenaOutputStreamTool.cxx.

520  {
521  const std::string hltKey = "HLTAutoKey";
524  if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
525  ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
526  } else {
527  for ( ; beg != ending; ++beg) {
528  if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
529  for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
530  it != itLast; ++it) {
531  // Only insert the primary clid, not the ones for the symlinks!
532  CLID clid = it->getPrimaryClassID();
533  if (clid != ClassID_traits<DataHeader>::ID()) {
534  //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
535  //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
536  std::string typeName;
537  if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
538  if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
539  ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
540  m_skippedItems.insert(it->getKey());
541  }
542  continue;
543  }
544  ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
545  const std::string keyName = it->getKey();
546  if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
547  p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
548  } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
549  p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
550  } else {
551  p2BWrittenFromTool->add(clid, keyName).ignore();
552  }
553  }
554  }
555  }
556  }
557  }
558  ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
559  return(StatusCode::SUCCESS);
560 }
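
The key handling at the end of the listing above can be read in isolation: keys that start or end with "HLTAutoKey" are replaced by a wildcard pattern, and all other keys are added unchanged. A minimal sketch of that rule, using the hypothetical free function itemKeyFor in place of the calls to the folder's add method:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper mirroring the key handling in the listing: keys that
// start or end with "HLTAutoKey" collapse to a wildcard pattern.
std::string itemKeyFor(const std::string& keyName) {
  const std::string hltKey = "HLTAutoKey";
  const std::size_t n = hltKey.size();
  assert(n == 10);  // the listing hard-codes this length
  if (keyName.size() > n && keyName.compare(0, n, hltKey) == 0) {
    return hltKey + "*";  // prefix match
  }
  if (keyName.size() > n && keyName.compare(keyName.size() - n, n, hltKey) == 0) {
    return "*" + hltKey;  // suffix match
  }
  return keyName;  // everything else is kept verbatim
}
```

A key that is exactly "HLTAutoKey" is not turned into a wildcard, because both branches require the key to be longer than ten characters.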

◆ initialize()

StatusCode AthenaOutputStreamTool::initialize ( )

AthAlgTool Interface method implementations:

Definition at line 62 of file AthenaOutputStreamTool.cxx.

62  {
63  ATH_MSG_INFO("Initializing " << name());
64 
65  ATH_CHECK( m_clidSvc.retrieve() );
66  ATH_CHECK( m_conversionSvc.retrieve() );
67 
68  // Autoconfigure
69  if (m_dataHeaderKey.empty()) {
70  m_dataHeaderKey.setValue(name());
71  // Remove "ToolSvc." from m_dataHeaderKey.
72  if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
73  m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
74  // Remove "Tool" from m_dataHeaderKey.
75  if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
76  m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
77  }
78  } else {
79  const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
80  if (parentAlg != 0) {
81  m_dataHeaderKey.setValue(parentAlg->name());
82  }
83  }
84  }
85  if (m_processTag.empty()) {
86  m_processTag.setValue(m_dataHeaderKey);
87  }
88 
89  { // handle the AttrKey overwrite
90  const std::string keyword = "[AttributeListKey=";
91  std::string::size_type pos = m_outputName.value().find(keyword);
92  if( (pos != std::string::npos) ) {
93  ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
94  const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
95  m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
96  m_attrListKey = attrListKey;
97  }
98  }
99  if ( ! m_attrListKey.key().empty() ) {
101  m_attrListWrite = m_attrListKey.key() + "Decisions";
102  //ATH_CHECK(m_attrListWrite.initialize());
103  }
104 
105  if (!m_outputCollection.value().empty()) {
106  m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
107  }
108  if (!m_containerPrefix.value().empty()) {
109  m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
110  }
111  if (m_containerNameHint.value() != "0") {
112  m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
113  }
114  if (m_branchNameHint.value() != "0") {
115  m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
116  }
117  if (!m_metaDataOutputCollection.value().empty()) {
118  m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
119  }
120  if (!m_metaDataContainerPrefix.value().empty()) {
121  m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
122  }
123 
124  if (m_extend) ATH_CHECK(m_decSvc.retrieve());
125  return(StatusCode::SUCCESS);
126 }
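
The "[AttributeListKey=" handling in initialize() is plain string slicing on the output name. The sketch below reproduces it with standard-library calls only. attributeListKeyFrom and decisionsKeyFor are hypothetical names, and the file name in the usage note is a placeholder.

```cpp
#include <cassert>
#include <string>

// Hypothetical free function mirroring the "[AttributeListKey=" handling in
// initialize(). Returns an empty string when the keyword is absent.
std::string attributeListKeyFrom(const std::string& outputName) {
  const std::string keyword = "[AttributeListKey=";
  const std::string::size_type pos = outputName.find(keyword);
  if (pos == std::string::npos) return "";
  const std::string::size_type start = pos + keyword.size();
  assert(start <= outputName.size());  // find() succeeded, so the keyword fits
  const std::string::size_type end = outputName.find(']', start);
  // Without a closing bracket, take the rest of the string, as the listing does.
  return outputName.substr(
      start, end == std::string::npos ? std::string::npos : end - start);
}

// The decisions list is stored under the same key plus a fixed suffix.
std::string decisionsKeyFor(const std::string& attrListKey) {
  return attrListKey + "Decisions";
}
```

For an output name such as "example.pool.root[AttributeListKey=SimpleTag]" the extracted key is "SimpleTag", and the corresponding m_attrListWrite value would be "SimpleTagDecisions".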

◆ streamObjects() [1/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const DataObjectVec &  dataObjects,
const std::string &  outputName = "" 
)
virtual

Definition at line 409 of file AthenaOutputStreamTool.cxx.

409  {
410  // Check that a connection has been opened
411  if (!m_connectionOpen) {
412  ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
413  return(StatusCode::FAILURE);
414  }
415  // Connect the output file to the service
416  std::string outputConnectionString = outputName;
417  const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
418  if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
419  // If we're in here we're writing MetaData
420  // Now let's see if we should be overwriting the MetaData attributes
421  // For the time-being this happens when we're writing MetaData in the augmentation mode
422  if (!m_metaDataOutputAttributes.empty()) {
423  // Note: This won't work quite right if only one attribute is set though!
424  outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
425  }
426  }
427  for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
428  if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
429  outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
430  }
431  }
432 
433  // Check that the DataHeader is still valid
434  DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
435  std::map<DataObject*, IOpaqueAddress*> written;
436  for (DataObject* dobj : dataObjects) {
437  // Do not write the DataHeader via the explicit list
438  if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
439  ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
440  // Do not stream out same object twice
441  } else if (written.find(dobj) != written.end()) {
442  // Print warning and skip
443  ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
444  ATH_MSG_DEBUG(" Skipping this one.");
445  } else {
446  // Write object
447  IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
448  addr->addRef();
449  if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
450  written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
451  } else {
452  ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
453  return(StatusCode::FAILURE);
454  }
455  }
456  }
457  // End of loop over DataObjects, write DataHeader
458  if (m_conversionSvc.type() == "AthenaPoolCnvSvc" && dataHeaderObj != nullptr) {
459  IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
460  addr->addRef();
461  if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
462  written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
463  } else {
464  ATH_MSG_ERROR("Could not create Rep for DataHeader");
465  return(StatusCode::FAILURE);
466  }
467  }
468  for (DataObject* dobj : dataObjects) {
469  // call fillRepRefs of persistency service
470  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
471  if (proxy != nullptr && written.find(dobj) != written.end()) {
472  IOpaqueAddress* addr(written.find(dobj)->second);
473  if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
474  if (dobj->clID() != 1 || addr->par()[0] != "\n") {
475  if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
476  m_dataHeader->insert(proxy, addr);
477  } else {
479  }
480  if (proxy->address() == nullptr) {
481  proxy->setAddress(addr);
482  }
483  addr->release();
484  }
485  } else {
486  ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
487  return(StatusCode::FAILURE);
488  }
489  } else {
490  ATH_MSG_WARNING("Could not cast DataObject " << dobj->clID() << " " << dobj->name());
491  }
492  }
494  if (m_conversionSvc.type() == "AthenaPoolCnvSvc" && dataHeaderObj != nullptr) {
495  // End of DataObjects, fill refs for DataHeader
496  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
497  if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
498  IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
499  if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
500  if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
501  if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
502  m_dataHeader->insert(proxy, addr);
503  } else {
505  }
506  addr->release();
507  }
508  } else {
509  ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
510  return(StatusCode::FAILURE);
511  }
512  } else {
513  ATH_MSG_ERROR("Could not cast DataHeader");
514  return(StatusCode::FAILURE);
515  }
516  }
517  return(StatusCode::SUCCESS);
518 }
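
The loop near the top of the listing above merges m_outputAttributes into the connection string: each "[Name=Value]" entry is appended unless the connection string already contains an entry with the same name. A minimal sketch of that merge as a standalone function follows. mergeAttributes is a hypothetical name, and the sketch assumes the attribute string consists only of well-formed "[Name=Value]" entries.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper mirroring the attribute merge in the listing.
// "attrs" is expected to consist of "[Name=Value]" entries only.
std::string mergeAttributes(std::string connection, const std::string& attrs) {
  typedef std::string::size_type size_type;
  const size_type npos = std::string::npos;
  for (size_type pos = attrs.find('['); pos != npos; pos = attrs.find('[', pos + 1)) {
    const size_type eq = attrs.find('=', pos);
    const size_type close = attrs.find(']', pos);
    assert(eq != npos && close != npos);  // every entry has the form [Name=Value]
    const std::string tag = attrs.substr(pos, eq + 1 - pos);  // e.g. "[PoolContainerPrefix="
    if (connection.find(tag) == npos) {
      connection += attrs.substr(pos, close + 1 - pos);  // append the whole entry
    }
  }
  return connection;
}
```

One consequence of this rule is that an attribute already present in the outputName argument takes precedence over the value configured through the tool's properties.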

◆ streamObjects() [2/2]

StatusCode AthenaOutputStreamTool::streamObjects ( const TypeKeyPairs &  typeKeys,
const std::string &  outputName = "" 
)
virtual

Definition at line 348 of file AthenaOutputStreamTool.cxx.

348  {
349  ATH_MSG_DEBUG("In streamObjects");
350  // Check that a connection has been opened
351  if (!m_connectionOpen) {
352  ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
353  return(StatusCode::FAILURE);
354  }
355  // Use arg if not empty, save the output name
356  if (!outputName.empty()) {
357  m_outputName.setValue(outputName);
358  }
359  if (m_outputName.value().empty()) {
360  ATH_MSG_ERROR("No OutputName provided");
361  return(StatusCode::FAILURE);
362  }
363  // Now iterate over the type/key pairs and stream out each object
364  std::vector<DataObject*> dataObjects;
365  for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
366  first != last; ++first) {
367  const std::string& type = (*first).first;
368  const std::string& key = (*first).second;
369  // Find the clid for type name from the CLIDSvc
370  CLID clid;
371  if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
372  ATH_MSG_ERROR("Could not get clid for typeName " << type);
373  return(StatusCode::FAILURE);
374  }
375  DataObject* dObj = 0;
376  // Two options: no key or explicit key
377  if (key.empty()) {
378  ATH_MSG_DEBUG("Get data object with no key");
379  // Get DataObject without key
380  dObj = m_store->accessData(clid);
381  } else {
382  ATH_MSG_DEBUG("Get data object with key");
383  // Get DataObjects with key
384  dObj = m_store->accessData(clid, key);
385  }
386  if (dObj == 0) {
387  // No object - print warning and return
388  ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
389  return(StatusCode::SUCCESS);
390  } else {
391  ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
392  }
393  // Save the dObj
394  dataObjects.push_back(dObj);
395  }
396  // Stream out objects
397  if (dataObjects.size() == 0) {
398  ATH_MSG_DEBUG("No data objects found");
399  return(StatusCode::SUCCESS);
400  }
401  StatusCode status = streamObjects(dataObjects, m_outputName.value());
402  if (!status.isSuccess()) {
403  ATH_MSG_ERROR("Could not stream out objects");
404  return(status);
405  }
406  return(StatusCode::SUCCESS);
407 }

Member Data Documentation

◆ m_attrListKey

SG::ReadHandleKey<AthenaAttributeList> AthenaOutputStreamTool::m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""}
private

Definition at line 97 of file AthenaOutputStreamTool.h.

◆ m_attrListWrite

std::string AthenaOutputStreamTool::m_attrListWrite {""}
private

Definition at line 99 of file AthenaOutputStreamTool.h.

◆ m_branchNameHint

StringProperty AthenaOutputStreamTool::m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" }
private

Definition at line 94 of file AthenaOutputStreamTool.h.

◆ m_clidSvc

ServiceHandle<IClassIDSvc> AthenaOutputStreamTool::m_clidSvc
private

Ref to ClassIDSvc to convert type name to clid.

Definition at line 105 of file AthenaOutputStreamTool.h.

◆ m_connectionOpen

bool AthenaOutputStreamTool::m_connectionOpen
private

Flag to tell whether connectOutput has been called.

Definition at line 111 of file AthenaOutputStreamTool.h.

◆ m_containerNameHint

StringProperty AthenaOutputStreamTool::m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""}
private

Definition at line 91 of file AthenaOutputStreamTool.h.

◆ m_containerPrefix

StringProperty AthenaOutputStreamTool::m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""}
private

Definition at line 90 of file AthenaOutputStreamTool.h.

◆ m_conversionSvc

ServiceHandle<IConversionSvc> AthenaOutputStreamTool::m_conversionSvc
private

Keep reference to the data conversion service.

Definition at line 103 of file AthenaOutputStreamTool.h.

◆ m_dataHeader

DataHeader* AthenaOutputStreamTool::m_dataHeader
private

Current DataHeader for streamed objects.

Definition at line 109 of file AthenaOutputStreamTool.h.

◆ m_dataHeaderKey

StringProperty AthenaOutputStreamTool::m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"}
private

Definition at line 87 of file AthenaOutputStreamTool.h.
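
When DataHeaderKey is left empty, initialize() derives it from the tool name: a leading "ToolSvc." is removed, then a trailing "Tool", and a tool that is not owned by ToolSvc takes the name of its parent instead. The sketch below restates that rule with plain strings. defaultDataHeaderKey is a hypothetical name, and an empty parentName stands in for a parent that is not an INamedInterface.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper mirroring the DataHeaderKey autoconfiguration in
// initialize(). An empty parentName models a parent without a name.
std::string defaultDataHeaderKey(const std::string& toolName,
                                 const std::string& parentName) {
  assert(!toolName.empty());  // a Gaudi tool always has a name
  const std::string prefix = "ToolSvc.";
  const std::string suffix = "Tool";
  if (toolName.compare(0, prefix.size(), prefix) == 0) {
    std::string key = toolName.substr(prefix.size());
    // Like the listing, this uses find(), so the suffix is only removed when
    // the first occurrence of "Tool" is the one at the very end.
    if (key.size() >= suffix.size() &&
        key.find(suffix) == key.size() - suffix.size()) {
      key.erase(key.size() - suffix.size());
    }
    return key;
  }
  // Private tool: fall back to the parent's name when one is available.
  return parentName.empty() ? toolName : parentName;
}
```

With these rules a public tool named "ToolSvc.StreamAODTool" would get the key "StreamAOD"; the tool and parent names used here are illustrative only.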

◆ m_decSvc

ServiceHandle<IDecisionSvc> AthenaOutputStreamTool::m_decSvc
private

Ref to DecisionSvc.

Definition at line 107 of file AthenaOutputStreamTool.h.

◆ m_extend

bool AthenaOutputStreamTool::m_extend
private

Flag to extend attribute list with stream flags from DecisionSvc.

Definition at line 115 of file AthenaOutputStreamTool.h.

◆ m_extendProvenanceRecord

bool AthenaOutputStreamTool::m_extendProvenanceRecord
private

Flag as to whether to extend provenance via the DataHeader.

Definition at line 113 of file AthenaOutputStreamTool.h.

◆ m_metaDataContainerPrefix

StringProperty AthenaOutputStreamTool::m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"}
private

Definition at line 93 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputAttributes

std::string AthenaOutputStreamTool::m_metaDataOutputAttributes {""}
private

Definition at line 96 of file AthenaOutputStreamTool.h.

◆ m_metaDataOutputCollection

StringProperty AthenaOutputStreamTool::m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"}
private

Definition at line 92 of file AthenaOutputStreamTool.h.

◆ m_outputAttributes

std::string AthenaOutputStreamTool::m_outputAttributes {""}
private

Definition at line 95 of file AthenaOutputStreamTool.h.
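
According to the initialize() listing, m_outputAttributes is assembled from four properties, and a property only contributes an entry when it differs from its default ("" for the two prefixes, "0" for the two naming hints). A minimal sketch of that assembly, with the hypothetical helpers appendAttribute and buildOutputAttributes taking the property values as plain strings:

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: append one "[Name=Value]" entry.
void appendAttribute(std::string& attrs, const std::string& name,
                     const std::string& value) {
  assert(!name.empty());
  attrs += "[" + name + "=" + value + "]";
}

// Hypothetical helper mirroring how initialize() fills m_outputAttributes.
std::string buildOutputAttributes(const std::string& outputCollection,
                                  const std::string& containerPrefix,
                                  const std::string& containerNameHint,
                                  const std::string& branchNameHint) {
  std::string attrs;
  if (!outputCollection.empty()) appendAttribute(attrs, "OutputCollection", outputCollection);
  if (!containerPrefix.empty()) appendAttribute(attrs, "PoolContainerPrefix", containerPrefix);
  if (containerNameHint != "0") appendAttribute(attrs, "TopLevelContainerName", containerNameHint);
  if (branchNameHint != "0") appendAttribute(attrs, "SubLevelBranchName", branchNameHint);
  return attrs;
}
```

With all four properties at their defaults the result is an empty string, so nothing is appended to the connection string in streamObjects.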

◆ m_outputCollection

StringProperty AthenaOutputStreamTool::m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"}
private

Definition at line 89 of file AthenaOutputStreamTool.h.

◆ m_outputName

StringProperty AthenaOutputStreamTool::m_outputName { this, "OutputFile", "", "name of the output db name"}
private

Definition at line 86 of file AthenaOutputStreamTool.h.

◆ m_processTag

StringProperty AthenaOutputStreamTool::m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"}
private

Definition at line 88 of file AthenaOutputStreamTool.h.

◆ m_skippedItems

std::set<std::string> AthenaOutputStreamTool::m_skippedItems
private

Set of item keys that were skipped because of a missing CLID.

Definition at line 118 of file AthenaOutputStreamTool.h.
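A `std::set<std::string>` is a natural container for remembering which keys have already been skipped, for example so that a warning is reported once per key rather than once per event. The sketch below illustrates that pattern under that assumption. `SkippedItemTracker` and `recordSkipped` are hypothetical names, and the way AthenaOutputStreamTool.cxx actually uses `m_skippedItems` is not shown on this page.

```cpp
#include <cstddef>
#include <set>
#include <string>

// Hypothetical tracker, not the tool's actual code: remembers item keys
// that were skipped (e.g. because no CLID could be found for their type).
class SkippedItemTracker {
public:
  // Record a skipped key. Returns true only the first time the key is
  // seen, so a caller could emit a warning exactly once per key.
  bool recordSkipped(const std::string& key) {
    return m_skipped.insert(key).second;
  }

  // Number of distinct keys skipped so far.
  std::size_t size() const { return m_skipped.size(); }

private:
  std::set<std::string> m_skipped;  // distinct skipped keys
};
```

`std::set::insert` returns a pair whose second member reports whether the element was newly inserted, which is what makes the once-per-key check a single call.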

◆ m_store

ServiceHandle<StoreGateSvc> AthenaOutputStreamTool::m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"}
private

Definition at line 101 of file AthenaOutputStreamTool.h.


The documentation for this class was generated from the following files: