12#include "GaudiKernel/AttribStringParser.h"
13#include "GaudiKernel/ClassID.h"
14#include "GaudiKernel/FileIncident.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IIoComponentMgr.h"
17#include "GaudiKernel/IOpaqueAddress.h"
41 StringProperty defContainerType(
"DefaultContainerType",
"ROOTTREEINDEX");
42 if(IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_poolSvc.get())) {
43 propertyServer->getProperty(&defContainerType).ignore();
51 if (!iomgr->io_register(
this).isSuccess()) {
52 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
53 return(StatusCode::FAILURE);
57 if (
auto p = maxFileSizeSpec.find(
'='); p != std::string::npos) {
58 long long maxFileSize = 0;
59 const char* start = maxFileSizeSpec.data() + (p + 1);
60 const char* end = maxFileSizeSpec.data() + maxFileSizeSpec.size();
61 if (
auto [ptr, ec] = std::from_chars(start, end, maxFileSize); ec != std::errc{}) {
62 ATH_MSG_WARNING(std::format(
"Invalid MaxFileSize value: {}", std::string(start, end)));
64 std::string databaseName = maxFileSizeSpec.substr(0, maxFileSizeSpec.find_first_of(
" ="));
65 std::unique_lock<std::mutex> lock(
m_mutex);
68 if (
auto [ptr, ec] = std::from_chars(maxFileSizeSpec.data(), maxFileSizeSpec.data() + maxFileSizeSpec.size(),
m_domainMaxFileSize); ec != std::errc{}) {
69 ATH_MSG_WARNING(std::format(
"Invalid MaxFileSize value: {}", maxFileSizeSpec));
78 ATH_MSG_FATAL(std::format(
"Unknown storage type requested for file {}: {}", key, value));
79 return StatusCode::FAILURE;
82 }
catch (
const std::exception& e) {
83 ATH_MSG_FATAL(std::format(
"Exception while getting storage type for file {}: {}", key, e.what()));
84 return StatusCode::FAILURE;
86 ATH_MSG_FATAL(std::format(
"Unknown exception while getting storage type for file {}", key));
87 return StatusCode::FAILURE;
99 incSvc->addListener(
this,
"EndEvent", pri);
100 ATH_MSG_DEBUG(
"Subscribed to EndEvent for printing out input file attributes.");
103 ATH_MSG_DEBUG(
"setInputAttribute failed setting POOL domain attributes.");
108 TClass::GetClass (
"TLeafI");
109 TClass::GetClass (
"TLeafL");
110 TClass::GetClass (
"TLeafD");
111 TClass::GetClass (
"TLeafF");
113 return(StatusCode::SUCCESS);
119 return(StatusCode::SUCCESS);
125 FileIncident incident(name(),
"WriteDataHeaderForms", streamName);
126 if( DHCnvListener ) DHCnvListener->handle(incident);
133 return StatusCode::SUCCESS;
150 const std::string msgPrefix{
"PerfStats "};
152 ATH_MSG_INFO(msgPrefix <<
"Timing Measurements for AthenaPoolCnvSvc");
155 ATH_MSG_INFO(msgPrefix <<
"| " << std::left << std::setw(15) << key <<
" | "
156 << std::right << std::setw(15) << std::fixed << std::setprecision(0) << value <<
" ms |");
161 m_cnvs.shrink_to_fit();
162 return(StatusCode::SUCCESS);
167 return(StatusCode::SUCCESS);
172 std::string objName =
"ALL";
174 if (
m_clidSvc->getTypeNameOfID(pAddress->clID(), objName).isFailure()) {
175 objName = std::to_string(pAddress->clID());
178 objName += *(pAddress->par() + 1);
185 const unsigned int maxContext =
m_poolSvc->getInputContextMap().size();
188 const std::string contextStr = std::format(
"[CTXT={:08X}]", auxContext);
189 std::strncpy(text, contextStr.c_str(),
sizeof(text) - 1);
190 text[
sizeof(text) - 1] =
'\0';
191 if (
m_poolSvc->getInputContextMap().size() > maxContext) {
193 ATH_MSG_DEBUG(
"setInputAttribute failed to switch off TTreeCache for id = " << auxContext <<
".");
206 std::string objName =
"ALL";
208 if (
m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
209 objName = std::to_string(pObject->clID());
212 objName += pObject->registry()->name();
216 StatusCode status = StatusCode::FAILURE;
217 if (pObject->clID() == 1) {
220 if (proxy !=
nullptr) {
221 IConverter* cnv = converter(proxy->clID());
222 status = cnv->createRep(pObject, refpAddress);
228 }
catch(std::runtime_error& e) {
237 std::string objName =
"ALL";
239 if (
m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
240 objName = std::to_string(pObject->clID());
243 objName += pObject->registry()->name();
247 StatusCode status = StatusCode::FAILURE;
248 if (pObject->clID() == 1) {
251 if (proxy !=
nullptr) {
252 IConverter* cnv = converter(proxy->clID());
253 status = cnv->fillRepRefs(pAddress, pObject);
259 }
catch(std::runtime_error& e) {
267 const std::string& ) {
273 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
277 ATH_MSG_ERROR(
"connectOutput FAILED extract file name and technology.");
278 return(StatusCode::FAILURE);
283 ATH_MSG_ERROR(
"connectOutput FAILED to open an UPDATE transaction.");
284 return(StatusCode::FAILURE);
286 }
catch (std::exception& e) {
287 ATH_MSG_ERROR(
"connectOutput - caught exception: " << e.what());
288 return(StatusCode::FAILURE);
291 std::unique_lock<std::mutex> lock(
m_mutex);
293 std::size_t
merge = outputConnection.find(
"?pmerge=");
297 std::vector<std::string> maxFileSize;
298 maxFileSize.push_back(
"TREE_MAX_SIZE");
299 maxFileSize.push_back(
"1099511627776L");
305 const std::string& opt = dbAttrEntry[0];
306 std::string&
data = dbAttrEntry[1];
307 const std::string&
file = dbAttrEntry[2];
308 const std::string& cont = dbAttrEntry[3];
309 std::size_t equal = cont.find(
'=');
310 if (equal == std::string::npos) equal = 0;
313 std::size_t colon = prefix.find(
':');
314 if (colon == std::string::npos) colon = 0;
317 const auto& strProp = (prefix ==
"Default") ? defaultContName : prefix;
318 if (
merge != std::string::npos && opt ==
"TREE_AUTO_FLUSH" && 0 == outputConnection.compare(0,
merge,
file) &&cont.compare(equal, std::string::npos, strProp, colon) == 0 &&
data !=
"int" &&
data !=
"DbLonglong" &&
data !=
"double" &&
data !=
"string") {
319 flush = atoi(
data.c_str());
322 data = std::to_string(flush);
328 if (
merge != std::string::npos) {
329 ATH_MSG_INFO(
"connectOutput setting auto write for: " << outputConnection <<
" to " << flush <<
" events");
334 ATH_MSG_DEBUG(
"connectOutput failed process POOL domain attributes.");
337 ATH_MSG_DEBUG(
"connectOutput failed process POOL database attributes.");
339 return(StatusCode::SUCCESS);
345 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
348 std::unique_lock<std::mutex> lock(
m_mutex);
352 ATH_MSG_ERROR(
"connectOutput FAILED extract file name and technology.");
353 return(StatusCode::FAILURE);
357 ATH_MSG_DEBUG(
"commitOutput failed process POOL domain attributes.");
360 ATH_MSG_DEBUG(
"commitOutput failed process POOL database attributes.");
363 ATH_MSG_DEBUG(
"commitOutput failed process POOL container attributes.");
365 std::size_t
merge = outputConnection.find(
"?pmerge=");
366 const std::string baseOutputConnection = outputConnection.substr(0,
merge);
376 if (!
m_poolSvc->commit(contextId).isSuccess()) {
377 ATH_MSG_ERROR(
"commitOutput FAILED to commit OutputStream.");
378 return(StatusCode::FAILURE);
381 if (!
m_poolSvc->commitAndHold(contextId).isSuccess()) {
382 ATH_MSG_ERROR(
"commitOutput FAILED to commitAndHold OutputStream.");
383 return(StatusCode::FAILURE);
386 }
catch (std::exception& e) {
387 ATH_MSG_ERROR(
"commitOutput - caught exception: " << e.what());
388 return(StatusCode::FAILURE);
390 if (!this->
cleanUp(baseOutputConnection).isSuccess()) {
392 return(StatusCode::FAILURE);
395 long long int currentFileSize =
m_poolSvc->getFileSize(outputConnection, tech, contextId);
399 return(StatusCode::RECOVERABLE);
403 return(StatusCode::RECOVERABLE);
405 return(StatusCode::SUCCESS);
410 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
430 Token* token =
nullptr;
433 const std::string contextStr = std::format(
"[CTXT={:08X}]",
m_poolSvc->getOutputContext(placement->
fileName()));
434 std::strncpy(text, contextStr.c_str(),
sizeof(text) - 1);
435 text[
sizeof(text) - 1] =
'\0';
438 token =
m_poolSvc->registerForWrite(placement, obj, classDesc);
458 const std::string* par,
459 const unsigned long* ip,
460 IOpaqueAddress*& refpAddress) {
461 if( svcType != repSvcType() ) {
462 ATH_MSG_ERROR(
"createAddress: svcType != POOL_StorageType " << svcType <<
" " << repSvcType());
463 return(StatusCode::FAILURE);
465 std::unique_ptr<Token> token;
466 if (par[0].compare(0, 3,
"SHM") == 0) {
467 token = std::make_unique<Token>();
469 token->setAuxString(
"[PNAME=" + par[2] +
"]");
473 token.reset(
m_poolSvc->getToken(par[0], par[1], ip[0]));
475 if (token ==
nullptr) {
476 return(StatusCode::RECOVERABLE);
479 return(StatusCode::SUCCESS);
484 const std::string& refAddress,
485 IOpaqueAddress*& refpAddress) {
486 if (svcType != repSvcType()) {
487 ATH_MSG_ERROR(
"createAddress: svcType != POOL_StorageType " << svcType <<
" " << repSvcType());
488 return(StatusCode::FAILURE);
490 refpAddress =
new GenericAddress(repSvcType(), clid, refAddress);
491 return(StatusCode::SUCCESS);
495 std::string& refAddress) {
498 if (tokAddr !=
nullptr && tokAddr->
getToken() !=
nullptr) {
501 refAddress = *pAddress->par();
503 return(StatusCode::SUCCESS);
507 if (fileSpec.starts_with (
"ROOTKEY:")) {
509 fileSpec.erase(0, 8);
510 }
else if (fileSpec.starts_with (
"ROOTTREE:")) {
512 fileSpec.erase(0, 9);
513 }
else if (fileSpec.starts_with (
"ROOTTREEINDEX:")) {
515 fileSpec.erase(0, 14);
516 }
else if (fileSpec.starts_with (
"ROOTRNTUPLE:")) {
518 fileSpec.erase(0, 12);
519 }
else if (outputTech == 0) {
521 std::string fileName{fileSpec};
522 if (
auto pos = fileSpec.find(
"?pmerge="); pos != std::string::npos) {
523 fileName = fileSpec.substr(0, pos);
531 outputTech = it->second;
533 outputTech = it->second;
538 return StatusCode::SUCCESS;
542 m_cnvs.push_back(cnv);
543 return(StatusCode::SUCCESS);
547 bool retError =
false;
548 std::size_t cpos = connection.find(
':');
549 std::size_t bpos = connection.find(
'[');
550 if (cpos == std::string::npos) {
555 if (bpos != std::string::npos) bpos = bpos - cpos;
556 const std::string conn = connection.substr(cpos, bpos);
558 for (
auto converter : m_cnvs) {
559 if (!converter->cleanUp(conn).isSuccess()) {
564 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
571 ATH_MSG_DEBUG(
"setInputAttribute failed setting POOL database/container attributes.");
574 ATH_MSG_DEBUG(
"setInputAttribute failed getting POOL database/container attributes.");
578 const auto& extraInputContextMap =
m_poolSvc->getInputContextMap();
579 for (
const auto& [
label,
id]: extraInputContextMap) {
581 ATH_MSG_DEBUG(
"setInputAttribute failed to switch off TTreeCache for = " <<
label <<
".");
585 return(StatusCode::SUCCESS);
590 if (incident.type() ==
"EndEvent") {
592 ATH_MSG_DEBUG(
"handle EndEvent failed process POOL database attributes.");
598 base_class(name, pSvcLocator,
pool::POOL_StorageType.
type()) {
602 std::vector<std::vector<std::string> >* contAttr,
603 std::vector<std::vector<std::string> >* dbAttr,
604 std::vector<std::vector<std::string> >* domAttr)
const {
605 std::vector<std::string> opt;
606 std::string attributeName, containerName, databaseName, valueString;
607 for (
const auto& propertyValue : property.value()) {
609 attributeName.clear();
610 containerName.clear();
611 databaseName.clear();
613 using Gaudi::Utils::AttribStringParser;
614 for (
const AttribStringParser::Attrib& attrib : AttribStringParser (propertyValue)) {
615 if (attrib.tag ==
"DatabaseName") {
616 databaseName = attrib.value;
617 }
else if (attrib.tag ==
"ContainerName") {
618 if (databaseName.empty()) {
621 containerName = attrib.value;
623 attributeName = attrib.tag;
624 valueString = attrib.value;
627 if (!attributeName.empty() && !valueString.empty()) {
628 opt.push_back(attributeName);
629 opt.push_back(valueString);
630 if (!databaseName.empty()) {
631 opt.push_back(databaseName);
632 if (!containerName.empty()) {
633 opt.push_back(containerName);
634 if (containerName.compare(0, 6,
"TTree=") == 0) {
635 dbAttr->push_back(opt);
637 contAttr->push_back(opt);
641 dbAttr->push_back(opt);
643 }
else if (domAttr != 0) {
644 domAttr->push_back(opt);
648 dbAttr->push_back(opt);
655 const std::string& fileName,
656 unsigned long contextId,
659 bool doClear)
const {
660 bool retError =
false;
661 for (
auto& attrEntry : attr) {
662 if (attrEntry.size() == 2) {
663 const std::string& opt = attrEntry[0];
664 std::string
data = attrEntry[1];
665 if (
data ==
"int" ||
data ==
"DbLonglong" ||
data ==
"double" ||
data ==
"string") {
668 ATH_MSG_DEBUG(
"getAttribute failed for domain attr " << opt);
684 if (attrEntry.size() == 4) {
685 const std::string& opt = attrEntry[0];
686 std::string
data = attrEntry[1];
687 const std::string&
file = attrEntry[2];
688 const std::string& cont = attrEntry[3];
689 if (!fileName.empty() && (0 == fileName.compare(0, fileName.find(
'?'),
file)
690 || (
file[0] ==
'*' &&
file.find(
"," + fileName +
",") == std::string::npos))) {
691 if (
data ==
"int" ||
data ==
"DbLonglong" ||
data ==
"double" ||
data ==
"string") {
694 ATH_MSG_DEBUG(
"getAttribute failed for database/container attr " << opt);
700 ATH_MSG_DEBUG(
"setAttribute " << opt <<
" to " <<
data <<
" for db: " << fileName <<
" and cont: " << cont);
703 attrEntry[2] +=
"," + fileName +
",";
709 ATH_MSG_DEBUG(
"setAttribute failed for " << opt <<
" to " <<
data <<
" for db: " << fileName <<
" and cont: " << cont);
716 std::erase_if(attr, [](
const auto& entry) {
return entry.empty(); });
717 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the AthenaPoolCnvSvc class.
uint32_t CLID
The Class ID type.
char data[hepevt_bytes_allocation_ATLAS]
This file contains the class definition for the Placement class (migrated from POOL).
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress) override
Implementation of IConverter: Convert the transient object to the requested representation.
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject) override
Implementation of IConverter: Create the transient representation of an object.
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject) override
Implementation of IConverter: Resolve the references of the converted object.
ServiceHandle< IClassIDSvc > m_clidSvc
virtual IPoolSvc * getPoolSvc() override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
std::map< std::string, int > m_fileCommitCounter
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
Gaudi::Property< bool > m_useDetailChronoStat
UseDetailChronoStat, enable detailed output for time and size statistics for AthenaPOOL: default = fa...
StatusCode processPoolAttributes(std::vector< std::vector< std::string > > &attr, const std::string &fileName, unsigned long contextId, bool doGet=true, bool doSet=true, bool doClear=true) const
Set/get technology dependent POOL attributes.
std::map< std::string, int > m_storageTechMap
std::vector< std::vector< std::string > > m_inputAttrPerEvent
virtual StatusCode io_finalize() override
virtual StatusCode initialize() override
Required of all Gaudi Services.
std::vector< unsigned int > m_contextAttr
virtual StatusCode stop() override
long long m_domainMaxFileSize
virtual StatusCode io_reinit() override
Gaudi::Property< std::string > m_containerPrefixProp
POOL Container name prefix - will be part of or whole TTree/RNTuple name 'Default' takes the prefix f...
std::map< std::string, long long > m_databaseMaxFileSize
virtual StatusCode registerCleanUp(IAthenaPoolCleanUp *cnv) override
Implement registerCleanUp to register a IAthenaPoolCleanUp to be called during cleanUp.
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject) override
Implementation of IConversionSvc: Resolve the references of the converted object.
virtual StatusCode decodeOutputSpec(std::string &connectionSpec, int &outputTech) const override
Extract/deduce the DB technology from the connection string/file specification.
std::vector< std::vector< std::string > > m_databaseAttr
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject) override
Implementation of IConversionSvc: Create the transient representation of an object from persistent st...
std::map< std::string, int > m_fileFlushSetting
void extractPoolAttributes(const Gaudi::Property< std::vector< std::string > > &property, std::vector< std::vector< std::string > > *contAttr, std::vector< std::vector< std::string > > *dbAttr, std::vector< std::vector< std::string > > *domAttr=0) const
Extract POOL ItechnologySpecificAttributes for Domain, Database and Container from property.
virtual bool useDetailChronoStat() const override
std::vector< std::vector< std::string > > m_containerAttr
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
Gaudi::Property< std::string > m_persSvcPerInputType
PersSvcPerInputType, string property, tree name to use multiple persistency services,...
Gaudi::Property< std::vector< std::string > > m_inputPoolAttr
Input PoolAttributes, vector with names and values of technology specific attributes for POOL.
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress) override
Implementation of IConversionSvc: Convert the transient object to the requested representation.
PMonUtils::BasicStopWatchResultMap_t m_chronoMap
Map that holds chrono information.
std::string m_defContainerType
Default container type (from PoolSvc)
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
AthenaPoolCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::Property< bool > m_persSvcPerOutput
PersSvcPerOutput, boolean property to use multiple persistency services, one per output stream.
std::string m_lastInputFileName
decoded storage tech requested in "StorageTechnology" property
virtual StatusCode finalize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_numberEventsPerWrite
To use MetadataSvc to merge data placed in a certain container When using TMemFile call Write on numb...
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
void flushDataHeaderForms(const std::string &streamName="*")
Tell DataHeaderCnv to write out all DataHeaderForms for a given streamName (default is all)
unsigned outputContextId(const std::string &outputConnection)
Gaudi::Property< std::vector< std::string > > m_maxFileSizes
MaxFileSizes, vector with maximum file sizes for Athena POOL output files.
ServiceHandle< IPoolSvc > m_poolSvc
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
std::vector< std::vector< std::string > > m_inputAttr
virtual StatusCode convertAddress(const IOpaqueAddress *pAddress, std::string &refAddress) override
Convert address to string form.
Gaudi::Property< std::map< std::string, std::string > > m_storageTechProp
Default Storage Tech for containers (ROOTTREE, ROOTTREEINDEX, ROOTRNTUPLE)
virtual void setObjPtr(void *&obj, const Token *token) override
std::vector< std::vector< std::string > > m_domainAttr
Gaudi::Property< std::vector< std::string > > m_poolAttr
Output PoolAttributes, vector with names and values of technology specific attributes for POOL.
virtual StatusCode setInputAttributes(const std::string &fileName) override
Set the input file attributes, if any are requested from jobOpts.
Gaudi::Property< std::vector< std::string > > m_inputPoolAttrPerEvent
Print input PoolAttributes per event, vector with names of technology specific attributes for POOL to...
static const Guid & null() noexcept
NULL-Guid: static class method.
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
This class provides the interface for the AthenaPoolCleanUp which is used to clean up AthenaPoolConve...
This class provides the interface to the LCG POOL persistency software.
This class holds all the necessary information to guide the writing of an object in a physical place.
Placement & setAuxString(const std::string &auxString)
Set auxiliary string.
const std::string & fileName() const
Access file name.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
const std::string & contID() const
Access container identifier.
const Guid & classID() const
Access database identifier.
virtual const std::string toString() const
Retrieve the string representation of the token.
const Guid & dbID() const
Access database identifier.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
int type() const
Access to full type.
static DbType getType(const std::string &name)
Access known storage type object by name.
std::string label(const std::string &format, int i)
static const DbType TEST_StorageType
static const DbType ROOTTREE_StorageType
static const DbType ROOTRNTUPLE_StorageType
static const DbType ROOTTREEINDEX_StorageType
static const DbType ROOTKEY_StorageType
std::size_t erase_if(T_container &container, T_Func pred)
static constexpr const char * EventData
static constexpr const char * EventData