 |
ATLAS Offline Software
|
This is the implementation of IAthenaOutputStreamTool.
More...
#include <AthenaOutputStreamTool.h>
|
StringProperty | m_outputName { this, "OutputFile", "", "name of the output db name"} |
|
StringProperty | m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"} |
|
StringProperty | m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"} |
|
StringProperty | m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"} |
|
StringProperty | m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""} |
|
StringProperty | m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""} |
|
StringProperty | m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"} |
|
StringProperty | m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"} |
|
StringProperty | m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" } |
|
BooleanProperty | m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"} |
|
std::string | m_outputAttributes |
|
std::string | m_metaDataOutputAttributes |
|
SG::ReadHandleKey< AthenaAttributeList > | m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""} |
|
std::string | m_attrListWrite {""} |
|
ServiceHandle< StoreGateSvc > | m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"} |
|
ServiceHandle< IConversionSvc > | m_conversionSvc { this, "ConversionService", "AthenaPoolCnvSvc" } |
| Keep reference to the data conversion service. More...
|
|
ServiceHandle< IClassIDSvc > | m_clidSvc |
| Ref to ClassIDSvc to convert type name to clid. More...
|
|
ServiceHandle< IDecisionSvc > | m_decSvc |
| Ref to DecisionSvc. More...
|
|
DataHeader * | m_dataHeader {nullptr} |
| Current DataHeader for streamed objects. More...
|
|
bool | m_connectionOpen {false} |
| Flag to tell whether connectOutput has been called. More...
|
|
bool | m_extendProvenanceRecord {false} |
| Flag as to whether to extend provenance via the DataHeader. More...
|
|
std::string | m_keepProvenancesStr |
| RegEx string to match provenance tags to keep in the output DataHeader. Retrieved from an OutputStream property. More...
|
|
std::regex | m_keepProvenancesRE |
| RegEx pattern created from m_keepProvenancesStr. More...
|
|
std::map< std::string, bool > | m_keepProvenanceMatch |
| Cache provenance RegEx matching result in a map. More...
|
|
std::set< std::string > | m_skippedItems |
| Set of item keys that were skipped because of a missing CLID. More...
|
|
This is the implementation of IAthenaOutputStreamTool.
Definition at line 33 of file AthenaOutputStreamTool.h.
◆ DataObjectVec
Stream out a vector of objects. Each object must first be converted to a DataObject, e.g.:
#include "AthenaKernel/StorableConversions.h" T* obj = xxx; DataObject* dataObject = SG::asStorable(obj);
Definition at line 75 of file AthenaOutputStreamTool.h.
◆ TypeKeyPair
Stream out objects.
Provide a vector of typeName/key pairs. If a key is empty, it is assumed that there is only one object of that type; this will fail if there is more than one.
Definition at line 66 of file AthenaOutputStreamTool.h.
◆ TypeKeyPairs
◆ AthenaOutputStreamTool()
AthenaOutputStreamTool::AthenaOutputStreamTool |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ commitOutput()
StatusCode AthenaOutputStreamTool::commitOutput |
( |
bool |
doCommit = false | ) |
|
|
override |
Commit the output stream after having streamed out objects. commitOutput must be called AFTER streaming.
Definition at line 338 of file AthenaOutputStreamTool.cxx.
343 return(StatusCode::FAILURE);
347 return(StatusCode::SUCCESS);
◆ connectOutput()
StatusCode AthenaOutputStreamTool::connectOutput |
( |
const std::string & |
outputName = "" | ) |
|
|
override |
Connect to the output stream. connectOutput must be called BEFORE streaming. Only specify "outputName" if one wants to override jobOptions.
Definition at line 159 of file AthenaOutputStreamTool.cxx.
168 return(StatusCode::FAILURE);
177 return(StatusCode::FAILURE);
186 if (
m_store->removeDataAndProxy(preDh).isFailure()) {
188 return(StatusCode::FAILURE);
200 std::vector<std::string> dhKeys;
202 for (
const std::string& dhKey : dhKeys) {
203 bool primaryDH =
false;
205 if (dhKey ==
"EventSelector") primaryDH =
true;
206 ATH_MSG_DEBUG(
"No transientContains DataHeader with key " << dhKey);
208 if (
m_store->retrieve(
dh, dhKey).isFailure()) {
209 ATH_MSG_DEBUG(
"Unable to retrieve the DataHeader with key " << dhKey);
211 if (
dh->isInput() || hasInputAlias(*
m_store->proxy(
dh)) || primaryDH) {
219 if (!attrListHandle.isValid()) {
230 newone->copyData(*attlist);
236 newone->extend(*
it,
"bool");
237 (*newone)[*
it].data<
bool>() =
m_decSvc->isEventAccepted(*
it,Gaudi::Hive::currentContext());
247 ATH_MSG_DEBUG(
"Decisions already added by a different stream");
270 if (
wh.record(std::unique_ptr<DataHeader>(
m_dataHeader)).isFailure()) {
272 return(StatusCode::FAILURE);
279 return(StatusCode::SUCCESS);
◆ connectServices() [1/2]
StatusCode AthenaOutputStreamTool::connectServices |
( |
| ) |
|
|
private |
Do the real connection to services.
Definition at line 150 of file AthenaOutputStreamTool.cxx.
154 return(StatusCode::FAILURE);
156 return(StatusCode::SUCCESS);
◆ connectServices() [2/2]
StatusCode AthenaOutputStreamTool::connectServices |
( |
const std::string & |
dataStore, |
|
|
const std::string & |
cnvSvc, |
|
|
bool |
extendProvenenceRecord |
|
) |
| |
|
override |
Specify which data store and conversion service to use, and whether to extend provenance. Only use this if one wants to override jobOptions.
Definition at line 116 of file AthenaOutputStreamTool.cxx.
121 if (
m_store.release().isFailure()) {
126 if (cnvSvc !=
m_conversionSvc.type() && cnvSvc !=
"EventPersistencySvc") {
133 return(StatusCode::FAILURE);
137 auto pprop =
dynamic_cast<const IProperty*
>(
parent());
140 return(StatusCode::FAILURE);
142 auto keep =
dynamic_cast<const StringProperty&
>( pprop->getProperty(
"KeepProvenanceTagsRegEx") );
◆ finalizeOutput()
StatusCode AthenaOutputStreamTool::finalizeOutput |
( |
| ) |
|
|
override |
Finalize the output stream after the last commit, e.g. in finalize.
Definition at line 350 of file AthenaOutputStreamTool.cxx.
352 if (athConversionSvc != 0) {
355 return(StatusCode::FAILURE);
358 return(StatusCode::SUCCESS);
◆ getInputItemList()
StatusCode AthenaOutputStreamTool::getInputItemList |
( |
SG::IFolder * |
m_p2BWrittenFromTool | ) |
|
|
overridevirtual |
Definition at line 533 of file AthenaOutputStreamTool.cxx.
534 const std::string hltKey =
"HLTAutoKey";
537 if (
m_store->retrieve(
beg, ending).isFailure() ||
beg == ending) {
540 for ( ;
beg != ending; ++
beg) {
542 for (std::vector<DataHeaderElement>::const_iterator
it =
beg->begin(), itLast =
beg->end();
543 it != itLast; ++
it) {
545 CLID clid =
it->getPrimaryClassID();
550 if (
m_clidSvc->getTypeNameOfID(clid,
typeName).isFailure() &&
it->getKey().find(
"Aux.") == std::string::npos) {
552 ATH_MSG_WARNING(
"Skipping " <<
it->getKey() <<
" with unknown clid " << clid <<
" . Further warnings for this item are suppressed" );
558 const std::string
keyName =
it->getKey();
560 p2BWrittenFromTool->add(clid, hltKey +
"*").ignore();
562 p2BWrittenFromTool->add(clid,
"*" + hltKey).ignore();
564 p2BWrittenFromTool->add(clid,
keyName).ignore();
572 return(StatusCode::SUCCESS);
◆ initialize()
StatusCode AthenaOutputStreamTool::initialize |
( |
| ) |
|
|
overridevirtual |
AthAlgTool Interface method implementations:
Definition at line 50 of file AthenaOutputStreamTool.cxx.
67 const INamedInterface* parentAlg =
dynamic_cast<const INamedInterface*
>(
parent());
78 const std::string
keyword =
"[AttributeListKey=";
80 if( (
pos != std::string::npos) ) {
113 return(StatusCode::SUCCESS);
◆ propagateProvenance()
void AthenaOutputStreamTool::propagateProvenance |
( |
const DataHeader & |
src_dh | ) |
|
|
private |
copy provenance records when creating new DataHeaders
Definition at line 283 of file AthenaOutputStreamTool.cxx.
286 std::set<std::string> insertedTags{};
290 std::unique_ptr<SG::TransientAddress> dhTransAddr;
294 dhTransAddr.reset( dhe.getAddress(0) );
298 if(
auto dhProxy=
m_store->proxy(&src_dh); dhProxy && dhProxy->address() ) {
301 insertedTags.insert(std::move(
pTag));
303 else if( dhTransAddr ) {
306 insertedTags.insert(std::move(
pTag));
317 for(
auto iter=src_dh.beginProvenance(), iEnd=src_dh.endProvenance();
iter != iEnd; ++
iter) {
318 const auto & currentKey = (*iter).getKey();
319 if( insertedTags.insert(currentKey).second ) {
◆ streamObjects() [1/2]
StatusCode AthenaOutputStreamTool::streamObjects |
( |
const DataObjectVec & |
dataObjects, |
|
|
const std::string & |
outputName = "" |
|
) |
| |
|
overridevirtual |
Definition at line 422 of file AthenaOutputStreamTool.cxx.
425 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
426 return(StatusCode::FAILURE);
429 std::string outputConnectionString =
outputName;
430 const std::string defaultMetaDataString =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
431 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
448 std::map<DataObject*, IOpaqueAddress*> written;
449 for (DataObject* dobj : dataObjects) {
452 ATH_MSG_DEBUG(
"Explicit request to write DataHeader: " << dobj->name() <<
" - skipping it.");
454 }
else if (written.find(dobj) != written.end()) {
456 ATH_MSG_DEBUG(
"Trying to write DataObject twice (clid/key): " << dobj->clID() <<
" " << dobj->name());
460 IOpaqueAddress* addr =
new TokenAddress(0, dobj->clID(), outputConnectionString);
463 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
465 ATH_MSG_ERROR(
"Could not create Rep for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
466 return(StatusCode::FAILURE);
472 IOpaqueAddress* addr =
new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
475 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
478 return(StatusCode::FAILURE);
481 for (DataObject* dobj : dataObjects) {
484 if (
proxy !=
nullptr && written.find(dobj) != written.end()) {
485 IOpaqueAddress* addr(written.find(dobj)->second);
487 if (dobj->clID() != 1 || addr->par()[0] !=
"\n") {
493 if (
proxy->address() ==
nullptr) {
494 proxy->setAddress(addr);
499 ATH_MSG_ERROR(
"Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
500 return(StatusCode::FAILURE);
503 ATH_MSG_WARNING(
"Could cast DataObject " << dobj->clID() <<
" " << dobj->name());
510 if (
proxy !=
nullptr && written.find(dataHeaderObj) != written.end()) {
511 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
513 if (dataHeaderObj->clID() != 1 || addr->par()[0] !=
"\n") {
523 return(StatusCode::FAILURE);
527 return(StatusCode::FAILURE);
530 return(StatusCode::SUCCESS);
◆ streamObjects() [2/2]
StatusCode AthenaOutputStreamTool::streamObjects |
( |
const TypeKeyPairs & |
typeKeys, |
|
|
const std::string & |
outputName = "" |
|
) |
| |
|
overridevirtual |
Definition at line 361 of file AthenaOutputStreamTool.cxx.
365 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
366 return(StatusCode::FAILURE);
374 return(StatusCode::FAILURE);
377 std::vector<DataObject*> dataObjects;
378 for (TypeKeyPairs::const_iterator
first = typeKeys.begin(), last = typeKeys.end();
380 const std::string&
type = (*first).first;
381 const std::string&
key = (*first).second;
386 return(StatusCode::FAILURE);
388 DataObject* dObj = 0;
393 dObj =
m_store->accessData(clid);
402 return(StatusCode::SUCCESS);
407 dataObjects.push_back(dObj);
410 if (dataObjects.size() == 0) {
412 return(StatusCode::SUCCESS);
415 if (!
status.isSuccess()) {
419 return(StatusCode::SUCCESS);
◆ m_attrListKey
SG::ReadHandleKey<AthenaAttributeList> AthenaOutputStreamTool::m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""} |
|
private |
◆ m_attrListWrite
std::string AthenaOutputStreamTool::m_attrListWrite {""} |
|
private |
◆ m_branchNameHint
StringProperty AthenaOutputStreamTool::m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" } |
|
private |
◆ m_clidSvc
◆ m_connectionOpen
bool AthenaOutputStreamTool::m_connectionOpen {false} |
|
private |
◆ m_containerNameHint
StringProperty AthenaOutputStreamTool::m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""} |
|
private |
◆ m_containerPrefix
StringProperty AthenaOutputStreamTool::m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""} |
|
private |
◆ m_conversionSvc
◆ m_dataHeader
DataHeader* AthenaOutputStreamTool::m_dataHeader {nullptr} |
|
private |
◆ m_dataHeaderKey
StringProperty AthenaOutputStreamTool::m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"} |
|
private |
◆ m_decSvc
◆ m_extend
BooleanProperty AthenaOutputStreamTool::m_extend { this, "SaveDecisions", false, "Set to true to add streaming decisions to an attributeList"} |
|
private |
◆ m_extendProvenanceRecord
bool AthenaOutputStreamTool::m_extendProvenanceRecord {false} |
|
private |
◆ m_keepProvenanceMatch
std::map<std::string, bool> AthenaOutputStreamTool::m_keepProvenanceMatch |
|
private |
◆ m_keepProvenancesRE
std::regex AthenaOutputStreamTool::m_keepProvenancesRE |
|
private |
◆ m_keepProvenancesStr
std::string AthenaOutputStreamTool::m_keepProvenancesStr |
|
private |
◆ m_metaDataContainerPrefix
StringProperty AthenaOutputStreamTool::m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"} |
|
private |
◆ m_metaDataOutputAttributes
std::string AthenaOutputStreamTool::m_metaDataOutputAttributes |
|
private |
◆ m_metaDataOutputCollection
StringProperty AthenaOutputStreamTool::m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"} |
|
private |
◆ m_outputAttributes
std::string AthenaOutputStreamTool::m_outputAttributes |
|
private |
◆ m_outputCollection
StringProperty AthenaOutputStreamTool::m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"} |
|
private |
◆ m_outputName
StringProperty AthenaOutputStreamTool::m_outputName { this, "OutputFile", "", "name of the output db name"} |
|
private |
◆ m_processTag
StringProperty AthenaOutputStreamTool::m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"} |
|
private |
◆ m_skippedItems
std::set<std::string> AthenaOutputStreamTool::m_skippedItems |
|
private |
◆ m_store
The documentation for this class was generated from the following files:
const std::string & key() const
Return the StoreGate ID for the referenced object.
This class provides a Generic Transient Address for POOL tokens.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
::StatusCode StatusCode
StatusCode definition for legacy code.
Default, invalid implementation of ClassID_traits.
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
uint32_t CLID
The Class ID type.
#define ATH_MSG_WARNING(x)
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
std::string keyword(const std::string &classname)