|
ATLAS Offline Software
|
This is the implementation of IAthenaOutputStreamTool.
More...
#include <AthenaOutputStreamTool.h>
|
StringProperty | m_outputName { this, "OutputFile", "", "name of the output db name"} |
|
StringProperty | m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"} |
|
StringProperty | m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"} |
|
StringProperty | m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"} |
|
StringProperty | m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""} |
|
StringProperty | m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""} |
|
StringProperty | m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"} |
|
StringProperty | m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"} |
|
StringProperty | m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" } |
|
std::string | m_outputAttributes {""} |
|
std::string | m_metaDataOutputAttributes {""} |
|
SG::ReadHandleKey< AthenaAttributeList > | m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""} |
|
std::string | m_attrListWrite {""} |
|
ServiceHandle< StoreGateSvc > | m_store { this, "Store", "StoreGateSvc/DetectorStore", "Pointer to the data store"} |
|
ServiceHandle< IConversionSvc > | m_conversionSvc |
| Keep reference to the data conversion service. More...
|
|
ServiceHandle< IClassIDSvc > | m_clidSvc |
| Ref to ClassIDSvc to convert type name to clid. More...
|
|
ServiceHandle< IDecisionSvc > | m_decSvc |
| Ref to DecisionSvc. More...
|
|
DataHeader * | m_dataHeader |
| Current DataHeader for streamed objects. More...
|
|
bool | m_connectionOpen |
| Flag to tell whether connectOutput has been called. More...
|
|
bool | m_extendProvenanceRecord |
| Flag as to whether to extend provenance via the DataHeader. More...
|
|
bool | m_extend |
| Flag to extend attribute list with stream flags from DecisionSvc. More...
|
|
std::set< std::string > | m_skippedItems |
| set of skipped item keys, because of missing CLID More...
|
|
This is the implementation of IAthenaOutputStreamTool.
Definition at line 31 of file AthenaOutputStreamTool.h.
◆ DataObjectVec
Stream out a vector of objects. Each object must first be converted to a DataObject, e.g.
#include "AthenaKernel/StorableConversions.h" T* obj = xxx; DataObject* dataObject = SG::asStorable(obj);
Definition at line 76 of file AthenaOutputStreamTool.h.
◆ TypeKeyPair
Stream out objects.
Provide a vector of typeName/key pairs. If the key is empty, only one object is assumed, and this will fail if there is more than one.
Definition at line 67 of file AthenaOutputStreamTool.h.
◆ TypeKeyPairs
◆ AthenaOutputStreamTool()
AthenaOutputStreamTool::AthenaOutputStreamTool |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
Standard AlgTool Constructor.
Constructor.
Definition at line 43 of file AthenaOutputStreamTool.cxx.
54 declareInterface<IAthenaOutputStreamTool>(
this);
56 declareProperty(
"SaveDecisions",
m_extend =
false,
"Set to true to add streaming decisions to an attributeList");
◆ ~AthenaOutputStreamTool()
AthenaOutputStreamTool::~AthenaOutputStreamTool |
( |
| ) |
|
|
virtual |
◆ commitOutput()
StatusCode AthenaOutputStreamTool::commitOutput |
( |
bool |
doCommit = false | ) |
|
Commit the output stream after having streamed out objects. Must commitOutput AFTER streaming.
Definition at line 325 of file AthenaOutputStreamTool.cxx.
330 return(StatusCode::FAILURE);
334 return(StatusCode::SUCCESS);
◆ connectOutput()
StatusCode AthenaOutputStreamTool::connectOutput |
( |
const std::string & |
outputName = "" | ) |
|
Connect to the output stream. Must connectOutput BEFORE streaming. Only specify "outputName" if one wants to override jobOptions.
Definition at line 172 of file AthenaOutputStreamTool.cxx.
181 return(StatusCode::FAILURE);
187 return(StatusCode::FAILURE);
193 return(StatusCode::FAILURE);
202 if (
m_store->removeDataAndProxy(preDh).isFailure()) {
204 return(StatusCode::FAILURE);
216 std::vector<std::string> dhKeys;
218 for (
const std::string& dhKey : dhKeys) {
219 bool primaryDH =
false;
221 if (dhKey ==
"EventSelector") primaryDH =
true;
222 ATH_MSG_DEBUG(
"No transientContains DataHeader with key " << dhKey);
224 if (
m_store->retrieve(
dh, dhKey).isFailure()) {
225 ATH_MSG_DEBUG(
"Unable to retrieve the DataHeader with key " << dhKey);
228 if (
dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) {
236 delete dhTransAddr; dhTransAddr = dhe.getAddress(0);
240 if (dhProxy != 0 && dhProxy->
address() != 0) {
241 delete dhTransAddr; dhTransAddr = 0;
246 else if (dhTransAddr !=
nullptr) {
248 dhTransAddr->address(),
250 delete dhTransAddr; dhTransAddr = 0;
253 for(
auto iter=
dh->beginProvenance(), iEnd=
dh->endProvenance(); iter != iEnd; ++iter) {
262 if (!attrListHandle.isValid()) {
273 newone->copyData(*attlist);
279 newone->extend(*
it,
"bool");
280 (*newone)[*
it].data<
bool>() =
m_decSvc->isEventAccepted(*
it,Gaudi::Hive::currentContext());
290 ATH_MSG_DEBUG(
"Decisions already added by a different stream");
313 if (
wh.record(std::unique_ptr<DataHeader>(
m_dataHeader)).isFailure()) {
315 return(StatusCode::FAILURE);
322 return(StatusCode::SUCCESS);
◆ connectServices() [1/2]
StatusCode AthenaOutputStreamTool::connectServices |
( |
| ) |
|
|
private virtual |
Do the real connection to services.
Definition at line 163 of file AthenaOutputStreamTool.cxx.
167 return(StatusCode::FAILURE);
169 return(StatusCode::SUCCESS);
◆ connectServices() [2/2]
StatusCode AthenaOutputStreamTool::connectServices |
( |
const std::string & |
dataStore, |
|
|
const std::string & |
cnvSvc, |
|
|
bool |
extendProvenenceRecord |
|
) |
| |
Specify which data store and conversion service to use and whether to extend provenance. Only use if one wants to override jobOptions.
Definition at line 139 of file AthenaOutputStreamTool.cxx.
144 if (
m_store.release().isFailure()) {
149 if (cnvSvc !=
m_conversionSvc.type() && cnvSvc !=
"EventPersistencySvc") {
156 return(StatusCode::FAILURE);
◆ finalize()
StatusCode AthenaOutputStreamTool::finalize |
( |
| ) |
|
◆ finalizeOutput()
StatusCode AthenaOutputStreamTool::finalizeOutput |
( |
| ) |
|
Finalize the output stream after the last commit, e.g. in finalize.
Definition at line 337 of file AthenaOutputStreamTool.cxx.
339 if (athConversionSvc != 0) {
342 return(StatusCode::FAILURE);
345 return(StatusCode::SUCCESS);
◆ getInputItemList()
StatusCode AthenaOutputStreamTool::getInputItemList |
( |
SG::IFolder * |
m_p2BWrittenFromTool | ) |
|
|
virtual |
Definition at line 520 of file AthenaOutputStreamTool.cxx.
521 const std::string hltKey =
"HLTAutoKey";
524 if (
m_store->retrieve(
beg, ending).isFailure() ||
beg == ending) {
527 for ( ;
beg != ending; ++
beg) {
529 for (std::vector<DataHeaderElement>::const_iterator
it =
beg->begin(), itLast =
beg->end();
530 it != itLast; ++
it) {
532 CLID clid =
it->getPrimaryClassID();
537 if (
m_clidSvc->getTypeNameOfID(clid,
typeName).isFailure() &&
it->getKey().find(
"Aux.") == std::string::npos) {
539 ATH_MSG_WARNING(
"Skipping " <<
it->getKey() <<
" with unknown clid " << clid <<
" . Further warnings for this item are suppressed" );
545 const std::string
keyName =
it->getKey();
547 p2BWrittenFromTool->add(clid, hltKey +
"*").ignore();
549 p2BWrittenFromTool->add(clid,
"*" + hltKey).ignore();
551 p2BWrittenFromTool->add(clid,
keyName).ignore();
559 return(StatusCode::SUCCESS);
◆ initialize()
StatusCode AthenaOutputStreamTool::initialize |
( |
| ) |
|
AthAlgTool Interface method implementations:
Definition at line 62 of file AthenaOutputStreamTool.cxx.
79 const INamedInterface* parentAlg =
dynamic_cast<const INamedInterface*
>(
parent());
90 const std::string
keyword =
"[AttributeListKey=";
92 if( (
pos != std::string::npos) ) {
125 return(StatusCode::SUCCESS);
◆ streamObjects() [1/2]
StatusCode AthenaOutputStreamTool::streamObjects |
( |
const DataObjectVec & |
dataObjects, |
|
|
const std::string & |
outputName = "" |
|
) |
| |
|
virtual |
Definition at line 409 of file AthenaOutputStreamTool.cxx.
412 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
413 return(StatusCode::FAILURE);
416 std::string outputConnectionString =
outputName;
417 const std::string defaultMetaDataString =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
418 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
435 std::map<DataObject*, IOpaqueAddress*> written;
436 for (DataObject* dobj : dataObjects) {
439 ATH_MSG_DEBUG(
"Explicit request to write DataHeader: " << dobj->name() <<
" - skipping it.");
441 }
else if (written.find(dobj) != written.end()) {
443 ATH_MSG_DEBUG(
"Trying to write DataObject twice (clid/key): " << dobj->clID() <<
" " << dobj->name());
447 IOpaqueAddress* addr =
new TokenAddress(0, dobj->clID(), outputConnectionString);
450 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
452 ATH_MSG_ERROR(
"Could not create Rep for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
453 return(StatusCode::FAILURE);
458 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
459 IOpaqueAddress* addr =
new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
462 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
465 return(StatusCode::FAILURE);
468 for (DataObject* dobj : dataObjects) {
471 if (
proxy !=
nullptr && written.find(dobj) != written.end()) {
472 IOpaqueAddress* addr(written.find(dobj)->second);
474 if (dobj->clID() != 1 || addr->par()[0] !=
"\n") {
480 if (
proxy->address() ==
nullptr) {
481 proxy->setAddress(addr);
486 ATH_MSG_ERROR(
"Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
487 return(StatusCode::FAILURE);
490 ATH_MSG_WARNING(
"Could cast DataObject " << dobj->clID() <<
" " << dobj->name());
494 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
497 if (
proxy !=
nullptr && written.find(dataHeaderObj) != written.end()) {
498 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
500 if (dataHeaderObj->clID() != 1 || addr->par()[0] !=
"\n") {
510 return(StatusCode::FAILURE);
514 return(StatusCode::FAILURE);
517 return(StatusCode::SUCCESS);
◆ streamObjects() [2/2]
StatusCode AthenaOutputStreamTool::streamObjects |
( |
const TypeKeyPairs & |
typeKeys, |
|
|
const std::string & |
outputName = "" |
|
) |
| |
|
virtual |
Definition at line 348 of file AthenaOutputStreamTool.cxx.
352 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
353 return(StatusCode::FAILURE);
361 return(StatusCode::FAILURE);
364 std::vector<DataObject*> dataObjects;
365 for (TypeKeyPairs::const_iterator
first = typeKeys.begin(), last = typeKeys.end();
367 const std::string&
type = (*first).first;
368 const std::string&
key = (*first).second;
373 return(StatusCode::FAILURE);
375 DataObject* dObj = 0;
380 dObj =
m_store->accessData(clid);
389 return(StatusCode::SUCCESS);
394 dataObjects.push_back(dObj);
397 if (dataObjects.size() == 0) {
399 return(StatusCode::SUCCESS);
402 if (!
status.isSuccess()) {
406 return(StatusCode::SUCCESS);
◆ m_attrListKey
SG::ReadHandleKey<AthenaAttributeList> AthenaOutputStreamTool::m_attrListKey {this, "AttributeListKey", "", "optional key for AttributeList to be written as part of the DataHeader: default = \"\""} |
|
private |
◆ m_attrListWrite
std::string AthenaOutputStreamTool::m_attrListWrite {""} |
|
private |
◆ m_branchNameHint
StringProperty AthenaOutputStreamTool::m_branchNameHint { this, "SubLevelBranchName", "0", "naming hint policy for POOL branching: default = \"0\"" } |
|
private |
◆ m_clidSvc
◆ m_connectionOpen
bool AthenaOutputStreamTool::m_connectionOpen |
|
private |
◆ m_containerNameHint
StringProperty AthenaOutputStreamTool::m_containerNameHint { this, "TopLevelContainerName", "0", "naming hint policy for top level POOL container: default = \"0\""} |
|
private |
◆ m_containerPrefix
StringProperty AthenaOutputStreamTool::m_containerPrefix { this, "PoolContainerPrefix", "", "prefix for top level POOL container: default = \"CollectionTree\""} |
|
private |
◆ m_conversionSvc
ServiceHandle<IConversionSvc> AthenaOutputStreamTool::m_conversionSvc |
|
private |
◆ m_dataHeader
◆ m_dataHeaderKey
StringProperty AthenaOutputStreamTool::m_dataHeaderKey { this, "DataHeaderKey", "", "name of the data header key: defaults to tool name"} |
|
private |
◆ m_decSvc
◆ m_extend
bool AthenaOutputStreamTool::m_extend |
|
private |
◆ m_extendProvenanceRecord
bool AthenaOutputStreamTool::m_extendProvenanceRecord |
|
private |
◆ m_metaDataContainerPrefix
StringProperty AthenaOutputStreamTool::m_metaDataContainerPrefix { this, "MetaDataPoolContainerPrefix", "", "prefix for top level MetaData container: default = "" (will result in \"MetaData\")"} |
|
private |
◆ m_metaDataOutputAttributes
std::string AthenaOutputStreamTool::m_metaDataOutputAttributes {""} |
|
private |
◆ m_metaDataOutputCollection
StringProperty AthenaOutputStreamTool::m_metaDataOutputCollection { this, "MetaDataOutputCollection", "", "custom container name prefix for MetaDataHeader: default = "" (will result in \"MetaDataHdr\")"} |
|
private |
◆ m_outputAttributes
std::string AthenaOutputStreamTool::m_outputAttributes {""} |
|
private |
◆ m_outputCollection
StringProperty AthenaOutputStreamTool::m_outputCollection { this, "OutputCollection", "", "custom container name prefix for DataHeader: default = "" (will result in \"POOLContainer_\")"} |
|
private |
◆ m_outputName
StringProperty AthenaOutputStreamTool::m_outputName { this, "OutputFile", "", "name of the output db name"} |
|
private |
◆ m_processTag
StringProperty AthenaOutputStreamTool::m_processTag { this, "ProcessingTag", "", "tag of processing stage: defaults to SG key of DataHeader (Stream name)"} |
|
private |
◆ m_skippedItems
std::set<std::string> AthenaOutputStreamTool::m_skippedItems |
|
private |
◆ m_store
The documentation for this class was generated from the following files:
const std::string & key() const
Return the StoreGate ID for the referenced object.
This class provides a Generic Transient Address for POOL tokens.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
virtual IOpaqueAddress * address() const override final
Retrieve IOpaqueAddress.
::StatusCode StatusCode
StatusCode definition for legacy code.
Default, invalid implementation of ClassID_traits.
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
uint32_t CLID
The Class ID type.
#define ATH_MSG_WARNING(x)
virtual StatusCode disconnectOutput(const std::string &output)
Disconnect output files from the service.
std::string keyword(const std::string &classname)