13 #include "GaudiKernel/IConversionSvc.h"
14 #include "GaudiKernel/IOpaqueAddress.h"
15 #include "GaudiKernel/INamedInterface.h"
16 #include "GaudiKernel/IClassIDSvc.h"
17 #include "GaudiKernel/ThreadLocalContext.h"
// Build the alias name "<proxy name>_Input" and report whether `dp` carries that alias.
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of hasInputAlias(), which is called further down.
35 std::string inputName =
dp.name() +
"_Input";
36 return dp.hasAlias (inputName);
44 const std::string&
name,
// Member initialisers. The first four members are each constructed from a string
// literal plus `name`.
// NOTE(review): presumably these are Gaudi service handles - confirm in the class header.
46 m_store(
"DetectorStore",
name),
47 m_conversionSvc(
"AthenaPoolCnvSvc",
name),
48 m_clidSvc(
"ClassIDSvc",
name),
49 m_decSvc(
"DecisionSvc/DecisionSvc",
name),
// Start with no DataHeader, no open connection, and provenance extension switched off.
50 m_dataHeader(nullptr),
51 m_connectionOpen(false),
52 m_extendProvenanceRecord(false) {
// Advertise the IAthenaOutputStreamTool interface for this object.
54 declareInterface<IAthenaOutputStreamTool>(
this);
// Register the "SaveDecisions" property; the embedded assignment sets m_extend to
// false before it is passed to declareProperty.
56 declareProperty(
"SaveDecisions",
m_extend =
false,
"Set to true to add streaming decisions to an attributeList");
// parentAlg is nullptr when the object returned by parent() is not an INamedInterface
// (pointer dynamic_cast); any null check is on lines not shown in this excerpt.
79 const INamedInterface* parentAlg =
dynamic_cast<const INamedInterface*
>(
parent());
// Marker text. NOTE(review): `pos` below is computed on a line not shown here -
// presumably a search for this keyword; confirm.
90 const std::string
keyword =
"[AttributeListKey=";
// Entered only when `pos` is a real position, i.e. not std::string::npos.
92 if( (
pos != std::string::npos) ) {
125 return(StatusCode::SUCCESS);
136 return(StatusCode::SUCCESS);
// cnvSvc: compared below against m_conversionSvc.type() and "EventPersistencySvc".
// NOTE(review): `extendProvenenceRecord` is misspelled relative to the member
// m_extendProvenanceRecord; rename it only together with its uses, which are not
// visible in this excerpt.
140 const std::string& cnvSvc,
141 bool extendProvenenceRecord) {
// Release m_store; the branch handles a failed release.
144 if (
m_store.release().isFailure()) {
// Entered when cnvSvc differs from both the current m_conversionSvc type and
// "EventPersistencySvc".
149 if (cnvSvc !=
m_conversionSvc.type() && cnvSvc !=
"EventPersistencySvc") {
156 return(StatusCode::FAILURE);
167 return(StatusCode::FAILURE);
169 return(StatusCode::SUCCESS);
181 return(StatusCode::FAILURE);
187 return(StatusCode::FAILURE);
193 return(StatusCode::FAILURE);
202 if (
m_store->removeDataAndProxy(preDh).isFailure()) {
204 return(StatusCode::FAILURE);
// Keys to iterate over. NOTE(review): the code that fills dhKeys is not visible here.
216 std::vector<std::string> dhKeys;
218 for (
const std::string& dhKey : dhKeys) {
// primaryDH is set only for the key "EventSelector".
219 bool primaryDH =
false;
221 if (dhKey ==
"EventSelector") primaryDH =
true;
// NOTE(review): the condition guarding this debug message is not visible in this excerpt.
222 ATH_MSG_DEBUG(
"No transientContains DataHeader with key " << dhKey);
// Retrieve the DataHeader stored under dhKey into `dh`; failure is only logged at DEBUG.
224 if (
m_store->retrieve(
dh, dhKey).isFailure()) {
225 ATH_MSG_DEBUG(
"Unable to retrieve the DataHeader with key " << dhKey);
// Taken when the header reports isInput(), or its proxy has the input alias, or the
// key was "EventSelector".
228 if (
dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) {
// Free any previously held address before taking the one returned by dhe.getAddress(0).
236 delete dhTransAddr; dhTransAddr = dhe.getAddress(0);
// If the proxy has a non-null address, dhTransAddr is deleted and reset to null.
240 if (dhProxy != 0 && dhProxy->
address() != 0) {
241 delete dhTransAddr; dhTransAddr = 0;
// Otherwise a non-null dhTransAddr is used (its address() is passed to a call not
// fully visible here) and then deleted and reset to null.
246 else if (dhTransAddr !=
nullptr) {
248 dhTransAddr->address(),
250 delete dhTransAddr; dhTransAddr = 0;
// Walk the header's provenance range from beginProvenance() to endProvenance().
253 for(
auto iter=
dh->beginProvenance(), iEnd=
dh->endProvenance(); iter != iEnd; ++iter) {
// Branch for an attribute-list handle that is not valid.
262 if (!attrListHandle.isValid()) {
// Copy the data of *attlist into newone.
273 newone->copyData(*attlist);
// Add an attribute named *it with type "bool" ...
279 newone->extend(*
it,
"bool");
// ... and set it to m_decSvc->isEventAccepted(*it, <current event context>).
// NOTE(review): the range `it` iterates over is not visible in this excerpt.
280 (*newone)[*
it].data<
bool>() =
m_decSvc->isEventAccepted(*
it,Gaudi::Hive::currentContext());
290 ATH_MSG_DEBUG(
"Decisions already added by a different stream");
// The raw m_dataHeader pointer is wrapped in a std::unique_ptr that is handed to
// wh.record(); m_dataHeader itself is not reset on the lines visible here.
// NOTE(review): confirm nothing else deletes m_dataHeader after this call.
313 if (
wh.record(std::unique_ptr<DataHeader>(
m_dataHeader)).isFailure()) {
315 return(StatusCode::FAILURE);
322 return(StatusCode::SUCCESS);
330 return(StatusCode::FAILURE);
334 return(StatusCode::SUCCESS);
339 if (athConversionSvc != 0) {
342 return(StatusCode::FAILURE);
345 return(StatusCode::SUCCESS);
352 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
353 return(StatusCode::FAILURE);
361 return(StatusCode::FAILURE);
// Collect one DataObject* per (type, key) pair in typeKeys.
// NOTE(review): the loop condition/increment line and several branch conditions are
// not visible in this excerpt.
364 std::vector<DataObject*> dataObjects;
365 for (TypeKeyPairs::const_iterator
first = typeKeys.begin(), last = typeKeys.end();
// Each pair's first member is bound as `type`, its second as `key`.
367 const std::string&
type = (*first).first;
368 const std::string&
key = (*first).second;
373 return(StatusCode::FAILURE);
375 DataObject* dObj = 0;
// Lookup by class id only. NOTE(review): `clid` is computed on a line not shown here.
380 dObj =
m_store->accessData(clid);
389 return(StatusCode::SUCCESS);
394 dataObjects.push_back(dObj);
// With nothing collected, SUCCESS is returned right away.
397 if (dataObjects.size() == 0) {
399 return(StatusCode::SUCCESS);
// NOTE(review): `status` is assigned on a line not shown here.
402 if (!
status.isSuccess()) {
406 return(StatusCode::SUCCESS);
412 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
413 return(StatusCode::FAILURE);
// Work on a mutable copy of outputName.
416 std::string outputConnectionString =
outputName;
// Option text that is searched for in the connection string.
417 const std::string defaultMetaDataString =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
// C++17 if-with-initialiser: mpos is scoped to this if; the branch runs only when the
// option text occurs in outputConnectionString.
418 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
// Maps each DataObject handled in this pass to the address object created for it.
435 std::map<DataObject*, IOpaqueAddress*> written;
436 for (DataObject* dobj : dataObjects) {
// NOTE(review): the condition that selects this "skip DataHeader" branch is not visible here.
439 ATH_MSG_DEBUG(
"Explicit request to write DataHeader: " << dobj->name() <<
" - skipping it.");
441 }
else if (written.find(dobj) != written.end()) {
// dobj already has an entry in `written`; only a DEBUG message is emitted on this line.
443 ATH_MSG_DEBUG(
"Trying to write DataObject twice (clid/key): " << dobj->clID() <<
" " << dobj->name());
// Allocate a new TokenAddress for dobj's class id and the output connection string.
// NOTE(review): allocated with raw `new`; whether it is freed on the FAILURE path
// below cannot be seen in this excerpt - confirm.
447 IOpaqueAddress* addr =
new TokenAddress(0, dobj->clID(), outputConnectionString);
450 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
452 ATH_MSG_ERROR(
"Could not create Rep for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
453 return(StatusCode::FAILURE);
// The DataHeader object gets its own TokenAddress only when the conversion service
// type is "AthenaPoolCnvSvc" and dataHeaderObj is non-null.
458 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
459 IOpaqueAddress* addr =
new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
462 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
465 return(StatusCode::FAILURE);
// Second pass over the same objects, using the addresses stored in `written`.
468 for (DataObject* dobj : dataObjects) {
// Only objects with a non-null proxy and an entry in `written` are processed.
// NOTE(review): `proxy` is obtained on a line not shown here.
471 if (
proxy !=
nullptr && written.find(dobj) != written.end()) {
// The map is searched a second time here to fetch the stored address.
472 IOpaqueAddress* addr(written.find(dobj)->second);
// Skipped only when the class id is 1 AND the first address parameter is exactly "\n".
474 if (dobj->clID() != 1 || addr->par()[0] !=
"\n") {
// Attach the address to the proxy only if the proxy has none yet.
480 if (
proxy->address() ==
nullptr) {
481 proxy->setAddress(addr);
486 ATH_MSG_ERROR(
"Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
487 return(StatusCode::FAILURE);
// Warn that this DataObject could not be cast, printing its class id and name.
// NOTE(review): presumably the else-branch of the `proxy != nullptr && written...`
// check earlier in this loop - confirm against the full file.
490 ATH_MSG_WARNING(
"Could not cast DataObject " << dobj->clID() <<
" " << dobj->name());
// Same handling for the DataHeader object, only when the conversion service type is
// "AthenaPoolCnvSvc" and dataHeaderObj is non-null.
494 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
// Requires a non-null proxy and an entry for dataHeaderObj in `written`.
497 if (
proxy !=
nullptr && written.find(dataHeaderObj) != written.end()) {
// The map is searched a second time here to fetch the stored address.
498 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
// Skipped only when the class id is 1 AND the first address parameter is exactly "\n".
500 if (dataHeaderObj->clID() != 1 || addr->par()[0] !=
"\n") {
510 return(StatusCode::FAILURE);
514 return(StatusCode::FAILURE);
517 return(StatusCode::SUCCESS);
// Key fragment used to build the two wildcard patterns (hltKey + "*" and "*" + hltKey)
// further down.
521 const std::string hltKey =
"HLTAutoKey";
// Fetch the range [beg, ending) from m_store; this branch handles a failed retrieve
// or an empty range.
524 if (
m_store->retrieve(
beg, ending).isFailure() ||
beg == ending) {
// Outer loop: every object in the retrieved range.
527 for ( ;
beg != ending; ++
beg) {
// Inner loop: every DataHeaderElement of the current object.
529 for (std::vector<DataHeaderElement>::const_iterator
it =
beg->begin(), itLast =
beg->end();
530 it != itLast; ++
it) {
532 CLID clid =
it->getPrimaryClassID();
// Warn when the class id has no known type name AND the key does not contain "Aux.".
537 if (
m_clidSvc->getTypeNameOfID(clid,
typeName).isFailure() &&
it->getKey().find(
"Aux.") == std::string::npos) {
539 ATH_MSG_WARNING(
"Skipping " <<
it->getKey() <<
" with unknown clid " << clid <<
" . Further warnings for this item are suppressed" );
// Holds the element key by value.
// NOTE(review): if getKey() returns a const reference, this makes a copy - confirm.
545 const std::string
keyName =
it->getKey();
// Register (clid, pattern) entries; each add() status is explicitly ignored.
// NOTE(review): the conditions choosing between these three add() calls are not
// visible in this excerpt.
547 p2BWrittenFromTool->
add(clid, hltKey +
"*").ignore();
549 p2BWrittenFromTool->
add(clid,
"*" + hltKey).ignore();
551 p2BWrittenFromTool->
add(clid,
keyName).ignore();
559 return(StatusCode::SUCCESS);