13 #include "GaudiKernel/IConversionSvc.h"
14 #include "GaudiKernel/IOpaqueAddress.h"
15 #include "GaudiKernel/INamedInterface.h"
16 #include "GaudiKernel/IClassIDSvc.h"
17 #include "GaudiKernel/ThreadLocalContext.h"
35 std::string inputName =
dp.name() +
"_Input";
36 return dp.hasAlias (inputName);
44 const std::string&
name,
46 m_store(
"DetectorStore",
name),
47 m_conversionSvc(
"AthenaPoolCnvSvc",
name),
48 m_clidSvc(
"ClassIDSvc",
name),
49 m_decSvc(
"DecisionSvc/DecisionSvc",
name),
50 m_dataHeader(nullptr),
51 m_connectionOpen(false),
52 m_extendProvenanceRecord(false) {
54 declareInterface<IAthenaOutputStreamTool>(
this);
56 declareProperty(
"SaveDecisions",
m_extend =
false,
"Set to true to add streaming decisions to an attributeList");
79 const INamedInterface* parentAlg =
dynamic_cast<const INamedInterface*
>(
parent());
90 const std::string
keyword =
"[AttributeListKey=";
92 if( (
pos != std::string::npos) ) {
125 return(StatusCode::SUCCESS);
136 return(StatusCode::SUCCESS);
140 const std::string& cnvSvc,
141 bool extendProvenenceRecord) {
144 if (
m_store.release().isFailure()) {
149 if (cnvSvc !=
m_conversionSvc.type() && cnvSvc !=
"EventPersistencySvc") {
156 return(StatusCode::FAILURE);
167 return(StatusCode::FAILURE);
169 return(StatusCode::SUCCESS);
181 return(StatusCode::FAILURE);
187 return(StatusCode::FAILURE);
193 return(StatusCode::FAILURE);
202 if (
m_store->removeDataAndProxy(preDh).isFailure()) {
204 return(StatusCode::FAILURE);
216 std::vector<std::string> dhKeys;
218 for (
const std::string& dhKey : dhKeys) {
219 bool primaryDH =
false;
221 if (dhKey ==
"EventSelector") primaryDH =
true;
222 ATH_MSG_DEBUG(
"No transientContains DataHeader with key " << dhKey);
224 if (
m_store->retrieve(
dh, dhKey).isFailure()) {
225 ATH_MSG_DEBUG(
"Unable to retrieve the DataHeader with key " << dhKey);
228 if (
dh->isInput() || hasInputAlias (*dhProxy) || primaryDH) {
236 delete dhTransAddr; dhTransAddr = dhe.getAddress(0);
240 if (dhProxy != 0 && dhProxy->
address() != 0) {
241 delete dhTransAddr; dhTransAddr = 0;
246 else if (dhTransAddr !=
nullptr) {
248 dhTransAddr->address(),
250 delete dhTransAddr; dhTransAddr = 0;
258 std::set<std::string> insertedTags{};
259 for(
auto iter=
dh->beginProvenance(), iEnd=
dh->endProvenance(); iter != iEnd; ++iter) {
260 const auto & currentKey = (*iter).getKey();
261 if(!insertedTags.contains(currentKey)) {
262 insertedTags.insert(currentKey);
272 if (!attrListHandle.isValid()) {
283 newone->copyData(*attlist);
289 newone->extend(*
it,
"bool");
290 (*newone)[*
it].data<
bool>() =
m_decSvc->isEventAccepted(*
it,Gaudi::Hive::currentContext());
300 ATH_MSG_DEBUG(
"Decisions already added by a different stream");
323 if (
wh.record(std::unique_ptr<DataHeader>(
m_dataHeader)).isFailure()) {
325 return(StatusCode::FAILURE);
332 return(StatusCode::SUCCESS);
340 return(StatusCode::FAILURE);
344 return(StatusCode::SUCCESS);
349 if (athConversionSvc != 0) {
352 return(StatusCode::FAILURE);
355 return(StatusCode::SUCCESS);
362 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
363 return(StatusCode::FAILURE);
371 return(StatusCode::FAILURE);
374 std::vector<DataObject*> dataObjects;
375 for (TypeKeyPairs::const_iterator
first = typeKeys.begin(), last = typeKeys.end();
377 const std::string&
type = (*first).first;
378 const std::string&
key = (*first).second;
383 return(StatusCode::FAILURE);
385 DataObject* dObj = 0;
390 dObj =
m_store->accessData(clid);
399 return(StatusCode::SUCCESS);
404 dataObjects.push_back(dObj);
407 if (dataObjects.size() == 0) {
409 return(StatusCode::SUCCESS);
412 if (!
status.isSuccess()) {
416 return(StatusCode::SUCCESS);
422 ATH_MSG_ERROR(
"Connection NOT open. Please open a connection before streaming out objects.");
423 return(StatusCode::FAILURE);
426 std::string outputConnectionString =
outputName;
427 const std::string defaultMetaDataString =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData]";
428 if (std::string::size_type mpos = outputConnectionString.find(defaultMetaDataString); mpos!=std::string::npos) {
445 std::map<DataObject*, IOpaqueAddress*> written;
446 for (DataObject* dobj : dataObjects) {
449 ATH_MSG_DEBUG(
"Explicit request to write DataHeader: " << dobj->name() <<
" - skipping it.");
451 }
else if (written.find(dobj) != written.end()) {
453 ATH_MSG_DEBUG(
"Trying to write DataObject twice (clid/key): " << dobj->clID() <<
" " << dobj->name());
457 IOpaqueAddress* addr =
new TokenAddress(0, dobj->clID(), outputConnectionString);
460 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dobj, addr));
462 ATH_MSG_ERROR(
"Could not create Rep for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
463 return(StatusCode::FAILURE);
468 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
469 IOpaqueAddress* addr =
new TokenAddress(0, dataHeaderObj->clID(), outputConnectionString);
472 written.insert(std::pair<DataObject*, IOpaqueAddress*>(dataHeaderObj, addr));
475 return(StatusCode::FAILURE);
478 for (DataObject* dobj : dataObjects) {
481 if (
proxy !=
nullptr && written.find(dobj) != written.end()) {
482 IOpaqueAddress* addr(written.find(dobj)->second);
484 if (dobj->clID() != 1 || addr->par()[0] !=
"\n") {
490 if (
proxy->address() ==
nullptr) {
491 proxy->setAddress(addr);
496 ATH_MSG_ERROR(
"Could not fill Object Refs for DataObject (clid/key):" << dobj->clID() <<
" " << dobj->name());
497 return(StatusCode::FAILURE);
500 ATH_MSG_WARNING(
"Could cast DataObject " << dobj->clID() <<
" " << dobj->name());
504 if (
m_conversionSvc.type() ==
"AthenaPoolCnvSvc" && dataHeaderObj !=
nullptr) {
507 if (
proxy !=
nullptr && written.find(dataHeaderObj) != written.end()) {
508 IOpaqueAddress* addr(written.find(dataHeaderObj)->second);
510 if (dataHeaderObj->clID() != 1 || addr->par()[0] !=
"\n") {
520 return(StatusCode::FAILURE);
524 return(StatusCode::FAILURE);
527 return(StatusCode::SUCCESS);
531 const std::string hltKey =
"HLTAutoKey";
534 if (
m_store->retrieve(
beg, ending).isFailure() ||
beg == ending) {
537 for ( ;
beg != ending; ++
beg) {
539 for (std::vector<DataHeaderElement>::const_iterator
it =
beg->begin(), itLast =
beg->end();
540 it != itLast; ++
it) {
542 CLID clid =
it->getPrimaryClassID();
547 if (
m_clidSvc->getTypeNameOfID(clid,
typeName).isFailure() &&
it->getKey().find(
"Aux.") == std::string::npos) {
549 ATH_MSG_WARNING(
"Skipping " <<
it->getKey() <<
" with unknown clid " << clid <<
" . Further warnings for this item are suppressed" );
555 const std::string
keyName =
it->getKey();
557 p2BWrittenFromTool->
add(clid, hltKey +
"*").ignore();
559 p2BWrittenFromTool->
add(clid,
"*" + hltKey).ignore();
561 p2BWrittenFromTool->
add(clid,
keyName).ignore();
569 return(StatusCode::SUCCESS);