ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaOutputStreamTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12// Gaudi
13#include "GaudiKernel/IOpaqueAddress.h"
14#include "GaudiKernel/INamedInterface.h"
15#include "GaudiKernel/IClassIDSvc.h"
16#include "GaudiKernel/ThreadLocalContext.h"
17
18// Athena
22#include "SGTools/DataProxy.h"
23#include "SGTools/SGIFolder.h"
27
28
29namespace {
30
33bool hasInputAlias (const SG::DataProxy& dp)
34{
35 std::string inputName = dp.name() + "_Input";
36 return dp.hasAlias (inputName);
37}
38
39
40} // anonymous namespace
41
44 const std::string& name,
45 const IInterface* parent) : base_class(type, name, parent),
46 m_clidSvc("ClassIDSvc", name),
47 m_decSvc("DecisionSvc/DecisionSvc", name) {
48}
49//__________________________________________________________________________
51 ATH_MSG_INFO("Initializing " << name());
52
53 ATH_CHECK( m_clidSvc.retrieve() );
54 ATH_CHECK( m_conversionSvc.retrieve() );
55
56 // Autoconfigure
57 if (m_dataHeaderKey.empty()) {
58 m_dataHeaderKey.setValue(name());
59 // Remove "ToolSvc." from m_dataHeaderKey.
60 if (m_dataHeaderKey.value().starts_with( "ToolSvc.")) {
61 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(8));
62 // Remove "Tool" from m_dataHeaderKey.
63 if (m_dataHeaderKey.value().find("Tool") == m_dataHeaderKey.size() - 4) {
64 m_dataHeaderKey.setValue(m_dataHeaderKey.value().substr(0, m_dataHeaderKey.size() - 4));
65 }
66 } else {
67 const INamedInterface* parentAlg = dynamic_cast<const INamedInterface*>(parent());
68 if (parentAlg != 0) {
69 m_dataHeaderKey.setValue(parentAlg->name());
70 }
71 }
72 }
73 if (m_processTag.empty()) {
75 }
76
77 { // handle the AttrKey overwrite
78 const std::string keyword = "[AttributeListKey=";
79 std::string::size_type pos = m_outputName.value().find(keyword);
80 if( (pos != std::string::npos) ) {
81 ATH_MSG_INFO("The AttrListKey will be overwritten/set by the value from the OutputName: " << m_outputName);
82 const std::string attrListKey = m_outputName.value().substr(pos + keyword.size(),
83 m_outputName.value().find(']', pos + keyword.size()) - pos - keyword.size());
84 m_attrListKey = attrListKey;
85 }
86 }
87 if ( ! m_attrListKey.key().empty() ) {
88 ATH_CHECK(m_attrListKey.initialize());
89 m_attrListWrite = m_attrListKey.key() + "Decisions";
90 //ATH_CHECK(m_attrListWrite.initialize());
91 }
92
93 if (!m_outputCollection.value().empty()) {
94 m_outputAttributes += "[OutputCollection=" + m_outputCollection.value() + "]";
95 }
96 if (!m_containerPrefix.value().empty()) {
97 m_outputAttributes += "[PoolContainerPrefix=" + m_containerPrefix.value() + "]";
98 }
99 if (m_containerNameHint.value() != "0") {
100 m_outputAttributes += "[TopLevelContainerName=" + m_containerNameHint.value() + "]";
101 }
102 if (m_branchNameHint.value() != "0") {
103 m_outputAttributes += "[SubLevelBranchName=" + m_branchNameHint.value() + "]";
104 }
105 if (!m_metaDataOutputCollection.value().empty()) {
106 m_metaDataOutputAttributes += "[OutputCollection=" + m_metaDataOutputCollection.value() + "]";
107 }
108 if (!m_metaDataContainerPrefix.value().empty()) {
109 m_metaDataOutputAttributes += "[PoolContainerPrefix=" + m_metaDataContainerPrefix.value() + "]";
110 }
111
112 if (m_extend) ATH_CHECK(m_decSvc.retrieve());
113 return(StatusCode::SUCCESS);
114}
115//__________________________________________________________________________
116StatusCode AthenaOutputStreamTool::connectServices(const std::string& dataStore,
117 const std::string& cnvSvc,
118 bool extendProvenenceRecord) {
119 // Release old data store
120 if (m_store.isValid()) {
121 if (m_store.release().isFailure()) {
122 ATH_MSG_ERROR("Could not release " << m_store.typeAndName() << " store");
123 }
124 }
125 m_store = ServiceHandle<StoreGateSvc>(dataStore, this->name());
126 if (cnvSvc != m_conversionSvc.type() && cnvSvc != "EventPersistencySvc") {
127 if (m_conversionSvc.release().isFailure()) {
128 ATH_MSG_ERROR("Could not release " << m_conversionSvc.type());
129 }
130 m_conversionSvc = ServiceHandle<IConversionSvc>(cnvSvc, this->name());
131 if (m_conversionSvc.retrieve().isFailure() || m_conversionSvc == 0) {
132 ATH_MSG_ERROR("Could not locate " << m_conversionSvc.type());
133 return(StatusCode::FAILURE);
134 }
135 }
136 m_extendProvenanceRecord = extendProvenenceRecord;
137 auto pprop = dynamic_cast<const IProperty*>(parent());
138 if (not pprop){
139 ATH_MSG_ERROR("'parent' could not be cast to IProperty");
140 return(StatusCode::FAILURE);
141 }
142 auto keep = dynamic_cast<const StringProperty&>( pprop->getProperty("KeepProvenanceTagsRegEx") );
143 m_keepProvenancesStr = keep.value();
144 // create RegEx pattern from the property value, specify extended grammar
145 m_keepProvenancesRE = std::regex(m_keepProvenancesStr, std::regex::extended);
146
147 return(connectServices());
148}
149//__________________________________________________________________________
151 // Find the data store
152 if (m_store.retrieve().isFailure() || m_store == 0) {
153 ATH_MSG_ERROR("Could not locate " << m_store.typeAndName() << " store");
154 return(StatusCode::FAILURE);
155 }
156 return(StatusCode::SUCCESS);
157}
158//__________________________________________________________________________
// Open the output connection and prepare a new DataHeader for it.
//
// Steps visible in this listing: store a non-empty outputName in m_outputName,
// connect that output on the conversion service, remove any DataHeader already
// stored under m_dataHeaderKey, create a new DataHeader tagged with
// m_processTag, propagate provenance from the DataHeaders found in the store,
// optionally attach an attribute list (extended with stream decisions when
// m_extend is set), record the new DataHeader and set m_connectionOpen.
//
// Returns FAILURE when no output name is available, when the output cannot be
// connected, when the previous DataHeader cannot be removed, or when the new
// DataHeader cannot be recorded; SUCCESS otherwise.
//
// NOTE(review): this listing is missing three statements (its own numbering
// skips 172, 244 and 271); the notes below mark where they belong.
159StatusCode AthenaOutputStreamTool::connectOutput(const std::string& outputName) {
160 ATH_MSG_DEBUG("In connectOutput " << outputName);
161
162 // Use arg if not empty, save the output name
163 if (!outputName.empty()) {
164 m_outputName.setValue(outputName);
165 }
166 if (m_outputName.value().empty()) {
167 ATH_MSG_ERROR("No OutputName provided");
168 return(StatusCode::FAILURE);
169 }
170 // Connect services if not already available
171 if (m_store == 0 || m_conversionSvc == 0) {
// NOTE(review): the body of this branch (listing line 172) is absent from this extract.
173 }
174 // Connect the output file to the service
175 if (m_conversionSvc->connectOutput(m_outputName.value()).isFailure()) {
176 ATH_MSG_ERROR("Unable to connect output " << m_outputName.value());
177 return(StatusCode::FAILURE);
178 } else {
179 ATH_MSG_DEBUG("Connected to " << m_outputName.value());
180 }
181
182 // Remove DataHeader with same key if it exists
183 if (m_store->contains<DataHeader>(m_dataHeaderKey)) {
184 const DataHeader* preDh = nullptr;
185 if (m_store->retrieve(preDh, m_dataHeaderKey).isSuccess()) {
186 if (m_store->removeDataAndProxy(preDh).isFailure()) {
187 ATH_MSG_ERROR("Unable to get proxy for the DataHeader with key " << m_dataHeaderKey);
188 return(StatusCode::FAILURE);
189 }
190 ATH_MSG_DEBUG("Released DataHeader with key " << m_dataHeaderKey);
191 }
192 }
193
194 // Create new DataHeader
195 m_dataHeader = new DataHeader();
196 m_dataHeader->setProcessTag(m_processTag);
197
198 // Retrieve all existing DataHeaders from StoreGate
199 const DataHeader* dh = nullptr;
200 std::vector<std::string> dhKeys;
201 m_store->keys<DataHeader>(dhKeys);
202 //construct string out of loop
203 const std::string boolTypeStr{"bool"};
204 for (const std::string& dhKey : dhKeys) {
205 bool primaryDH = false;
// A header that is not transiently present is still treated as primary when
// its key is "EventSelector".
206 if (!m_store->transientContains<DataHeader>(dhKey)) {
207 if (dhKey == "EventSelector") primaryDH = true;
208 ATH_MSG_DEBUG("No transientContains DataHeader with key " << dhKey);
209 }
// NOTE(review): a failed retrieve is only logged; dh is still dereferenced on
// the next statement, and it is not reset per iteration, so it may be null or
// left over from the previous key — confirm retrieve cannot fail here.
210 if (m_store->retrieve(dh, dhKey).isFailure()) {
211 ATH_MSG_DEBUG("Unable to retrieve the DataHeader with key " << dhKey);
212 }
// NOTE(review): the result of m_store->proxy(dh) is dereferenced without a
// null check — verify it cannot be null for a retrieved header.
213 if (dh->isInput() || hasInputAlias(*m_store->proxy(dh)) || primaryDH) {
214 propagateProvenance( *dh );
215 }
216 }
217
218 // Attach the attribute list to the DataHeader if requested
219 if (!m_attrListKey.key().empty() && m_store->storeID() == StoreID::EVENT_STORE) {
220 auto attrListHandle = SG::makeHandle(m_attrListKey);
221 if (!attrListHandle.isValid()) {
222 ATH_MSG_WARNING("Unable to retrieve AttributeList with key " << m_attrListKey);
223 } else {
224 m_dataHeader->setAttributeList(attrListHandle.cptr());
225 if (m_extend) { // Add streaming decisions
226 ATH_MSG_DEBUG("Adding stream decisions to " << m_attrListWrite);
227 // Look for attribute list created for mini-EventInfo
228 const AthenaAttributeList* attlist(attrListHandle.cptr());
229
230 // Build new attribute list for modification
231 AthenaAttributeList* newone = new AthenaAttributeList(attlist->specification());
232 newone->copyData(*attlist);
233
234 // Now loop over stream definitions and add decisions
// Each stream name becomes a bool attribute holding the DecisionSvc verdict
// for the current event context.
235 auto streams = m_decSvc->getStreams();
236 for (auto it = streams.begin();
237 it != streams.end(); ++it) {
238 newone->extend(*it,boolTypeStr);
239 (*newone)[*it].data<bool>() = m_decSvc->isEventAccepted(*it,Gaudi::Hive::currentContext());
240 ATH_MSG_DEBUG("Added stream decision for " << *it << " to " << m_attrListKey);
241 }
242 // record new attribute list with old key + suffix
243 const AthenaAttributeList* attrList2 = nullptr;
// NOTE(review): the condition opening the if/else below (listing line 244) is
// absent from this extract. newone is handed to m_store->record only in the
// first branch; on the else path it appears never to be freed — confirm.
245 if (m_store->record(newone,m_attrListWrite).isFailure()) {
246 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
247 }
248 } else {
249 ATH_MSG_DEBUG("Decisions already added by a different stream");
250 }
// NOTE(review): this branch reports a failed retrieve, yet the message text
// says "record".
251 if (m_store->retrieve(attrList2,m_attrListWrite).isFailure()) {
252 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
253 } else {
254 m_dataHeader->setAttributeList(attrList2);
255 }
256/*
257 SG::WriteHandle<AthenaAttributeList> attrWrite(m_attrListWrite);
258 std::unique_ptr<AthenaAttributeList> uptr = std::make_unique<AthenaAttributeList>(*newone);
259 if ( attrWrite.record(std::move(uptr)).isFailure() ) {
260 ATH_MSG_ERROR("Unable to record att list " << m_attrListWrite);
261 } else {
262 ATH_MSG_DEBUG("Decisions already added by a different stream");
263 }
264*/
265 //m_dataHeader->setAttributeList(newone);
266 } // list extend check
267 } // list retrieve check
268 } // list property check
269
270 // Record DataHeader in StoreGate
// NOTE(review): the declaration of wh (listing line 271) is absent from this
// extract. The unique_ptr passed to record() takes over the object, while
// m_dataHeader keeps the same raw pointer for later use.
272 if (wh.record(std::unique_ptr<DataHeader>(m_dataHeader)).isFailure()) {
273 ATH_MSG_ERROR("Unable to record DataHeader with key " << m_dataHeaderKey);
274 return(StatusCode::FAILURE);
275 } else {
276 ATH_MSG_DEBUG("Recorded DataHeader with key " << m_dataHeaderKey);
277 }
279 // Set flag that connection is open
280 m_connectionOpen = true;
281 return(StatusCode::SUCCESS);
282}
283
284//__________________________________________________________________________
286{
287 // keep track of provenance entries inserted into the new DataHeader
288 std::set<std::string> insertedTags{};
289 // Add DataHeader token to the new DataHeader
291 std::string pTag;
292 std::unique_ptr<SG::TransientAddress> dhTransAddr;
293 for (const DataHeaderElement& dhe : src_dh) {
294 if (dhe.getPrimaryClassID() == ClassID_traits<DataHeader>::ID()) {
295 pTag = dhe.getKey();
296 dhTransAddr.reset( dhe.getAddress( m_conversionSvc->repSvcType() ) );
297 }
298 }
299 // Update dhTransAddr to handle fast merged files.
300 if( auto dhProxy=m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
301 DataHeaderElement dhe(dhProxy, dhProxy->address(), pTag);
302 m_dataHeader->insertProvenance(dhe);
303 insertedTags.insert(std::move(pTag));
304 }
305 else if( dhTransAddr ) {
306 DataHeaderElement dhe(dhTransAddr.get(), dhTransAddr->address(), pTag);
307 m_dataHeader->insertProvenance(dhe);
308 insertedTags.insert(std::move(pTag));
309 }
310 }
311
312 // empty regexpr means do not keep any provenance
313 if( !m_keepProvenancesStr.empty() ) {
314 // Each stream tag is written only once in the provenance record
315 // In files where there are multiple entries per stream tag
316 // the record is in reverse, i.e., the latest appears first.
317 // Therefore, only keep the first entry if there are multiple
318 // matches so that we retain the latest one.
319 for(auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance(); iter != iEnd; ++iter) {
320 const auto & currentKey = (*iter).getKey();
321 if( insertedTags.insert(currentKey).second ) {
322 // first prov with that tag. Now check if we want to keep that tag
323 bool keep = false;
324 auto it = m_keepProvenanceMatch.find( currentKey );
325 if( it != m_keepProvenanceMatch.end() ) {
326 keep = it->second;
327 } else {
328 keep = std::regex_search(currentKey, m_keepProvenancesRE);
329 m_keepProvenanceMatch[currentKey] = keep;
330 }
331 if( keep ) {
332 m_dataHeader->insertProvenance(*iter);
333 }
334 }
335 }
336 }
337}
338
339//__________________________________________________________________________
340StatusCode AthenaOutputStreamTool::commitOutput(bool doCommit) {
341 ATH_MSG_DEBUG("In commitOutput");
342 // Connect the output file to the service
343 if (m_conversionSvc->commitOutput(m_outputName.value(), doCommit).isFailure()) {
344 ATH_MSG_ERROR("Unable to commit output " << m_outputName.value());
345 return(StatusCode::FAILURE);
346 }
347 // Set flag that connection is closed
348 m_connectionOpen = false;
349 return(StatusCode::SUCCESS);
350}
351//__________________________________________________________________________
353 AthCnvSvc* athConversionSvc = dynamic_cast<AthCnvSvc*>(m_conversionSvc.get());
354 if (athConversionSvc != 0) {
355 if (athConversionSvc->disconnectOutput(m_outputName.value()).isFailure()) {
356 ATH_MSG_ERROR("Unable to finalize output " << m_outputName.value());
357 return(StatusCode::FAILURE);
358 }
359 }
360 return(StatusCode::SUCCESS);
361}
362//__________________________________________________________________________
363StatusCode AthenaOutputStreamTool::streamObjects(const TypeKeyPairs& typeKeys, const std::string& outputName) {
364 ATH_MSG_DEBUG("In streamObjects");
365 // Check that a connection has been opened
366 if (!m_connectionOpen) {
367 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
368 return(StatusCode::FAILURE);
369 }
370 // Use arg if not empty, save the output name
371 if (!outputName.empty()) {
372 m_outputName.setValue(outputName);
373 }
374 if (m_outputName.value().empty()) {
375 ATH_MSG_ERROR("No OutputName provided");
376 return(StatusCode::FAILURE);
377 }
378 // Now iterate over the type/key pairs and stream out each object
379 std::vector<DataObject*> dataObjects;
380 for (TypeKeyPairs::const_iterator first = typeKeys.begin(), last = typeKeys.end();
381 first != last; ++first) {
382 const std::string& type = (*first).first;
383 const std::string& key = (*first).second;
384 // Find the clid for type name from the CLIDSvc
385 CLID clid;
386 if (m_clidSvc->getIDOfTypeName(type, clid).isFailure()) {
387 ATH_MSG_ERROR("Could not get clid for typeName " << type);
388 return(StatusCode::FAILURE);
389 }
390 DataObject* dObj = 0;
391 // Two options: no key or explicit key
392 if (key.empty()) {
393 ATH_MSG_DEBUG("Get data object with no key");
394 // Get DataObject without key
395 dObj = m_store->accessData(clid);
396 } else {
397 ATH_MSG_DEBUG("Get data object with key");
398 // Get DataObjects with key
399 dObj = m_store->accessData(clid, key);
400 }
401 if (dObj == 0) {
402 // No object - print warning and return
403 ATH_MSG_DEBUG("No object found for type " << type << " key " << key);
404 return(StatusCode::SUCCESS);
405 } else {
406 ATH_MSG_DEBUG("Found object for type " << type << " key " << key);
407 }
408 // Save the dObj
409 dataObjects.push_back(dObj);
410 }
411 // Stream out objects
412 if (dataObjects.size() == 0) {
413 ATH_MSG_DEBUG("No data objects found");
414 return(StatusCode::SUCCESS);
415 }
416 StatusCode status = streamObjects(dataObjects, m_outputName.value());
417 if (!status.isSuccess()) {
418 ATH_MSG_ERROR("Could not stream out objects");
419 return(status);
420 }
421 return(StatusCode::SUCCESS);
422}
423//__________________________________________________________________________
424StatusCode AthenaOutputStreamTool::streamObjects(const DataObjectVec& dataObjects, const std::string& outputName) {
425 // Check that a connection has been opened
426 if (!m_connectionOpen) {
427 ATH_MSG_ERROR("Connection NOT open. Please open a connection before streaming out objects.");
428 return(StatusCode::FAILURE);
429 }
430 // Connect the output file to the service
431 std::string outputConnectionString = outputName;
432 const std::string defaultMetaDataString = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
433 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
434 // If we're in here we're writing MetaData
435 // Now let's see if we should be overwriting the MetaData attributes
436 // For the time-being this happens when we're writing MetaData in the augmentation mode
437 if (!m_metaDataOutputAttributes.empty()) {
438 // Note: This won't work quite right if only one attribute is set though!
439 outputConnectionString.replace(mpos, defaultMetaDataString.length(), m_metaDataOutputAttributes);
440 }
441 }
442 for (std::string::size_type pos = m_outputAttributes.find('['); pos != std::string::npos; pos = m_outputAttributes.find('[', ++pos)) {
443 if (outputConnectionString.find(m_outputAttributes.substr(pos, m_outputAttributes.find('=', pos) + 1 - pos)) == std::string::npos) {
444 outputConnectionString += m_outputAttributes.substr(pos, m_outputAttributes.find(']', pos) + 1 - pos);
445 }
446 }
447
448 // Check that the DataHeader is still valid
449 DataObject* dataHeaderObj = m_store->accessData(ClassID_traits<DataHeader>::ID(), m_dataHeaderKey);
450 std::map<DataObject*, IOpaqueAddress*> written;
451 for (DataObject* dobj : dataObjects) {
452 // Do not write the DataHeader via the explicit list
453 if (dobj->clID() == ClassID_traits<DataHeader>::ID()) {
454 ATH_MSG_DEBUG("Explicit request to write DataHeader: " << dobj->name() << " - skipping it.");
455 // Do not stream out same object twice
456 } else if (written.find(dobj) != written.end()) {
457 // Print warning and skip
458 ATH_MSG_DEBUG("Trying to write DataObject twice (clid/key): " << dobj->clID() << " " << dobj->name());
459 ATH_MSG_DEBUG(" Skipping this one.");
460 } else {
461 // Write object
462 IOpaqueAddress* addr = new TokenAddress(0, dobj->clID(), outputConnectionString);
463 addr->addRef();
464 if (m_conversionSvc->createRep(dobj, addr).isSuccess()) {
465 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
466 } else {
467 ATH_MSG_ERROR("Could not create Rep for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
468 return(StatusCode::FAILURE);
469 }
470 }
471 }
472 // End of loop over DataObjects, write DataHeader
473 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
474 IOpaqueAddress* addr = new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
475 addr->addRef();
476 if (m_conversionSvc->createRep(dataHeaderObj, addr).isSuccess()) {
477 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
478 } else {
479 ATH_MSG_ERROR("Could not create Rep for DataHeader");
480 return(StatusCode::FAILURE);
481 }
482 }
483 for (DataObject* dobj : dataObjects) {
484 // call fillRepRefs of persistency service
485 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dobj->registry());
486 if (proxy != nullptr && written.find(dobj) != written.end()) {
487 IOpaqueAddress* addr(written.find(dobj)->second);
488 if ((m_conversionSvc->fillRepRefs(addr, dobj)).isSuccess()) {
489 if (dobj->clID() != 1 || addr->par()[0] != "\n") {
490 if (dobj->clID() != ClassID_traits<DataHeader>::ID()) {
491 m_dataHeader->insert(proxy, addr);
492 } else {
493 m_dataHeader->insert(proxy, addr, m_processTag);
494 }
495 if (proxy->address() == nullptr) {
496 proxy->setAddress(addr);
497 }
498 addr->release();
499 }
500 } else {
501 ATH_MSG_ERROR("Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() << " " << dobj->name());
502 return(StatusCode::FAILURE);
503 }
504 } else {
505 ATH_MSG_WARNING("Could cast DataObject " << dobj->clID() << " " << dobj->name());
506 }
507 }
508 m_dataHeader->addHash(&*m_store);
509 if ((m_conversionSvc.type() == "AthenaPoolCnvSvc" || m_conversionSvc.type() == "AthenaPoolSharedIOCnvSvc") && dataHeaderObj != nullptr) {
510 // End of DataObjects, fill refs for DataHeader
511 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(dataHeaderObj->registry());
512 if (proxy != nullptr && written.find(dataHeaderObj) != written.end()) {
513 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
514 if ((m_conversionSvc->fillRepRefs(addr, dataHeaderObj)).isSuccess()) {
515 if (dataHeaderObj->clID() != 1 || addr->par()[0] != "\n") {
516 if (dataHeaderObj->clID() != ClassID_traits<DataHeader>::ID()) {
517 m_dataHeader->insert(proxy, addr);
518 } else {
519 m_dataHeader->insert(proxy, addr, m_processTag);
520 }
521 addr->release();
522 }
523 } else {
524 ATH_MSG_ERROR("Could not fill Object Refs for DataHeader");
525 return(StatusCode::FAILURE);
526 }
527 } else {
528 ATH_MSG_ERROR("Could not cast DataHeader");
529 return(StatusCode::FAILURE);
530 }
531 }
532 return(StatusCode::SUCCESS);
533}
534//__________________________________________________________________________
536 const std::string hltKey = "HLTAutoKey";
539 if (m_store->retrieve(beg, ending).isFailure() || beg == ending) {
540 ATH_MSG_DEBUG("No DataHeaders present in StoreGate");
541 } else {
542 for ( ; beg != ending; ++beg) {
543 if (m_store->transientContains<DataHeader>(beg.key()) && beg->isInput()) {
544 for (std::vector<DataHeaderElement>::const_iterator it = beg->begin(), itLast = beg->end();
545 it != itLast; ++it) {
546 // Only insert the primary clid, not the ones for the symlinks!
547 CLID clid = it->getPrimaryClassID();
548 if (clid != ClassID_traits<DataHeader>::ID()) {
549 //check the typename is known ... we make an exception if the key contains 'Aux.' ... aux containers may not have their keys known yet in some cases
550 //see https://its.cern.ch/jira/browse/ATLASG-59 for the solution
551 std::string typeName;
552 if (m_clidSvc->getTypeNameOfID(clid, typeName).isFailure() && it->getKey().find("Aux.") == std::string::npos) {
553 if (m_skippedItems.find(it->getKey()) == m_skippedItems.end()) {
554 ATH_MSG_WARNING("Skipping " << it->getKey() << " with unknown clid " << clid << " . Further warnings for this item are suppressed" );
555 m_skippedItems.insert(it->getKey());
556 }
557 continue;
558 }
559 ATH_MSG_DEBUG("Adding " << typeName << "#" << it->getKey() << " (clid " << clid << ") to itemlist");
560 const std::string keyName = it->getKey();
561 if (keyName.size() > 10 && keyName.compare(0, 10,hltKey)==0) {
562 p2BWrittenFromTool->add(clid, hltKey + "*").ignore();
563 } else if (keyName.size() > 10 && keyName.compare(keyName.size() - 10, 10, hltKey)==0) {
564 p2BWrittenFromTool->add(clid, "*" + hltKey).ignore();
565 } else {
566 p2BWrittenFromTool->add(clid, keyName).ignore();
567 }
568 }
569 }
570 }
571 }
572 }
573 ATH_MSG_DEBUG("Adding DataHeader for stream " << name());
574 return(StatusCode::SUCCESS);
575}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This is the implementation of IAthenaOutputStreamTool.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
An AttributeList represents a logical row of attributes in a metadata table.
This file contains the class definition for the TokenAddress class.
Base class for all conversion services.
Definition AthCnvSvc.h:66
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
An AttributeList represents a logical row of attributes in a metadata table.
StringProperty m_metaDataContainerPrefix
std::map< std::string, bool > m_keepProvenanceMatch
Cache provenance RegEx matching result in a map.
StatusCode finalizeOutput() override
Finalize the output stream after the last commit, e.g.
AthenaOutputStreamTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard AlgTool Constructor.
std::set< std::string > m_skippedItems
set of skipped item keys, because of missing CLID
bool m_extendProvenanceRecord
Flag as to whether to extend provenance via the DataHeader.
DataHeader * m_dataHeader
Current DataHeader for streamed objects.
std::vector< DataObject * > DataObjectVec
Stream out a vector of objects Must convert to DataObject, e.g.
StatusCode connectOutput(const std::string &outputName="") override
Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wan...
ServiceHandle< IDecisionSvc > m_decSvc
Ref to DecisionSvc.
ServiceHandle< IClassIDSvc > m_clidSvc
Ref to ClassIDSvc to convert type name to clid.
std::vector< TypeKeyPair > TypeKeyPairs
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="") override
bool m_connectionOpen
Flag to tell whether connectOutput has been called.
ServiceHandle< IConversionSvc > m_conversionSvc
Keep reference to the data conversion service.
std::regex m_keepProvenancesRE
RegEx pattern created from m_keepProvenancesStr.
StatusCode connectServices()
Do the real connection to services.
void propagateProvenance(const DataHeader &src_dh)
copy provenance records when creating new DataHeaders
ServiceHandle< StoreGateSvc > m_store
virtual StatusCode initialize() override
AthAlgTool Interface method implementations:
virtual StatusCode getInputItemList(SG::IFolder *m_p2BWrittenFromTool) override
std::string m_keepProvenancesStr
RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStrea...
SG::ReadHandleKey< AthenaAttributeList > m_attrListKey
StatusCode commitOutput(bool doCommit=false) override
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
StringProperty m_metaDataOutputCollection
This class provides a persistent form for the TransientAddress.
Definition DataHeader.h:37
This class provides the layout for summary information stored for data written to POOL.
Definition DataHeader.h:123
std::vector< DataHeaderElement >::const_iterator beginProvenance() const
std::vector< DataHeaderElement >::const_iterator endProvenance() const
bool isInput() const
Check whether StatusFlag is "Input".
a const_iterator facade to DataHandle.
Definition SGIterator.h:164
a run-time configurable list of data objects
Definition SGIFolder.h:21
virtual StatusCode add(const std::string &typeName, const std::string &skey)=0
add a data object identifier to the list
@ EVENT_STORE
Definition StoreID.h:26
This class provides a Generic Transient Address for POOL tokens.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())