13 #include "GaudiKernel/IConversionSvc.h"
14 #include "GaudiKernel/IOpaqueAddress.h"
15 #include "GaudiKernel/INamedInterface.h"
16 #include "GaudiKernel/IClassIDSvc.h"
17 #include "GaudiKernel/ThreadLocalContext.h"
36 std::string inputName =
dp.name() +
"_Input";
37 return dp.hasAlias (inputName);
45 const std::string&
name,
47 m_store(
"DetectorStore",
name),
48 m_conversionSvc(
"AthenaPoolCnvSvc",
name),
49 m_clidSvc(
"ClassIDSvc",
name),
50 m_decSvc(
"DecisionSvc/DecisionSvc",
name),
51 m_dataHeader(nullptr),
52 m_connectionOpen(false),
53 m_extendProvenanceRecord(false) {
55 declareInterface<IAthenaOutputStreamTool>(
this);
57 declareProperty(
"SaveDecisions",
m_extend =
false,
"Set to true to add streaming decisions to an attributeList");
80 const INamedInterface* parentAlg =
dynamic_cast<const INamedInterface*
>(
parent());
91 const std::string
keyword =
"[AttributeListKey=";
93 if( (
pos != std::string::npos) ) {
126 return(StatusCode::SUCCESS);
137 return(StatusCode::SUCCESS);
141 const std::string& cnvSvc,
142 bool extendProvenenceRecord) {
145 if (
m_store.release().isFailure()) {
150 if (cnvSvc !=
m_conversionSvc.type() && cnvSvc !=
"EventPersistencySvc") {
157 return(StatusCode::FAILURE);
168 return(StatusCode::FAILURE);
170 return(StatusCode::SUCCESS);
182 return(StatusCode::FAILURE);
188 return(StatusCode::FAILURE);
194 return(StatusCode::FAILURE);
203 if (
m_store->removeDataAndProxy(preDh).isFailure()) {
205 return(StatusCode::FAILURE);
217 std::vector<std::string> dhKeys;
219 for (
const std::string& dhKey : dhKeys) {
220 bool primaryDH =
false;
222 if (dhKey ==
"EventSelector") primaryDH =
true;
223 ATH_MSG_DEBUG(
"No transientContains DataHeader with key " << dhKey);
225 if (
m_store->retrieve(
dh, dhKey).isFailure()) {
226 ATH_MSG_DEBUG(
"Unable to retrieve the DataHeader with key " << dhKey);
229 if (
dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) {
237 delete dhTransAddr; dhTransAddr = dhe.getAddress(0);
241 if (dhProxy != 0 && dhProxy->
address() != 0) {
242 delete dhTransAddr; dhTransAddr = 0;
247 else if (dhTransAddr !=
nullptr) {
249 dhTransAddr->address(),
251 delete dhTransAddr; dhTransAddr = 0;
254 for(
auto iter=
dh->beginProvenance(), iEnd=
dh->endProvenance(); iter != iEnd; ++iter) {
263 if (!attrListHandle.isValid()) {
274 newone->copyData(*attlist);
280 newone->extend(*
it,
"bool");
281 (*newone)[*
it].data<
bool>() =
m_decSvc->isEventAccepted(*
it,Gaudi::Hive::currentContext());
291 ATH_MSG_DEBUG(
"Decisions already added by a different stream");
314 if (
wh.record(std::unique_ptr<DataHeader>(
m_dataHeader)).isFailure()) {
316 return(StatusCode::FAILURE);
323 return(StatusCode::SUCCESS);
331 return(StatusCode::FAILURE);
335 return(StatusCode::SUCCESS);
340 if (athConversionSvc != 0) {
343 return(StatusCode::FAILURE);
346 return(StatusCode::SUCCESS);
353 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
354 return(StatusCode::FAILURE);
362 return(StatusCode::FAILURE);
365 std::vector<DataObject*> dataObjects;
366 for (TypeKeyPairs::const_iterator
first = typeKeys.begin(), last = typeKeys.end();
368 const std::string&
type = (*first).first;
369 const std::string&
key = (*first).second;
374 return(StatusCode::FAILURE);
376 DataObject* dObj = 0;
381 dObj =
m_store->accessData(clid);
390 return(StatusCode::SUCCESS);
395 dataObjects.push_back(dObj);
398 if (dataObjects.size() == 0) {
400 return(StatusCode::SUCCESS);
403 if (!
status.isSuccess()) {
407 return(StatusCode::SUCCESS);
413 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
414 return(StatusCode::FAILURE);
417 std::string outputConnectionString =
outputName;
418 const std::string defaultMetaDataString =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
419 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
436 std::map<DataObject*, IOpaqueAddress*> written;
437 for (DataObject* dobj : dataObjects) {
440 ATH_MSG_DEBUG(
"Explicit request to write DataHeader: " << dobj->name() <<
" - skipping it.");
442 }
else if (written.find(dobj) != written.end()) {
444 ATH_MSG_DEBUG(
"Trying to write DataObject twice (clid/key): " << dobj->clID() <<
" " << dobj->name());
448 IOpaqueAddress* addr =
new TokenAddress(0, dobj->clID(), outputConnectionString);
451 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
453 ATH_MSG_ERROR(
"Could not create Rep for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
454 return(StatusCode::FAILURE);
459 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
460 IOpaqueAddress* addr =
new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
463 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
466 return(StatusCode::FAILURE);
469 for (DataObject* dobj : dataObjects) {
472 if (
proxy !=
nullptr && written.find(dobj) != written.end()) {
473 IOpaqueAddress* addr(written.find(dobj)->second);
475 if (dobj->clID() != 1 || addr->par()[0] !=
"\n") {
481 if (
proxy->address() ==
nullptr) {
482 proxy->setAddress(addr);
487 ATH_MSG_ERROR(
"Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
488 return(StatusCode::FAILURE);
491 ATH_MSG_WARNING(
"Could cast DataObject " << dobj->clID() <<
" " << dobj->name());
495 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
498 if (
proxy !=
nullptr && written.find(dataHeaderObj) != written.end()) {
499 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
501 if (dataHeaderObj->clID() != 1 || addr->par()[0] !=
"\n") {
511 return(StatusCode::FAILURE);
515 return(StatusCode::FAILURE);
518 return(StatusCode::SUCCESS);
522 const std::string hltKey =
"HLTAutoKey";
525 if (
m_store->retrieve(
beg, ending).isFailure() ||
beg == ending) {
528 for ( ;
beg != ending; ++
beg) {
530 for (std::vector<DataHeaderElement>::const_iterator
it =
beg->begin(), itLast =
beg->end();
531 it != itLast; ++
it) {
533 CLID clid =
it->getPrimaryClassID();
538 if (
m_clidSvc->getTypeNameOfID(clid,
typeName).isFailure() &&
it->getKey().find(
"Aux.") == std::string::npos) {
540 ATH_MSG_WARNING(
"Skipping " <<
it->getKey() <<
" with unknown clid " << clid <<
" . Further warnings for this item are suppressed" );
546 const std::string
keyName =
it->getKey();
548 p2BWrittenFromTool->
add(clid, hltKey +
"*").ignore();
550 p2BWrittenFromTool->
add(clid,
"*" + hltKey).ignore();
552 p2BWrittenFromTool->
add(clid,
keyName).ignore();
560 return(StatusCode::SUCCESS);