ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaOutputStreamTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12// Gaudi
13#include "GaudiKernel/IOpaqueAddress.h"
14#include "GaudiKernel/INamedInterface.h"
15#include "GaudiKernel/IClassIDSvc.h"
16#include "GaudiKernel/ThreadLocalContext.h"
17
18// Athena
22#include "SGTools/DataProxy.h"
23#include "SGTools/SGIFolder.h"
27
28
29namespace {
30
33bool hasInputAlias (const SG::DataProxy& dp)
34{
35 std::string inputName = dp.name() + "_Input";
36 return dp.hasAlias (inputName);
37}
38
39
40} // anonymous namespace
41
44 const std::string& name,
45 const IInterface* parent) : base_class(type, name, parent),
46 m_clidSvc("ClassIDSvc", name),
47 m_decSvc("DecisionSvc/DecisionSvc", name) {
48}
49//__________________________________________________________________________
51 ATH_MSG_INFO("Initializing " << name());
52
53 ATH_CHECK( m_clidSvc.retrieve() );
54 ATH_CHECK( m_conversionSvc.retrieve() );
55
56 // Autoconfigure
57 if (m_dataHeaderKey.empty()) {
58 m_dataHeaderKey.setValue(name());
59 // Remove "ToolSvc." from m_dataHeaderKey.
60 if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
61 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
62 // Remove "Tool" from m_dataHeaderKey.
63 if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
64 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
65 }
66 } else {
67 const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
68 if (parentAlg != 0) {
69 m_dataHeaderKey.setValue(parentAlg->name());
70 }
71 }
72 }
73 if (m_processTag.empty()) {
75 }
76
77 { // handle the AttrKey overwrite
78 const std::string keyword = "[AttributeListKey=";
79 std::string::size_type pos = m_outputName.value().find(keyword);
80 if( (pos != std::string::npos) ) {
81 ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
82 const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
83 m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
84 m_attrListKey = attrListKey;
85 }
86 }
87 if ( ! m_attrListKey.key().empty() ) {
88 ATH_CHECK(m_attrListKey.initialize());
89 m_attrListWrite = m_attrListKey.key() + "Decisions";
90 //ATH_CHECK(m_attrListWrite.initialize());
91 }
92
93 if (!m_outputCollection.value().empty()) {
94 m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
95 }
96 if (!m_containerPrefix.value().empty()) {
97 m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
98 }
99 if (m_containerNameHint.value() != "0") {
100 m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
101 }
102 if (m_branchNameHint.value() != "0") {
103 m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
104 }
105 if (!m_metaDataOutputCollection.value().empty()) {
106 m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
107 }
108 if (!m_metaDataContainerPrefix.value().empty()) {
109 m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
110 }
111
112 if (m_extend) ATH_CHECK(m_decSvc.retrieve());
113 return(StatusCode::SUCCESS);
114}
115//__________________________________________________________________________
116StatusCode AthenaOutputStreamTool::connectServices(const std::string& dataStore,
117 const std::string& cnvSvc,
118 bool extendProvenenceRecord) {
119 // Release old data store
120 if (m_store.isValid()) {
121 if (m_store.release().isFailure()) {
122 ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
123 }
124 }
125 m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
126 if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
127 if (m_conversionSvc.release().isFailure()) {
128 ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
129 }
130 m_conversionSvc = ServiceHandle<IConversionSvc>(cnvSvc, this->name());
131 if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
132 ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
133 return(StatusCode::FAILURE);
134 }
135 }
136 m_extendProvenanceRecord = extendProvenenceRecord;
137 auto pprop = dynamic_cast<const IProperty*>(parent());
138 if (not pprop){
139 ATH_MSG_ERROR("'parent' could not be cast to IProperty");
140 return(StatusCode::FAILURE);
141 }
142 auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
143 m_keepProvenancesStr = keep.value();
144 // create RegEx pattern from the property value, specify extended grammar
145 m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
146
147 return(connectServices());
148}
149//__________________________________________________________________________
151 // Find the data store
152 if (m_store.retrieve().isFailure() || m_store == 0) {
153 ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
154 return(StatusCode::FAILURE);
155 }
156 return(StatusCode::SUCCESS);
157}
158//__________________________________________________________________________
159StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName) {
160 ATH_MSG_DEBUG("In connectOutput " << outputName);
161
162 // Use arg if not empty, save the output name
163 if (!outputName.empty()) {
164 m_outputName.setValue(outputName);
165 }
166 if (m_outputName.value().empty()) {
167 ATH_MSG_ERROR("No OutputName provided");
168 return(StatusCode::FAILURE);
169 }
170 // Connect services if not already available
171 if (m_store == 0 || m_conversionSvc == 0) {
173 }
174 // Connect the output file to the service
175 if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
176 ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
177 return(StatusCode::FAILURE);
178 } else {
179 ATH_MSG_DEBUG("Connected to " << m_outputName.value());
180 }
181
182 // Remove DataHeader with same key if it exists
183 if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
184 const DataHeader* preDh = nullptr;
185 if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
186 if (m_store->removeDataAndProxy(preDh).isFailure()) {
187 ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
188 return(StatusCode::FAILURE);
189 }
190 ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
191 }
192 }
193
194 // Create new DataHeader
195 m_dataHeader = new DataHeader();
196 m_dataHeader->setProcessTag(m_processTag);
197
198 // Retrieve all existing DataHeaders from StoreGate
199 const DataHeader* dh = nullptr;
200 std::vector<std::string> dhKeys;
201 m_store->keys<DataHeader>(dhKeys);
202 for (const std::string& dhKey : dhKeys) {
203 bool primaryDH = false;
204 if (!m_store->transientContains<DataHeader>(dhKey)) {
205 if (dhKey == "EventSelector") primaryDH = true;
206 ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
207 }
208 if (m_store->retrieve(dh, dhKey).isFailure()) {
209 ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
210 }
211 if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
212 propagateProvenance( *dh );
213 }
214 }
215
216 // Attach the attribute list to the DataHeader if requested
217 if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
218 auto attrListHandle = SG::makeHandle(m_attrListKey);
219 if (!attrListHandle.isValid()) {
220 ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
221 } else {
222 m_dataHeader->setAttributeList(attrListHandle.cptr());
223 if (m_extend) { // Add streaming decisions
224 ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
225 // Look for attribute list created for mini-EventInfo
226 const AthenaAttributeList* attlist(attrListHandle.cptr());
227
228 // Build new attribute list for modification
229 AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
230 newone->copyData(*attlist);
231
232 // Now loop over stream definitions and add decisions
233 auto streams = m_decSvc->getStreams();
234 for (auto it = streams.begin();
235 it != streams.end(); ++it) {
236 newone->extend(*it,"bool");
237 (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
238 ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
239 }
240 // record new attribute list with old key + suffix
241 const AthenaAttributeList* attrList2 = nullptr;
243 if (m_store->record(newone,m_attrListWrite).isFailure()) {
244 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
245 }
246 } else {
247 ATH_MSG_DEBUG("Decisions already added by a different stream");
248 }
249 if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
250 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
251 } else {
252 m_dataHeader->setAttributeList(attrList2);
253 }
254/*
255 SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
256 std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
257 if ( attrWrite.record(std::move(uptr)).isFailure() ) {
258 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
259 } else {
260 ATH_MSG_DEBUG("Decisions already added by a different stream");
261 }
262*/
263 //m_dataHeader->setAttributeList(newone);
264 } // list extend check
265 } // list retrieve check
266 } // list property check
267
268 // Record DataHeader in StoreGate
270 if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
271 ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
272 return(StatusCode::FAILURE);
273 } else {
274 ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
275 }
277 // Set flag that connection is open
278 m_connectionOpen = true;
279 return(StatusCode::SUCCESS);
280}
281
282//__________________________________________________________________________
284{
285 // keep track of provenance entries inserted into the new DataHeader
286 std::set<std::string> insertedTags{};
287 // Add DataHeader token to the new DataHeader
289 std::string pTag;
290 std::unique_ptr<SG::TransientAddress> dhTransAddr;
291 for (const DataHeaderElement& dhe : src_dh) {
292 if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
293 pTag = dhe.getKey();
294 dhTransAddr.reset( dhe.getAddress( m_conversionSvc->repSvcType() ) );
295 }
296 }
297 // Update dhTransAddr to handle fast merged files.
298 if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
299 DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
300 m_dataHeader->insertProvenance(dhe);
301 insertedTags.insert(std::move(pTag));
302 }
303 else if( dhTransAddr ) {
304 DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
305 m_dataHeader->insertProvenance(dhe);
306 insertedTags.insert(std::move(pTag));
307 }
308 }
309
310 // empty regexpr means do not keep any provenance
311 if( !m_keepProvenancesStr.empty() ) {
312 // Each stream tag is written only once in the provenance record
313 // In files where there are multiple entries per stream tag
314 // the record is in reverse, i.e., the latest appears first.
315 // Therefore, only keep the first entry if there are multiple
316 // matches so that we retain the latest one.
317 for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
318 const auto & currentKey = (*iter).getKey();
319 if( insertedTags.insert(currentKey).second ) {
320 // first prov with that tag. Now check if we want to keep that tag
321 bool keep = false;
322 auto it = m_keepProvenanceMatch.find( currentKey );
323 if( it != m_keepProvenanceMatch.end() ) {
324 keep = it->second;
325 } else {
326 keep = std::regex_search(currentKey, m_keepProvenancesRE);
327 m_keepProvenanceMatch[currentKey] = keep;
328 }
329 if( keep ) {
330 m_dataHeader->insertProvenance(*iter);
331 }
332 }
333 }
334 }
335}
336
337//__________________________________________________________________________
338StatusCode AthenaOutputStreamTool::commitOutput(bool doCommit) {
339 ATH_MSG_DEBUG("In commitOutput");
340 // Connect the output file to the service
341 if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
342 ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
343 return(StatusCode::FAILURE);
344 }
345 // Set flag that connection is closed
346 m_connectionOpen = false;
347 return(StatusCode::SUCCESS);
348}
349//__________________________________________________________________________
351 AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
352 if (athConversionSvc != 0) {
353 if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
354 ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
355 return(StatusCode::FAILURE);
356 }
357 }
358 return(StatusCode::SUCCESS);
359}
360//__________________________________________________________________________
361StatusCode AthenaOutputStreamTool::streamObjects(const TypeKeyPairs& typeKeys, const std::string& outputName) {
362 ATH_MSG_DEBUG("In streamObjects");
363 // Check that a connection has been opened
364 if (!m_connectionOpen) {
365 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
366 return(StatusCode::FAILURE);
367 }
368 // Use arg if not empty, save the output name
369 if (!outputName.empty()) {
370 m_outputName.setValue(outputName);
371 }
372 if (m_outputName.value().empty()) {
373 ATH_MSG_ERROR("No OutputName provided");
374 return(StatusCode::FAILURE);
375 }
376 // Now iterate over the type/key pairs and stream out each object
377 std::vector<DataObject*> dataObjects;
378 for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
379 first != last; ++first) {
380 const std::string& type = (*first).first;
381 const std::string& key = (*first).second;
382 // Find the clid for type name from the CLIDSvc
383 CLID clid;
384 if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
385 ATH_MSG_ERROR("Could not get clid for typeName " << type);
386 return(StatusCode::FAILURE);
387 }
388 DataObject* dObj = 0;
389 // Two options: no key or explicit key
390 if (key.empty()) {
391 ATH_MSG_DEBUG("Get data object with no key");
392 // Get DataObject without key
393 dObj = m_store->accessData(clid);
394 } else {
395 ATH_MSG_DEBUG("Get data object with key");
396 // Get DataObjects with key
397 dObj = m_store->accessData(clid, key);
398 }
399 if (dObj == 0) {
400 // No object - print warning and return
401 ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
402 return(StatusCode::SUCCESS);
403 } else {
404 ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
405 }
406 // Save the dObj
407 dataObjects.push_back(dObj);
408 }
409 // Stream out objects
410 if (dataObjects.size() == 0) {
411 ATH_MSG_DEBUG("No data objects found");
412 return(StatusCode::SUCCESS);
413 }
414 StatusCode status = streamObjects(dataObjects, m_outputName.value());
415 if (!status.isSuccess()) {
416 ATH_MSG_ERROR("Could not stream out objects");
417 return(status);
418 }
419 return(StatusCode::SUCCESS);
420}
421//__________________________________________________________________________
422StatusCode AthenaOutputStreamTool::streamObjects(const DataObjectVec& dataObjects, const std::string& outputName) {
423 // Check that a connection has been opened
424 if (!m_connectionOpen) {
425 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
426 return(StatusCode::FAILURE);
427 }
428 // Connect the output file to the service
429 std::string outputConnectionString = outputName;
430 const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
431 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
432 // If we're in here we're writing MetaData
433 // Now let's see if we should be overwriting the MetaData attributes
434 // For the time-being this happens when we're writing MetaData in the augmentation mode
435 if (!m_metaDataOutputAttributes.empty()) {
436 // Note: This won't work quite right if only one attribute is set though!
437 outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
438 }
439 }
440 for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
441 if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
442 outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
443 }
444 }
445
446 // Check that the DataHeader is still valid
447 DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
448 std::map<DataObject*, IOpaqueAddress*> written;
449 for (DataObject* dobj : dataObjects) {
450 // Do not write the DataHeader via the explicit list
451 if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
452 ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
453 // Do not stream out same object twice
454 } else if (written.find(dobj) != written.end()) {
455 // Print warning and skip
456 ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
457 ATH_MSG_DEBUG(" Skipping this one.");
458 } else {
459 // Write object
460 IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
461 addr->addRef();
462 if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
463 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
464 } else {
465 ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
466 return(StatusCode::FAILURE);
467 }
468 }
469 }
470 // End of loop over DataObjects, write DataHeader
471 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
472 IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
473 addr->addRef();
474 if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
475 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
476 } else {
477 ATH_MSG_ERROR("Could not create Rep for DataHeader");
478 return(StatusCode::FAILURE);
479 }
480 }
481 for (DataObject* dobj : dataObjects) {
482 // call fillRepRefs of persistency service
483 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
484 if (proxy != nullptr && written.find(dobj) != written.end()) {
485 IOpaqueAddress* addr(written.find(dobj)->second);
486 if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
487 if (dobj->clID() != 1 || addr->par()[0] != "\n") {
488 if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
489 m_dataHeader->insert(proxy, addr);
490 } else {
491 m_dataHeader->insert(proxy, addr, m_processTag);
492 }
493 if (proxy->address() == nullptr) {
494 proxy->setAddress(addr);
495 }
496 addr->release();
497 }
498 } else {
499 ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
500 return(StatusCode::FAILURE);
501 }
502 } else {
503 ATH_MSG_WARNING("Could cast DataObject " << dobj->clID() << " " << dobj->name());
504 }
505 }
506 m_dataHeader->addHash(&*m_store);
507 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
508 // End of DataObjects, fill refs for DataHeader
509 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
510 if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
511 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
512 if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
513 if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
514 if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
515 m_dataHeader->insert(proxy, addr);
516 } else {
517 m_dataHeader->insert(proxy, addr, m_processTag);
518 }
519 addr->release();
520 }
521 } else {
522 ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
523 return(StatusCode::FAILURE);
524 }
525 } else {
526 ATH_MSG_ERROR("Could not cast DataHeader");
527 return(StatusCode::FAILURE);
528 }
529 }
530 return(StatusCode::SUCCESS);
531}
532//__________________________________________________________________________
534 const std::string hltKey = "HLTAutoKey";
537 if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
538 ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
539 } else {
540 for ( ; beg != ending; ++beg) {
541 if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
542 for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
543 it != itLast; ++it) {
544 // Only insert the primary clid, not the ones for the symlinks!
545 CLID clid = it->getPrimaryClassID();
546 if (clid != ClassID_traits<DataHeader>::ID()) {
547 //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
548 //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
549 std::string typeName;
550 if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
551 if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
552 ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
553 m_skippedItems.insert(it->getKey());
554 }
555 continue;
556 }
557 ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
558 const std::string keyName = it->getKey();
559 if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
560 p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
561 } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
562 p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
563 } else {
564 p2BWrittenFromTool->add(clid, keyName).ignore();
565 }
566 }
567 }
568 }
569 }
570 }
571 ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
572 return(StatusCode::SUCCESS);
573}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This is the implementation of IAthenaOutputStreamTool.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
An AttributeList represents a logical row of attributes in a metadata table.
This file contains the class definition for the TokenAddress class.
Base class for all conversion services.
Definition AthCnvSvc.h:66
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
An AttributeList represents a logical row of attributes in a metadata table.
StringProperty m_metaDataContainerPrefix
std::map< std::string, bool > m_keepProvenanceMatch
Cache provenance RegEx matching result in a map.
StatusCode finalizeOutput() override
Finalize the output stream after the last commit, e.g.
AthenaOutputStreamTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard AlgTool Constructor.
std::set< std::string > m_skippedItems
set of skipped item keys, because of missing CLID
bool m_extendProvenanceRecord
Flag as to whether to extend provenance via the DataHeader.
DataHeader * m_dataHeader
Current DataHeader for streamed objects.
std::vector< DataObject * > DataObjectVec
Stream out a vector of objects Must convert to DataObject, e.g.
StatusCode connectOutput(const std::string &outputName="") override
Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wan...
ServiceHandle< IDecisionSvc > m_decSvc
Ref to DecisionSvc.
ServiceHandle< IClassIDSvc > m_clidSvc
Ref to ClassIDSvc to convert type name to clid.
std::vector< TypeKeyPair > TypeKeyPairs
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="") override
bool m_connectionOpen
Flag to tell whether connectOutput has been called.
ServiceHandle< IConversionSvc > m_conversionSvc
Keep reference to the data conversion service.
std::regex m_keepProvenancesRE
RegEx pattern created from m_keepProvenancesStr.
StatusCode connectServices()
Do the real connection to services.
void propagateProvenance(const DataHeader &src_dh)
copy provenance records when creating new DataHeaders
ServiceHandle< StoreGateSvc > m_store
virtual StatusCode initialize() override
AthAlgTool Interface method implementations:
virtual StatusCode getInputItemList(SG::IFolder *m_p2BWrittenFromTool) override
std::string m_keepProvenancesStr
RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStrea...
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey
StatusCode commitOutput(bool doCommit=false) override
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
StringProperty m_metaDataOutputCollection
This class provides a persistent form for the TransientAddress.
Definition DataHeader.h:37
This class provides the layout for summary information stored for data written to POOL.
Definition DataHeader.h:123
std::vector< DataHeaderElement >::const_iterator beginProvenance() const
std::vector< DataHeaderElement >::const_iterator endProvenance() const
a const_iterator facade to DataHandle.
Definition SGIterator.h:164
a run-time configurable list of data objects
Definition SGIFolder.h:21
virtual StatusCode add(const std::string &typeName, const std::string &skey)=0
add a data object identifier to the list
@ EVENT_STORE
Definition StoreID.h:26
This class provides a Generic Transient Address for POOL tokens.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())