ATLAS Offline Software
AthenaOutputStreamTool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
10 #include "AthenaOutputStreamTool.h"
11 
12 // Gaudi
13 #include "GaudiKernel/IOpaqueAddress.h"
14 #include "GaudiKernel/INamedInterface.h"
15 #include "GaudiKernel/IClassIDSvc.h"
16 #include "GaudiKernel/ThreadLocalContext.h"
17 
18 // Athena
21 #include "StoreGate/StoreGateSvc.h"
22 #include "SGTools/DataProxy.h"
23 #include "SGTools/SGIFolder.h"
27 
28 
29 namespace {
30 
33 bool hasInputAlias (const SG::DataProxy& dp)
34 {
35  std::string inputName = dp.name() + "_Input";
36  return dp.hasAlias (inputName);
37 }
38 
39 
40 } // anonymous namespace
41 
44  const std::string& name,
45  const IInterface* parent) : base_class(type, name, parent),
46  m_clidSvc("ClassIDSvc", name),
47  m_decSvc("DecisionSvc/DecisionSvc", name) {
48 }
49 //__________________________________________________________________________
51  ATH_MSG_INFO("Initializing " << name());
52 
53  ATH_CHECK( m_clidSvc.retrieve() );
54  ATH_CHECK( m_conversionSvc.retrieve() );
55 
56  // Autoconfigure
57  if (m_dataHeaderKey.empty()) {
58  m_dataHeaderKey.setValue(name());
59  // Remove "ToolSvc." from m_dataHeaderKey.
60  if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
61  m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
62  // Remove "Tool" from m_dataHeaderKey.
63  if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
64  m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
65  }
66  } else {
67  const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
68  if (parentAlg != 0) {
69  m_dataHeaderKey.setValue(parentAlg->name());
70  }
71  }
72  }
73  if (m_processTag.empty()) {
74  m_processTag.setValue(m_dataHeaderKey);
75  }
76 
77  { // handle the AttrKey overwrite
78  const std::string keyword = "[AttributeListKey=";
79  std::string::size_type pos = m_outputName.value().find(keyword);
80  if( (pos != std::string::npos) ) {
81  ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
82  const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
83  m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
84  m_attrListKey = attrListKey;
85  }
86  }
87  if ( ! m_attrListKey.key().empty() ) {
89  m_attrListWrite = m_attrListKey.key() + "Decisions";
90  //ATH_CHECK(m_attrListWrite.initialize());
91  }
92 
93  if (!m_outputCollection.value().empty()) {
94  m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
95  }
96  if (!m_containerPrefix.value().empty()) {
97  m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
98  }
99  if (m_containerNameHint.value() != "0") {
100  m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
101  }
102  if (m_branchNameHint.value() != "0") {
103  m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
104  }
105  if (!m_metaDataOutputCollection.value().empty()) {
106  m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
107  }
108  if (!m_metaDataContainerPrefix.value().empty()) {
109  m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
110  }
111 
112  if (m_extend) ATH_CHECK(m_decSvc.retrieve());
113  return(StatusCode::SUCCESS);
114 }
115 //__________________________________________________________________________
117  const std::string& cnvSvc,
118  bool extendProvenenceRecord) {
119  // Release old data store
120  if (m_store.isValid()) {
121  if (m_store.release().isFailure()) {
122  ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
123  }
124  }
125  m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
126  if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
127  if (m_conversionSvc.release().isFailure()) {
128  ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
129  }
131  if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
132  ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
133  return(StatusCode::FAILURE);
134  }
135  }
136  m_extendProvenanceRecord = extendProvenenceRecord;
137  auto pprop = dynamic_cast<const IProperty*>(parent());
138  if (not pprop){
139  ATH_MSG_ERROR("'parent' could not be cast to IProperty");
140  return(StatusCode::FAILURE);
141  }
142  auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
143  m_keepProvenancesStr = keep.value();
144  // create RegEx pattern from the property value, specify extended grammar
145  m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
146 
147  return(connectServices());
148 }
149 //__________________________________________________________________________
151  // Find the data store
152  if (m_store.retrieve().isFailure() || m_store == 0) {
153  ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
154  return(StatusCode::FAILURE);
155  }
156  return(StatusCode::SUCCESS);
157 }
158 //__________________________________________________________________________
160  ATH_MSG_DEBUG("In connectOutput " << outputName);
161 
162  // Use arg if not empty, save the output name
163  if (!outputName.empty()) {
164  m_outputName.setValue(outputName);
165  }
166  if (m_outputName.value().empty()) {
167  ATH_MSG_ERROR("No OutputName provided");
168  return(StatusCode::FAILURE);
169  }
170  // Connect services if not already available
171  if (m_store == 0 || m_conversionSvc == 0) {
173  }
174  // Connect the output file to the service
175  if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
176  ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
177  return(StatusCode::FAILURE);
178  } else {
179  ATH_MSG_DEBUG("Connected to " << m_outputName.value());
180  }
181 
182  // Remove DataHeader with same key if it exists
183  if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
184  const DataHeader* preDh = nullptr;
185  if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
186  if (m_store->removeDataAndProxy(preDh).isFailure()) {
187  ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
188  return(StatusCode::FAILURE);
189  }
190  ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
191  }
192  }
193 
194  // Create new DataHeader
195  m_dataHeader = new DataHeader();
197 
198  // Retrieve all existing DataHeaders from StoreGate
199  const DataHeader* dh = nullptr;
200  std::vector<std::string> dhKeys;
201  m_store->keys<DataHeader>(dhKeys);
202  for (const std::string& dhKey : dhKeys) {
203  bool primaryDH = false;
204  if (!m_store->transientContains<DataHeader>(dhKey)) {
205  if (dhKey == "EventSelector") primaryDH = true;
206  ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
207  }
208  if (m_store->retrieve(dh, dhKey).isFailure()) {
209  ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
210  }
211  if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
213  }
214  }
215 
216  // Attach the attribute list to the DataHeader if requested
217  if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
218  auto attrListHandle = SG::makeHandle(m_attrListKey);
219  if (!attrListHandle.isValid()) {
220  ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
221  } else {
222  m_dataHeader->setAttributeList(attrListHandle.cptr());
223  if (m_extend) { // Add streaming decisions
224  ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
225  // Look for attribute list created for mini-EventInfo
226  const AthenaAttributeList* attlist(attrListHandle.cptr());
227 
228  // Build new attribute list for modification
229  AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
230  newone->copyData(*attlist);
231 
232  // Now loop over stream definitions and add decisions
233  auto streams = m_decSvc->getStreams();
234  for (auto it = streams.begin();
235  it != streams.end(); ++it) {
236  newone->extend(*it,"bool");
237  (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
238  ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
239  }
240  // record new attribute list with old key + suffix
241  const AthenaAttributeList* attrList2 = nullptr;
242  if (!m_store->contains<AthenaAttributeList>(m_attrListWrite)) {
243  if (m_store->record(newone,m_attrListWrite).isFailure()) {
244  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
245  }
246  } else {
247  ATH_MSG_DEBUG("Decisions already added by a different stream");
248  }
249  if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
250  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
251  } else {
252  m_dataHeader->setAttributeList(attrList2);
253  }
254 /*
255  SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
256  std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
257  if ( attrWrite.record(std::move(uptr)).isFailure() ) {
258  ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
259  } else {
260  ATH_MSG_DEBUG("Decisions already added by a different stream");
261  }
262 */
263  //m_dataHeader->setAttributeList(newone);
264  } // list extend check
265  } // list retrieve check
266  } // list property check
267 
268  // Record DataHeader in StoreGate
270  if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
271  ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
272  return(StatusCode::FAILURE);
273  } else {
274  ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
275  }
277  // Set flag that connection is open
278  m_connectionOpen = true;
279  return(StatusCode::SUCCESS);
280 }
281 
282 //__________________________________________________________________________
284 {
285  // keep track of provenance entries inserted into the new DataHeader
286  std::set<std::string> insertedTags{};
287  // Add DataHeader token to the new DataHeader
289  std::string pTag;
290  std::unique_ptr<SG::TransientAddress> dhTransAddr;
291  for (const DataHeaderElement& dhe : src_dh) {
292  if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
293  pTag = dhe.getKey();
294  dhTransAddr.reset( dhe.getAddress(0) );
295  }
296  }
297  // Update dhTransAddr to handle fast merged files.
298  if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
299  DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
301  insertedTags.insert(pTag);
302  }
303  else if( dhTransAddr ) {
304  DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
306  insertedTags.insert(pTag);
307  }
308  }
309 
310  // empty regexpr means do not keep any provenance
311  if( !m_keepProvenancesStr.empty() ) {
312  // Each stream tag is written only once in the provenance record
313  // In files where there are multiple entries per stream tag
314  // the record is in reverse, i.e., the latest appears first.
315  // Therefore, only keep the first entry if there are multiple
316  // matches so that we retain the latest one.
317  for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
318  const auto & currentKey = (*iter).getKey();
319  if( insertedTags.insert(currentKey).second ) {
320  // first prov with that tag. Now check if we want to keep that tag
321  bool keep = false;
322  auto it = m_keepProvenanceMatch.find( currentKey );
323  if( it != m_keepProvenanceMatch.end() ) {
324  keep = it->second;
325  } else {
326  keep = std::regex_search(currentKey, m_keepProvenancesRE);
327  m_keepProvenanceMatch[currentKey] = keep;
328  }
329  if( keep ) {
331  }
332  }
333  }
334  }
335 }
336 
337 //__________________________________________________________________________
339  ATH_MSG_DEBUG("In commitOutput");
340  // Connect the output file to the service
341  if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
342  ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
343  return(StatusCode::FAILURE);
344  }
345  // Set flag that connection is closed
346  m_connectionOpen = false;
347  return(StatusCode::SUCCESS);
348 }
349 //__________________________________________________________________________
351  AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
352  if (athConversionSvc != 0) {
353  if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
354  ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
355  return(StatusCode::FAILURE);
356  }
357  }
358  return(StatusCode::SUCCESS);
359 }
360 //__________________________________________________________________________
362  ATH_MSG_DEBUG("In streamObjects");
363  // Check that a connection has been opened
364  if (!m_connectionOpen) {
365  ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
366  return(StatusCode::FAILURE);
367  }
368  // Use arg if not empty, save the output name
369  if (!outputName.empty()) {
370  m_outputName.setValue(outputName);
371  }
372  if (m_outputName.value().empty()) {
373  ATH_MSG_ERROR("No OutputName provided");
374  return(StatusCode::FAILURE);
375  }
376  // Now iterate over the type/key pairs and stream out each object
377  std::vector<DataObject*> dataObjects;
378  for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
379  first != last; ++first) {
380  const std::string& type = (*first).first;
381  const std::string& key = (*first).second;
382  // Find the clid for type name from the CLIDSvc
383  CLID clid;
384  if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
385  ATH_MSG_ERROR("Could not get clid for typeName " << type);
386  return(StatusCode::FAILURE);
387  }
388  DataObject* dObj = 0;
389  // Two options: no key or explicit key
390  if (key.empty()) {
391  ATH_MSG_DEBUG("Get data object with no key");
392  // Get DataObject without key
393  dObj = m_store->accessData(clid);
394  } else {
395  ATH_MSG_DEBUG("Get data object with key");
396  // Get DataObjects with key
397  dObj = m_store->accessData(clid, key);
398  }
399  if (dObj == 0) {
400  // No object - print warning and return
401  ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
402  return(StatusCode::SUCCESS);
403  } else {
404  ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
405  }
406  // Save the dObj
407  dataObjects.push_back(dObj);
408  }
409  // Stream out objects
410  if (dataObjects.size() == 0) {
411  ATH_MSG_DEBUG("No data objects found");
412  return(StatusCode::SUCCESS);
413  }
414  StatusCode status = streamObjects(dataObjects, m_outputName.value());
415  if (!status.isSuccess()) {
416  ATH_MSG_ERROR("Could not stream out objects");
417  return(status);
418  }
419  return(StatusCode::SUCCESS);
420 }
421 //__________________________________________________________________________
423  // Check that a connection has been opened
424  if (!m_connectionOpen) {
425  ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
426  return(StatusCode::FAILURE);
427  }
428  // Connect the output file to the service
429  std::string outputConnectionString = outputName;
430  const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
431  if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
432  // If we're in here we're writing MetaData
433  // Now let's see if we should be overwriting the MetaData attributes
434  // For the time-being this happens when we're writing MetaData in the augmentation mode
435  if (!m_metaDataOutputAttributes.empty()) {
436  // Note: This won't work quite right if only one attribute is set though!
437  outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
438  }
439  }
440  for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
441  if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
442  outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
443  }
444  }
445 
446  // Check that the DataHeader is still valid
447  DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
448  std::map<DataObject*, IOpaqueAddress*> written;
449  for (DataObject* dobj : dataObjects) {
450  // Do not write the DataHeader via the explicit list
451  if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
452  ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
453  // Do not stream out same object twice
454  } else if (written.find(dobj) != written.end()) {
455  // Print warning and skip
456  ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
457  ATH_MSG_DEBUG(" Skipping this one.");
458  } else {
459  // Write object
460  IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
461  addr->addRef();
462  if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
463  written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
464  } else {
465  ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
466  return(StatusCode::FAILURE);
467  }
468  }
469  }
470  // End of loop over DataObjects, write DataHeader
471  if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
472  IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
473  addr->addRef();
474  if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
475  written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
476  } else {
477  ATH_MSG_ERROR("Could not create Rep for DataHeader");
478  return(StatusCode::FAILURE);
479  }
480  }
481  for (DataObject* dobj : dataObjects) {
482  // call fillRepRefs of persistency service
483  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
484  if (proxy != nullptr && written.find(dobj) != written.end()) {
485  IOpaqueAddress* addr(written.find(dobj)->second);
486  if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
487  if (dobj->clID() != 1 || addr->par()[0] != "\n") {
488  if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
489  m_dataHeader->insert(proxy, addr);
490  } else {
492  }
493  if (proxy->address() == nullptr) {
494  proxy->setAddress(addr);
495  }
496  addr->release();
497  }
498  } else {
499  ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
500  return(StatusCode::FAILURE);
501  }
502  } else {
503  ATH_MSG_WARNING("Could cast DataObject " << dobj->clID() << " " << dobj->name());
504  }
505  }
507  if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
508  // End of DataObjects, fill refs for DataHeader
509  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
510  if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
511  IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
512  if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
513  if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
514  if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
515  m_dataHeader->insert(proxy, addr);
516  } else {
518  }
519  addr->release();
520  }
521  } else {
522  ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
523  return(StatusCode::FAILURE);
524  }
525  } else {
526  ATH_MSG_ERROR("Could not cast DataHeader");
527  return(StatusCode::FAILURE);
528  }
529  }
530  return(StatusCode::SUCCESS);
531 }
532 //__________________________________________________________________________
534  const std::string hltKey = "HLTAutoKey";
537  if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
538  ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
539  } else {
540  for ( ; beg != ending; ++beg) {
541  if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
542  for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
543  it != itLast; ++it) {
544  // Only insert the primary clid, not the ones for the symlinks!
545  CLID clid = it->getPrimaryClassID();
546  if (clid != ClassID_traits<DataHeader>::ID()) {
547  //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
548  //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
549  std::string typeName;
550  if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
551  if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
552  ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
553  m_skippedItems.insert(it->getKey());
554  }
555  continue;
556  }
557  ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
558  const std::string keyName = it->getKey();
559  if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
560  p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
561  } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
562  p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
563  } else {
564  p2BWrittenFromTool->add(clid, keyName).ignore();
565  }
566  }
567  }
568  }
569  }
570  }
571  ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
572  return(StatusCode::SUCCESS);
573 }
AthenaOutputStreamTool::commitOutput
StatusCode commitOutput(bool doCommit=false) override
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
Definition: AthenaOutputStreamTool.cxx:338
AthenaOutputStreamTool::propagateProvenance
void propagateProvenance(const DataHeader &src_dh)
copy provenance records when creating new DataHeaders
Definition: AthenaOutputStreamTool.cxx:283
DataHeader::setProcessTag
void setProcessTag(const std::string &processTag)
Set ProcessTag for DataHeader.
Definition: DataHeader.cxx:242
TileDCSDataPlotter.dp
dp
Definition: TileDCSDataPlotter.py:842
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
SGIFolder.h
DataHeader::endProvenance
std::vector< DataHeaderElement >::const_iterator endProvenance() const
Definition: DataHeader.cxx:288
AthenaOutputStreamTool::finalizeOutput
StatusCode finalizeOutput() override
Finalize the output stream after the last commit, e.g.
Definition: AthenaOutputStreamTool.cxx:350
StateLessPT_NewConfig.proxy
proxy
Definition: StateLessPT_NewConfig.py:407
AthenaOutputStreamTool::getInputItemList
virtual StatusCode getInputItemList(SG::IFolder *m_p2BWrittenFromTool) override
Definition: AthenaOutputStreamTool.cxx:533
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
python.outputTest_v2.streams
streams
Definition: outputTest_v2.py:55
AthenaOutputStreamTool::m_extend
BooleanProperty m_extend
Definition: AthenaOutputStreamTool.h:96
DataHeader::setAttributeList
void setAttributeList(const coral::AttributeList *attrList)
Definition: DataHeader.cxx:317
DataHeader::addHash
void addHash(IStringPool *pool)
Add new entry to hash map.
Definition: DataHeader.cxx:296
skel.it
it
Definition: skel.GENtoEVGEN.py:407
AthCnvSvc.h
AthenaOutputStreamTool::m_dataHeader
DataHeader * m_dataHeader
Current DataHeader for streamed objects.
Definition: AthenaOutputStreamTool.h:112
AthenaOutputStreamTool::m_skippedItems
std::set< std::string > m_skippedItems
set of skipped item keys, because of missing CLID
Definition: AthenaOutputStreamTool.h:126
AthenaOutputStreamTool::m_metaDataContainerPrefix
StringProperty m_metaDataContainerPrefix
Definition: AthenaOutputStreamTool.h:94
AthenaOutputStreamTool::m_keepProvenancesStr
std::string m_keepProvenancesStr
RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStrea...
Definition: AthenaOutputStreamTool.h:119
PyPoolBrowser.dh
dh
Definition: PyPoolBrowser.py:102
AthenaOutputStreamTool::m_processTag
StringProperty m_processTag
Definition: AthenaOutputStreamTool.h:89
AthenaOutputStreamTool::m_keepProvenancesRE
std::regex m_keepProvenancesRE
RegEx pattern created from m_keepProvenancesStr.
Definition: AthenaOutputStreamTool.h:121
SG::VarHandleKey::key
const std::string & key() const
Return the StoreGate ID for the referenced object.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:141
AthenaOutputStreamTool::connectServices
StatusCode connectServices()
Do the real connection to services.
Definition: AthenaOutputStreamTool.cxx:150
DataHeader::insertProvenance
void insertProvenance(const DataHeaderElement &dhe)
Insert a new element into the "Provenance" vector.
Definition: DataHeader.cxx:292
SG::IFolder
a run-time configurable list of data objects
Definition: SGIFolder.h:21
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
AthenaOutputStreamTool::connectOutput
StatusCode connectOutput(const std::string &outputName="") override
Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wan...
Definition: AthenaOutputStreamTool.cxx:159
PrepareReferenceFile.regex
regex
Definition: PrepareReferenceFile.py:43
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:23
SG::makeHandle
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
Definition: ReadCondHandle.h:274
AthenaOutputStreamTool::m_containerNameHint
StringProperty m_containerNameHint
Definition: AthenaOutputStreamTool.h:92
DataHeaderElement
This class provides a persistent form for the TransientAddress.
Definition: DataHeader.h:37
DataHeader::beginProvenance
std::vector< DataHeaderElement >::const_iterator beginProvenance() const
Definition: DataHeader.cxx:284
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
DataHeader
This class provides the layout for summary information stored for data written to POOL.
Definition: DataHeader.h:126
DataHeader::Output
@ Output
Definition: DataHeader.h:128
ReadCalibFromCool.keep
keep
Definition: ReadCalibFromCool.py:85
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaOutputStreamTool::m_dataHeaderKey
StringProperty m_dataHeaderKey
Definition: AthenaOutputStreamTool.h:88
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
test_pyathena.parent
parent
Definition: test_pyathena.py:15
parseDir.wh
wh
Definition: parseDir.py:45
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaAttributeList
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
Definition: PersistentDataModel/PersistentDataModel/AthenaAttributeList.h:45
plotmaker.keyName
keyName
Definition: plotmaker.py:145
SG::VarHandleKey::initialize
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:103
jobOptions.pTag
string pTag
Definition: jobOptions.py:28
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
AthenaOutputStreamTool::AthenaOutputStreamTool
AthenaOutputStreamTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard AlgTool Constructor.
Definition: AthenaOutputStreamTool.cxx:43
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
AthenaOutputStreamTool::m_containerPrefix
StringProperty m_containerPrefix
Definition: AthenaOutputStreamTool.h:91
IDecisionSvc.h
AthenaOutputStreamTool::TypeKeyPairs
std::vector< TypeKeyPair > TypeKeyPairs
Definition: AthenaOutputStreamTool.h:67
DataHeader::insert
void insert(const SG::TransientAddress *sgAddress, IOpaqueAddress *tokAddress=0, const std::string &pTag="")
Insert a new element into the "DataObject" vector.
Definition: DataHeader.cxx:266
AthenaOutputStreamTool::m_branchNameHint
StringProperty m_branchNameHint
Definition: AthenaOutputStreamTool.h:95
AthenaOutputStreamTool::m_outputCollection
StringProperty m_outputCollection
Definition: AthenaOutputStreamTool.h:90
DataHeader::setStatus
void setStatus(statusFlag status)
Set StatusFlag enum for DataHeader.
Definition: DataHeader.cxx:234
AthenaOutputStreamTool::m_conversionSvc
ServiceHandle< IConversionSvc > m_conversionSvc
Keep reference to the data conversion service.
Definition: AthenaOutputStreamTool.h:106
AthenaOutputStreamTool::DataObjectVec
std::vector< DataObject * > DataObjectVec
Stream out a vector of objects Must convert to DataObject, e.g.
Definition: AthenaOutputStreamTool.h:75
WriteBchToCool.beg
beg
Definition: WriteBchToCool.py:69
AthenaOutputStreamTool::m_metaDataOutputCollection
StringProperty m_metaDataOutputCollection
Definition: AthenaOutputStreamTool.h:93
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AthenaOutputStreamTool::m_metaDataOutputAttributes
std::string m_metaDataOutputAttributes
Definition: AthenaOutputStreamTool.h:99
lumiFormat.outputName
string outputName
Definition: lumiFormat.py:65
AthenaOutputStreamTool::m_outputAttributes
std::string m_outputAttributes
Definition: AthenaOutputStreamTool.h:98
AthenaAttributeList.h
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
python.LumiBlobConversion.pos
pos
Definition: LumiBlobConversion.py:16
AthenaOutputStreamTool::m_attrListKey
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey
Definition: AthenaOutputStreamTool.h:100
TokenAddress.h
This file contains the class definition for the TokenAddress class.
SG::WriteHandle
Definition: StoreGate/StoreGate/WriteHandle.h:73
AthenaOutputStreamTool::m_extendProvenanceRecord
bool m_extendProvenanceRecord
Flag as to whether to extend provenance via the DataHeader.
Definition: AthenaOutputStreamTool.h:117
AthenaOutputStreamTool::m_store
ServiceHandle< StoreGateSvc > m_store
Definition: AthenaOutputStreamTool.h:104
SG::IFolder::add
virtual StatusCode add(const std::string &typeName, const std::string &skey)=0
add a data object identifier to the list
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaOutputStreamTool::m_connectionOpen
bool m_connectionOpen
Flag to tell whether connectOutput has been called.
Definition: AthenaOutputStreamTool.h:114
DeMoScan.first
bool first
Definition: DeMoScan.py:534
AthenaOutputStreamTool.h
This is the implementation of IAthenaOutputStreamTool.
ReadCalibFromCool.typeName
typeName
Definition: ReadCalibFromCool.py:477
AthCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
Definition: AthCnvSvc.cxx:408
merge.status
status
Definition: merge.py:16
StoreID::EVENT_STORE
@ EVENT_STORE
Definition: StoreID.h:26
AthenaOutputStreamTool::m_clidSvc
ServiceHandle< IClassIDSvc > m_clidSvc
Ref to ClassIDSvc to convert type name to clid.
Definition: AthenaOutputStreamTool.h:108
AthenaOutputStreamTool::m_decSvc
ServiceHandle< IDecisionSvc > m_decSvc
Ref to DecisionSvc.
Definition: AthenaOutputStreamTool.h:110
AthenaOutputStreamTool::m_keepProvenanceMatch
std::map< std::string, bool > m_keepProvenanceMatch
Cache provenance RegEx matching result in a map.
Definition: AthenaOutputStreamTool.h:123
SG::DataProxy
Definition: DataProxy.h:45
LArParamsProperties::keyword
std::string keyword(const std::string &classname)
Definition: LArParamsProperties.cxx:160
StoreGateSvc.h
SG::ConstIterator
Definition: SGIterator.h:164
AthCnvSvc
Definition: AthCnvSvc.h:66
AthenaOutputStreamTool::m_attrListWrite
std::string m_attrListWrite
Definition: AthenaOutputStreamTool.h:102
AthenaOutputStreamTool::streamObjects
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="") override
Definition: AthenaOutputStreamTool.cxx:361
ServiceHandle< StoreGateSvc >
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
AthenaOutputStreamTool::initialize
virtual StatusCode initialize() override
AthAlgTool Interface method implementations:
Definition: AthenaOutputStreamTool.cxx:50
DataProxy.h
AthenaOutputStreamTool::m_outputName
StringProperty m_outputName
Definition: AthenaOutputStreamTool.h:87