12 #include "GaudiKernel/AttribStringParser.h"
13 #include "GaudiKernel/ClassID.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IIoComponentMgr.h"
17 #include "GaudiKernel/IOpaqueAddress.h"
69 if (!iomgr->io_register(
this).isSuccess()) {
70 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
71 return(StatusCode::FAILURE);
74 for (std::vector<std::string>::const_iterator iter =
m_maxFileSizes.value().begin(),
76 if (
auto p = iter->find(
'=');
p != std::string::npos) {
77 long long maxFileSize = atoll(iter->data() + (
p + 1));
78 std::string databaseName = iter->substr(0, iter->find_first_of(
" ="));
88 return StatusCode::FAILURE;
105 incSvc->addListener(
this,
"EndEvent", pri);
106 ATH_MSG_DEBUG(
"Subscribed to EndEvent for printing out input file attributes.");
109 ATH_MSG_DEBUG(
"setInputAttribute failed setting POOL domain attributes.");
113 incSvc->addListener(
this,
"StoreCleared", pri);
118 TClass::GetClass (
"TLeafI");
119 TClass::GetClass (
"TLeafL");
120 TClass::GetClass (
"TLeafD");
121 TClass::GetClass (
"TLeafF");
123 return(StatusCode::SUCCESS);
129 return(StatusCode::SUCCESS);
135 FileIncident incident(
name(),
"WriteDataHeaderForms",
streamName);
136 if( DHCnvListener ) DHCnvListener->handle(incident);
143 return StatusCode::SUCCESS;
178 const std::string msgPrefix{
"PerfStats "};
180 ATH_MSG_INFO(msgPrefix <<
"Timing Measurements for AthenaPoolCnvSvc");
183 ATH_MSG_INFO(msgPrefix <<
"| " << std::left << std::setw(15) <<
key <<
" | "
184 << std::right << std::setw(15) << std::fixed << std::setprecision(0) <<
value <<
" ms |");
189 m_cnvs.shrink_to_fit();
190 return(StatusCode::SUCCESS);
195 return(StatusCode::SUCCESS);
200 std::string objName =
"ALL";
202 if (
m_clidSvc->getTypeNameOfID(pAddress->clID(), objName).isFailure()) {
203 std::ostringstream oss;
204 oss << std::dec << pAddress->clID();
208 objName += *(pAddress->par() + 1);
215 const unsigned int maxContext =
m_poolSvc->getInputContextMap().size();
218 ::sprintf(
text,
"[CTXT=%08X]", auxContext);
219 if (
m_poolSvc->getInputContextMap().size() > maxContext) {
221 ATH_MSG_DEBUG(
"setInputAttribute failed to switch off TTreeCache for id = " << auxContext <<
".");
234 std::string objName =
"ALL";
236 if (
m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
237 std::ostringstream oss;
238 oss << std::dec << pObject->clID();
242 objName += pObject->registry()->name();
247 if (pObject->clID() == 1) {
250 if (
proxy !=
nullptr) {
251 IConverter* cnv = converter(
proxy->clID());
252 status = cnv->createRep(pObject, refpAddress);
258 }
catch(std::runtime_error&
e) {
267 std::string objName =
"ALL";
269 if (
m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
270 std::ostringstream oss;
271 oss << std::dec << pObject->clID();
275 objName += pObject->registry()->name();
280 if (pObject->clID() == 1) {
283 if (
proxy !=
nullptr) {
284 IConverter* cnv = converter(
proxy->clID());
285 status = cnv->fillRepRefs(pAddress, pObject);
291 }
catch(std::runtime_error&
e) {
299 const std::string& ) {
305 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
309 ATH_MSG_ERROR(
"connectOutput FAILED extract file name and technology.");
310 return(StatusCode::FAILURE);
314 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
315 return(StatusCode::FAILURE);
320 return(StatusCode::SUCCESS);
324 ATH_MSG_DEBUG(
"connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
325 return(StatusCode::SUCCESS);
329 return(StatusCode::SUCCESS);
339 ATH_MSG_ERROR(
"connectOutput FAILED to open an UPDATE transaction.");
340 return(StatusCode::FAILURE);
344 return(StatusCode::FAILURE);
347 std::unique_lock<std::mutex> lock(
m_mutex);
353 std::vector<std::string> maxFileSize;
354 maxFileSize.push_back(
"TREE_MAX_SIZE");
355 maxFileSize.push_back(
"1099511627776L");
360 iter != last; ++iter) {
361 const std::string&
opt = (*iter)[0];
362 std::string&
data = (*iter)[1];
363 const std::string&
file = (*iter)[2];
364 const std::string& cont = (*iter)[3];
365 std::size_t
equal = cont.find(
'=');
369 if (colon == std::string::npos) colon = 0;
372 if (
merge != std::string::npos &&
opt ==
"TREE_AUTO_FLUSH" && 0 == outputConnection.compare(0,
merge,
file) && cont.compare(
equal, std::string::npos, strProp, colon) == 0 &&
data !=
"int" &&
data !=
"DbLonglong" &&
data !=
"double" &&
data !=
"string") {
376 std::ostringstream eventAutoFlush;
377 eventAutoFlush <<
flush;
378 data = eventAutoFlush.str();
384 if (
merge != std::string::npos) {
385 ATH_MSG_INFO(
"connectOutput setting auto write for: " << outputConnection <<
" to " <<
flush <<
" events");
390 ATH_MSG_DEBUG(
"connectOutput failed process POOL domain attributes.");
393 ATH_MSG_DEBUG(
"connectOutput failed process POOL database attributes.");
395 return(StatusCode::SUCCESS);
401 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
405 if (!this->
cleanUp(outputConnection).isSuccess()) {
407 return(StatusCode::FAILURE);
409 return(StatusCode::SUCCESS);
412 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
413 return(StatusCode::SUCCESS);
415 std::map<void*, RootType> commitCache;
419 const char* placementStr =
nullptr;
422 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 &&
num > 0) {
423 const char * matchedChars = strstr(placementStr,
"[FILE=");
424 if (not matchedChars){
435 bool dataHeaderSeen =
false;
436 std::string dataHeaderID;
438 std::string objName =
"ALL";
440 std::string objName(placementStr);
445 std::string tokenStr = placementStr;
446 std::string contName = strstr(placementStr,
"[CONT=");
447 tokenStr.erase(tokenStr.find(
"[CONT="));
448 tokenStr.append(contName, contName.find(
']') + 1);
449 contName = contName.substr(6, contName.find(
']') - 6);
450 std::string
className = strstr(placementStr,
"[PNAME=");
454 std::ostringstream oss2;
455 oss2 << std::dec <<
num;
457 bool foundContainer =
false;
458 std::size_t pPos = contName.find(
'(');
460 foundContainer =
true;
464 if (contName.compare(0, pPos,
item) == 0){
465 foundContainer =
true;
471 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
476 std::ostringstream oss1;
478 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
479 FileIncident beginInputIncident(
name(),
"BeginInputFile", memName);
480 incSvc->fireIncident(beginInputIncident);
481 FileIncident endInputIncident(
name(),
"EndInputFile", memName);
482 incSvc->fireIncident(endInputIncident);
489 sc = metadataSvc->shmProxy(std::string(placementStr) +
"[NUM=" + oss2.str() +
"]");
490 if (
sc.isRecoverable()) {
492 }
else if (
sc.isFailure()) {
504 auto placementWithSwn = [&] {
return std::format(
"{}[SWN={}]", placementStr,
num); };
508 "", placementWithSwn());
509 DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).ignore();
515 if (token ==
nullptr) {
524 tokenStr, placementWithSwn());
525 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
531 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
533 placementStr =
nullptr;
538 placement.
fromString(placementStr); placementStr =
nullptr;
540 if (token ==
nullptr) {
550 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
554 dataHeaderSeen =
true;
562 }
else if (dataHeaderSeen) {
563 dataHeaderSeen =
false;
569 tokenStr, dataHeaderID);
570 if (!DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
571 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
576 GenericAddress
address(0, 0,
"", dataHeaderID);
577 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
584 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
591 while (
sc.isRecoverable()) {
594 if (!
sc.isSuccess()) {
600 while (
sc.isRecoverable()) {
603 if (
sc.isFailure()) {
608 if (dataHeaderSeen) {
610 GenericAddress
address(0, 0,
"", dataHeaderID);
611 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
616 placementStr =
nullptr;
617 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
618 return(StatusCode::RECOVERABLE);
619 }
else if (
sc.isRecoverable() ||
num == -1) {
620 return(StatusCode::RECOVERABLE);
624 std::ostringstream oss1;
626 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
627 FileIncident beginInputIncident(
name(),
"BeginInputFile", memName);
628 incSvc->fireIncident(beginInputIncident);
629 FileIncident endInputIncident(
name(),
"EndInputFile", memName);
630 incSvc->fireIncident(endInputIncident);
631 if (
sc.isFailure()) {
632 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
636 return(StatusCode::FAILURE);
640 ATH_MSG_DEBUG(
"commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
641 return(StatusCode::SUCCESS);
645 std::unique_lock<std::mutex> lock(
m_mutex);
646 if (outputConnection.empty()) {
652 ATH_MSG_ERROR(
"connectOutput FAILED extract file name and technology.");
653 return(StatusCode::FAILURE);
655 const std::string oldOutputConnection = outputConnection;
666 ATH_MSG_DEBUG(
"commitOutput failed process POOL domain attributes.");
669 ATH_MSG_DEBUG(
"commitOutput failed process POOL database attributes.");
672 ATH_MSG_DEBUG(
"commitOutput failed process POOL container attributes.");
677 if (!
m_poolSvc->commit(contextId).isSuccess()) {
678 ATH_MSG_ERROR(
"commitOutput FAILED to commit OutputStream.");
679 return(StatusCode::FAILURE);
682 if (!
m_poolSvc->commitAndHold(contextId).isSuccess()) {
683 ATH_MSG_ERROR(
"commitOutput FAILED to commitAndHold OutputStream.");
684 return(StatusCode::FAILURE);
689 return(StatusCode::FAILURE);
691 if (!this->
cleanUp(oldOutputConnection).isSuccess()) {
693 return(StatusCode::FAILURE);
696 iter->second.Destruct(iter->first);
699 long long int currentFileSize =
m_poolSvc->getFileSize(outputConnection,
m_dbType.
type(), contextId);
703 return(StatusCode::RECOVERABLE);
707 return(StatusCode::RECOVERABLE);
709 return(StatusCode::SUCCESS);
714 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
717 return(StatusCode::SUCCESS);
723 return(StatusCode::SUCCESS);
753 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
757 Token* token =
nullptr;
761 std::string placementStr = placement->
toString();
762 placementStr +=
"[PNAME=";
763 placementStr += classDesc.
Name();
767 while (
sc.isRecoverable()) {
771 if (!
sc.isSuccess()) {
776 const void*
buffer =
nullptr;
777 std::size_t nbytes = 0;
779 if (classDesc.
Name() ==
"Token") {
780 nbytes = strlen(
static_cast<const char*
>(
obj)) + 1;
784 nbytes = classDesc.
SizeOf();
792 while (
sc.isRecoverable()) {
796 if (own) {
delete []
static_cast<const char*
>(
buffer); }
798 if (!
sc.isSuccess()) {
799 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
805 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
814 const char* tokenStr =
nullptr;
817 while (
sc.isRecoverable()) {
821 if (!
sc.isSuccess()) {
825 if (!strcmp(tokenStr,
"ABORT")) {
832 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
834 token = tempToken; tempToken =
nullptr;
838 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
841 token = tempToken; tempToken =
nullptr;
844 token =
m_poolSvc->registerForWrite(placement,
obj, classDesc);
854 token =
m_poolSvc->registerForWrite(placement,
obj, classDesc);
866 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
871 int num = token->
oid().first;
874 std::size_t nbytes = 0;
876 while (
sc.isRecoverable()) {
880 if (!
sc.isSuccess()) {
912 std::size_t nbytes = 0;
918 while (
sc.isRecoverable()) {
923 if (!
sc.isSuccess()) {
947 const std::string*
par,
948 const unsigned long*
ip,
949 IOpaqueAddress*& refpAddress) {
950 if (svcType != POOL_StorageType) {
951 ATH_MSG_ERROR(
"createAddress: svcType != POOL_StorageType " << svcType <<
" " << POOL_StorageType);
952 return(StatusCode::FAILURE);
956 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
957 return(StatusCode::FAILURE);
960 Token* token =
nullptr;
969 addressToken.
setDb(
par[0].substr(4));
974 return(StatusCode::FAILURE);
977 std::size_t nbytes = 0;
979 while (
sc.isRecoverable()) {
983 if (!
sc.isSuccess()) {
985 return(StatusCode::FAILURE);
990 delete token; token =
nullptr;
996 if (token ==
nullptr) {
997 return(StatusCode::RECOVERABLE);
1000 return(StatusCode::SUCCESS);
1005 const std::string& refAddress,
1006 IOpaqueAddress*& refpAddress) {
1007 if (svcType != POOL_StorageType) {
1008 ATH_MSG_ERROR(
"createAddress: svcType != POOL_StorageType " << svcType <<
" " << POOL_StorageType);
1009 return(StatusCode::FAILURE);
1011 refpAddress =
new GenericAddress(POOL_StorageType, clid, refAddress);
1012 return(StatusCode::SUCCESS);
1016 std::string& refAddress) {
1019 if (tokAddr !=
nullptr && tokAddr->
getToken() !=
nullptr) {
1022 refAddress = *pAddress->par();
1024 return(StatusCode::SUCCESS);
1030 if (fileSpec.starts_with (
"oracle") || fileSpec.starts_with (
"mysql")) {
1031 outputTech = pool::POOL_RDBMS_StorageType.
type();
1032 }
else if (fileSpec.starts_with (
"ROOTKEY:")) {
1033 outputTech = pool::ROOTKEY_StorageType.
type();
1034 fileSpec.erase(0, 8);
1035 }
else if (fileSpec.starts_with (
"ROOTTREE:")) {
1036 outputTech = pool::ROOTTREE_StorageType.
type();
1037 fileSpec.erase(0, 9);
1038 }
else if (fileSpec.starts_with (
"ROOTTREEINDEX:")) {
1039 outputTech = pool::ROOTTREEINDEX_StorageType.
type();
1040 fileSpec.erase(0, 14);
1041 }
else if (fileSpec.starts_with (
"ROOTRNTUPLE:")) {
1042 outputTech = pool::ROOTRNTUPLE_StorageType.
type();
1043 fileSpec.erase(0, 12);
1044 }
else if (outputTech == 0) {
1047 return(StatusCode::SUCCESS);
1051 m_cnvs.push_back(cnv);
1052 return(StatusCode::SUCCESS);
1056 bool retError =
false;
1057 std::size_t cpos = connection.find(
':');
1058 std::size_t bpos = connection.find(
'[');
1059 if (cpos == std::string::npos) {
1064 if (bpos != std::string::npos) bpos = bpos - cpos;
1065 const std::string
conn = connection.substr(cpos, bpos);
1067 for (
auto converter : m_cnvs) {
1068 if (!converter->cleanUp(
conn).isSuccess()) {
1073 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
1080 ATH_MSG_DEBUG(
"setInputAttribute failed setting POOL database/container attributes.");
1083 ATH_MSG_DEBUG(
"setInputAttribute failed getting POOL database/container attributes.");
1087 const auto& extraInputContextMap =
m_poolSvc->getInputContextMap();
1088 for (
const auto& [
label,
id]: extraInputContextMap) {
1090 ATH_MSG_DEBUG(
"setInputAttribute failed to switch off TTreeCache for = " <<
label <<
".");
1094 return(StatusCode::SUCCESS);
1108 return(StatusCode::FAILURE);
1112 return(StatusCode::SUCCESS);
1114 return(StatusCode::RECOVERABLE);
1117 return(StatusCode::RECOVERABLE);
1126 std::string streamPortSuffix;
1129 return(StatusCode::FAILURE);
1132 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
1137 return(StatusCode::SUCCESS);
1140 std::string dummyStr;
1146 return(StatusCode::FAILURE);
1148 const char* tokenStr =
nullptr;
1151 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 &&
num > 0) {
1159 token.
fromString(tokenStr); tokenStr =
nullptr;
1161 std::string objName =
"ALL";
1171 std::size_t nbytes = 0;
1174 while (
sc.isRecoverable()) {
1177 delete []
static_cast<char*
>(
buffer);
buffer =
nullptr;
1178 if (!
sc.isSuccess()) {
1180 return(StatusCode::FAILURE);
1185 return(StatusCode::FAILURE);
1190 return(StatusCode::FAILURE);
1193 std::string returnToken;
1195 if (metadataToken !=
nullptr) {
1196 returnToken = metadataToken->
toString();
1200 delete metadataToken; metadataToken =
nullptr;
1205 return(StatusCode::FAILURE);
1208 return(StatusCode::RECOVERABLE);
1210 return(StatusCode::SUCCESS);
1219 return(StatusCode::SUCCESS);
1229 while (
sc.isSuccess()) {
1230 if (client_n >= 0) {
1235 while (
sc.isRecoverable()) {
1239 return StatusCode::FAILURE;
1247 if (incident.type() ==
"EndEvent") {
1249 ATH_MSG_DEBUG(
"handle EndEvent failed process POOL database attributes.");
1255 base_class(
name, pSvcLocator, POOL_StorageType)
1260 std::vector<std::vector<std::string> >* contAttr,
1261 std::vector<std::vector<std::string> >* dbAttr,
1262 std::vector<std::vector<std::string> >* domAttr)
const {
1263 std::vector<std::string>
opt;
1264 std::string attributeName, containerName, databaseName, valueString;
1265 for (std::vector<std::string>::const_iterator iter = property.value().begin(),
1266 last = property.value().end(); iter != last; ++iter) {
1268 attributeName.clear();
1269 containerName.clear();
1270 databaseName.clear();
1271 valueString.clear();
1272 using Gaudi::Utils::AttribStringParser;
1273 for (
const AttribStringParser::Attrib& attrib : AttribStringParser (*iter)) {
1274 const std::string
tag = attrib.tag;
1275 const std::string
val = attrib.value;
1276 if (
tag ==
"DatabaseName") {
1278 }
else if (
tag ==
"ContainerName") {
1279 if (databaseName.empty()) {
1282 containerName =
val;
1284 attributeName =
tag;
1288 if (!attributeName.empty() && !valueString.empty()) {
1289 opt.push_back(attributeName);
1290 opt.push_back(valueString);
1291 if (!databaseName.empty()) {
1292 opt.push_back(databaseName);
1293 if (!containerName.empty()) {
1294 opt.push_back(containerName);
1295 if (containerName.compare(0, 6,
"TTree=") == 0) {
1296 dbAttr->push_back(
opt);
1298 contAttr->push_back(
opt);
1302 dbAttr->push_back(
opt);
1304 }
else if (domAttr != 0) {
1305 domAttr->push_back(
opt);
1309 dbAttr->push_back(
opt);
1317 unsigned long contextId,
1320 bool doClear)
const {
1321 bool retError =
false;
1323 for (std::vector<std::vector<std::string> >::
iterator iter = attr.begin(), last = attr.end();
1324 iter != last; ++iter) {
1325 if (iter->size() == 2) {
1326 const std::string&
opt = (*iter)[0];
1327 std::string
data = (*iter)[1];
1328 if (
data ==
"int" ||
data ==
"DbLonglong" ||
data ==
"double" ||
data ==
"string") {
1347 if (iter->size() == 4) {
1348 const std::string&
opt = (*iter)[0];
1349 std::string
data = (*iter)[1];
1350 const std::string&
file = (*iter)[2];
1351 const std::string& cont = (*iter)[3];
1353 || (
file[0] ==
'*' &&
file.find(
"," +
fileName +
",") == std::string::npos))) {
1354 if (
data ==
"int" ||
data ==
"DbLonglong" ||
data ==
"double" ||
data ==
"string") {
1366 (*iter)[2] +=
"," +
fileName +
",";
1379 for (std::vector<std::vector<std::string> >::
iterator iter = attr.begin(); iter != attr.end(); ) {
1380 if (iter->empty()) {
1381 iter = attr.erase(iter);
1386 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);