12 #include "GaudiKernel/AttribStringParser.h"
13 #include "GaudiKernel/ClassID.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IIoComponentMgr.h"
17 #include "GaudiKernel/IOpaqueAddress.h"
69 if (!iomgr->io_register(
this).isSuccess()) {
70 ATH_MSG_FATAL(
"Could not register myself with the IoComponentMgr !");
71 return(StatusCode::FAILURE);
74 for (std::vector<std::string>::const_iterator iter =
m_maxFileSizes.value().begin(),
76 if (
auto p = iter->find(
'=');
p != std::string::npos) {
77 long long maxFileSize = atoll(iter->data() + (
p + 1));
78 std::string databaseName = iter->substr(0, iter->find_first_of(
" ="));
88 return StatusCode::FAILURE;
105 incSvc->addListener(
this,
"EndEvent", pri);
106 ATH_MSG_DEBUG(
"Subscribed to EndEvent for printing out input file attributes.");
109 ATH_MSG_DEBUG(
"setInputAttribute failed setting POOL domain attributes.");
113 incSvc->addListener(
this,
"StoreCleared", pri);
118 TClass::GetClass (
"TLeafI");
119 TClass::GetClass (
"TLeafL");
120 TClass::GetClass (
"TLeafD");
121 TClass::GetClass (
"TLeafF");
123 return(StatusCode::SUCCESS);
129 return(StatusCode::SUCCESS);
161 const std::string msgPrefix{
"PerfStats "};
163 ATH_MSG_INFO(msgPrefix <<
"Timing Measurements for AthenaPoolCnvSvc");
166 ATH_MSG_INFO(msgPrefix <<
"| " << std::left << std::setw(15) <<
key <<
" | "
167 << std::right << std::setw(15) << std::fixed << std::setprecision(0) <<
value <<
" ms |");
173 return(StatusCode::SUCCESS);
178 return(StatusCode::SUCCESS);
189 return(StatusCode::SUCCESS);
194 std::string objName =
"ALL";
196 if (
m_clidSvc->getTypeNameOfID(pAddress->clID(), objName).isFailure()) {
197 std::ostringstream oss;
198 oss << std::dec << pAddress->clID();
202 objName += *(pAddress->par() + 1);
209 const unsigned int maxContext =
m_poolSvc->getInputContextMap().size();
212 ::sprintf(
text,
"[CTXT=%08X]", auxContext);
213 if (
m_poolSvc->getInputContextMap().size() > maxContext) {
215 ATH_MSG_DEBUG(
"setInputAttribute failed to switch off TTreeCache for id = " << auxContext <<
".");
228 std::string objName =
"ALL";
230 if (
m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
231 std::ostringstream oss;
232 oss << std::dec << pObject->clID();
236 objName += pObject->registry()->name();
241 if (pObject->clID() == 1) {
244 if (
proxy !=
nullptr) {
246 status = cnv->createRep(pObject, refpAddress);
252 }
catch(std::runtime_error&
e) {
261 std::string objName =
"ALL";
263 if (
m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
264 std::ostringstream oss;
265 oss << std::dec << pObject->clID();
269 objName += pObject->registry()->name();
274 if (pObject->clID() == 1) {
277 if (
proxy !=
nullptr) {
279 status = cnv->fillRepRefs(pAddress, pObject);
285 }
catch(std::runtime_error&
e) {
293 const std::string& ) {
299 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
303 ATH_MSG_ERROR(
"connectOutput FAILED extract file name and technology.");
304 return(StatusCode::FAILURE);
308 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
309 return(StatusCode::FAILURE);
314 return(StatusCode::SUCCESS);
318 ATH_MSG_DEBUG(
"connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
319 return(StatusCode::SUCCESS);
323 return(StatusCode::SUCCESS);
333 ATH_MSG_ERROR(
"connectOutput FAILED to open an UPDATE transaction.");
334 return(StatusCode::FAILURE);
338 return(StatusCode::FAILURE);
341 std::unique_lock<std::mutex> lock(
m_mutex);
347 std::vector<std::string> maxFileSize;
348 maxFileSize.push_back(
"TREE_MAX_SIZE");
349 maxFileSize.push_back(
"1099511627776L");
354 iter != last; ++iter) {
355 const std::string&
opt = (*iter)[0];
356 std::string&
data = (*iter)[1];
357 const std::string&
file = (*iter)[2];
358 const std::string& cont = (*iter)[3];
359 std::size_t
equal = cont.find(
'=');
363 if (colon == std::string::npos) colon = 0;
366 if (
merge != std::string::npos &&
opt ==
"TREE_AUTO_FLUSH" && 0 == outputConnection.compare(0,
merge,
file) && cont.compare(
equal, std::string::npos, strProp, colon) == 0 &&
data !=
"int" &&
data !=
"DbLonglong" &&
data !=
"double" &&
data !=
"string") {
370 std::ostringstream eventAutoFlush;
371 eventAutoFlush <<
flush;
372 data = eventAutoFlush.str();
378 if (
merge != std::string::npos) {
379 ATH_MSG_INFO(
"connectOutput setting auto write for: " << outputConnection <<
" to " <<
flush <<
" events");
384 ATH_MSG_DEBUG(
"connectOutput failed process POOL domain attributes.");
387 ATH_MSG_DEBUG(
"connectOutput failed process POOL database attributes.");
389 return(StatusCode::SUCCESS);
394 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
398 if (!this->
cleanUp(outputConnection).isSuccess()) {
400 return(StatusCode::FAILURE);
402 return(StatusCode::SUCCESS);
405 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
406 return(StatusCode::SUCCESS);
408 std::map<void*, RootType> commitCache;
412 const char* placementStr =
nullptr;
415 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 &&
num > 0) {
416 const char * matchedChars = strstr(placementStr,
"[FILE=");
417 if (not matchedChars){
428 bool dataHeaderSeen =
false;
429 std::string dataHeaderID;
431 std::string objName =
"ALL";
433 std::string objName(placementStr);
438 std::string tokenStr = placementStr;
439 std::string contName = strstr(placementStr,
"[CONT=");
440 tokenStr.erase(tokenStr.find(
"[CONT="));
441 tokenStr.append(contName, contName.find(
']') + 1);
442 contName = contName.substr(6, contName.find(
']') - 6);
443 std::string
className = strstr(placementStr,
"[PNAME=");
447 std::ostringstream oss2;
448 oss2 << std::dec <<
num;
450 bool foundContainer =
false;
451 std::size_t pPos = contName.find(
'(');
453 foundContainer =
true;
457 if (contName.compare(0, pPos,
item) == 0){
458 foundContainer =
true;
464 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
469 std::ostringstream oss1;
471 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
472 FileIncident beginInputIncident(
name(),
"BeginInputFile", memName);
473 incSvc->fireIncident(beginInputIncident);
474 FileIncident endInputIncident(
name(),
"EndInputFile", memName);
475 incSvc->fireIncident(endInputIncident);
482 sc = metadataSvc->shmProxy(std::string(placementStr) +
"[NUM=" + oss2.str() +
"]");
483 if (
sc.isRecoverable()) {
485 }
else if (
sc.isFailure()) {
497 placement.
fromString(placementStr); placementStr =
nullptr;
499 if (token ==
nullptr) {
509 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
513 dataHeaderSeen =
true;
520 dataHeaderID = token->
contID();
522 dataHeaderID += oss2.str();
525 }
else if (dataHeaderSeen) {
526 dataHeaderSeen =
false;
532 tokenStr, dataHeaderID);
533 if (!DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
534 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
539 GenericAddress
address(0, 0,
"", dataHeaderID);
540 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
547 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
553 while (
sc.isRecoverable()) {
556 if (!
sc.isSuccess()) {
562 while (
sc.isRecoverable()) {
565 if (
sc.isFailure()) {
570 if (dataHeaderSeen) {
572 GenericAddress
address(0, 0,
"", dataHeaderID);
573 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
578 placementStr =
nullptr;
579 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
580 return(StatusCode::RECOVERABLE);
581 }
else if (
sc.isRecoverable() ||
num == -1) {
582 return(StatusCode::RECOVERABLE);
586 std::ostringstream oss1;
588 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
589 FileIncident beginInputIncident(
name(),
"BeginInputFile", memName);
590 incSvc->fireIncident(beginInputIncident);
591 FileIncident endInputIncident(
name(),
"EndInputFile", memName);
592 incSvc->fireIncident(endInputIncident);
593 if (
sc.isFailure()) {
594 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
598 return(StatusCode::FAILURE);
602 ATH_MSG_DEBUG(
"commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
603 return(StatusCode::SUCCESS);
607 std::unique_lock<std::mutex> lock(
m_mutex);
608 if (outputConnection.empty()) {
614 ATH_MSG_ERROR(
"connectOutput FAILED extract file name and technology.");
615 return(StatusCode::FAILURE);
617 const std::string oldOutputConnection = outputConnection;
628 ATH_MSG_DEBUG(
"commitOutput failed process POOL domain attributes.");
631 ATH_MSG_DEBUG(
"commitOutput failed process POOL database attributes.");
634 ATH_MSG_DEBUG(
"commitOutput failed process POOL container attributes.");
639 if (!
m_poolSvc->commit(contextId).isSuccess()) {
640 ATH_MSG_ERROR(
"commitOutput FAILED to commit OutputStream.");
641 return(StatusCode::FAILURE);
644 if (!
m_poolSvc->commitAndHold(contextId).isSuccess()) {
645 ATH_MSG_ERROR(
"commitOutput FAILED to commitAndHold OutputStream.");
646 return(StatusCode::FAILURE);
651 return(StatusCode::FAILURE);
653 if (!this->
cleanUp(oldOutputConnection).isSuccess()) {
655 return(StatusCode::FAILURE);
658 iter->second.Destruct(iter->first);
661 long long int currentFileSize =
m_poolSvc->getFileSize(outputConnection,
m_dbType.
type(), contextId);
665 return(StatusCode::RECOVERABLE);
669 return(StatusCode::RECOVERABLE);
671 return(StatusCode::SUCCESS);
676 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
679 return(StatusCode::SUCCESS);
685 return(StatusCode::SUCCESS);
715 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
719 Token* token =
nullptr;
723 std::string placementStr = placement->
toString();
724 placementStr +=
"[PNAME=";
725 placementStr += classDesc.
Name();
729 while (
sc.isRecoverable()) {
733 if (!
sc.isSuccess()) {
738 const void*
buffer =
nullptr;
739 std::size_t nbytes = 0;
741 if (classDesc.
Name() ==
"Token") {
742 nbytes = strlen(
static_cast<const char*
>(
obj)) + 1;
746 nbytes = classDesc.
SizeOf();
754 while (
sc.isRecoverable()) {
758 if (own) {
delete []
static_cast<const char*
>(
buffer); }
760 if (!
sc.isSuccess()) {
761 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
767 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
776 const char* tokenStr =
nullptr;
779 while (
sc.isRecoverable()) {
783 if (!
sc.isSuccess()) {
787 if (!strcmp(tokenStr,
"ABORT")) {
794 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
796 token = tempToken; tempToken =
nullptr;
800 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
803 token = tempToken; tempToken =
nullptr;
806 token =
m_poolSvc->registerForWrite(placement,
obj, classDesc);
816 token =
m_poolSvc->registerForWrite(placement,
obj, classDesc);
828 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
833 int num = token->
oid().first;
836 std::size_t nbytes = 0;
838 while (
sc.isRecoverable()) {
842 if (!
sc.isSuccess()) {
873 std::size_t nbytes = 0;
879 while (
sc.isRecoverable()) {
884 if (!
sc.isSuccess()) {
908 const std::string*
par,
909 const unsigned long*
ip,
910 IOpaqueAddress*& refpAddress) {
911 if (svcType != POOL_StorageType) {
912 ATH_MSG_ERROR(
"createAddress: svcType != POOL_StorageType " << svcType <<
" " << POOL_StorageType);
913 return(StatusCode::FAILURE);
917 ATH_MSG_ERROR(
"Could not make AthenaPoolCnvSvc a Share Client");
918 return(StatusCode::FAILURE);
921 Token* token =
nullptr;
930 addressToken.
setDb(
par[0].substr(4));
935 return(StatusCode::FAILURE);
938 std::size_t nbytes = 0;
940 while (
sc.isRecoverable()) {
944 if (!
sc.isSuccess()) {
946 return(StatusCode::FAILURE);
951 delete token; token =
nullptr;
957 if (token ==
nullptr) {
958 return(StatusCode::RECOVERABLE);
961 return(StatusCode::SUCCESS);
966 const std::string& refAddress,
967 IOpaqueAddress*& refpAddress) {
968 if (svcType != POOL_StorageType) {
969 ATH_MSG_ERROR(
"createAddress: svcType != POOL_StorageType " << svcType <<
" " << POOL_StorageType);
970 return(StatusCode::FAILURE);
972 refpAddress =
new GenericAddress(POOL_StorageType, clid, refAddress);
973 return(StatusCode::SUCCESS);
977 std::string& refAddress) {
980 if (tokAddr !=
nullptr && tokAddr->
getToken() !=
nullptr) {
983 refAddress = *pAddress->par();
985 return(StatusCode::SUCCESS);
991 if (fileSpec.starts_with (
"oracle") || fileSpec.starts_with (
"mysql")) {
992 outputTech = pool::POOL_RDBMS_StorageType.
type();
993 }
else if (fileSpec.starts_with (
"ROOTKEY:")) {
994 outputTech = pool::ROOTKEY_StorageType.
type();
995 fileSpec.erase(0, 8);
996 }
else if (fileSpec.starts_with (
"ROOTTREE:")) {
997 outputTech = pool::ROOTTREE_StorageType.
type();
998 fileSpec.erase(0, 9);
999 }
else if (fileSpec.starts_with (
"ROOTTREEINDEX:")) {
1000 outputTech = pool::ROOTTREEINDEX_StorageType.
type();
1001 fileSpec.erase(0, 14);
1002 }
else if (fileSpec.starts_with (
"ROOTRNTUPLE:")) {
1003 outputTech = pool::ROOTRNTUPLE_StorageType.
type();
1004 fileSpec.erase(0, 12);
1005 }
else if (outputTech == 0) {
1008 return(StatusCode::SUCCESS);
1013 return(StatusCode::SUCCESS);
1017 bool retError =
false;
1018 std::size_t cpos = connection.find(
':');
1019 std::size_t bpos = connection.find(
'[');
1020 if (cpos == std::string::npos) {
1025 if (bpos != std::string::npos) bpos = bpos - cpos;
1026 const std::string
conn = connection.substr(cpos, bpos);
1034 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
1041 ATH_MSG_DEBUG(
"setInputAttribute failed setting POOL database/container attributes.");
1044 ATH_MSG_DEBUG(
"setInputAttribute failed getting POOL database/container attributes.");
1048 const auto& extraInputContextMap =
m_poolSvc->getInputContextMap();
1049 for (
const auto& [
label,
id]: extraInputContextMap) {
1051 ATH_MSG_DEBUG(
"setInputAttribute failed to switch off TTreeCache for = " <<
label <<
".");
1055 return(StatusCode::SUCCESS);
1069 return(StatusCode::FAILURE);
1073 return(StatusCode::SUCCESS);
1075 return(StatusCode::RECOVERABLE);
1078 return(StatusCode::RECOVERABLE);
1087 std::string streamPortSuffix;
1090 return(StatusCode::FAILURE);
1093 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
1098 return(StatusCode::SUCCESS);
1101 std::string dummyStr;
1107 return(StatusCode::FAILURE);
1109 const char* tokenStr =
nullptr;
1112 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 &&
num > 0) {
1120 token.
fromString(tokenStr); tokenStr =
nullptr;
1122 std::string objName =
"ALL";
1132 std::size_t nbytes = 0;
1135 while (
sc.isRecoverable()) {
1138 delete []
static_cast<char*
>(
buffer);
buffer =
nullptr;
1139 if (!
sc.isSuccess()) {
1141 return(StatusCode::FAILURE);
1146 return(StatusCode::FAILURE);
1151 return(StatusCode::FAILURE);
1154 std::string returnToken;
1156 if (metadataToken !=
nullptr) {
1157 returnToken = metadataToken->
toString();
1161 delete metadataToken; metadataToken =
nullptr;
1166 return(StatusCode::FAILURE);
1169 return(StatusCode::RECOVERABLE);
1171 return(StatusCode::SUCCESS);
1180 return(StatusCode::SUCCESS);
1190 while (
sc.isSuccess()) {
1191 if (client_n >= 0) {
1196 while (
sc.isRecoverable()) {
1200 return StatusCode::FAILURE;
1208 if (incident.type() ==
"EndEvent") {
1210 ATH_MSG_DEBUG(
"handle EndEvent failed process POOL database attributes.");
1217 m_outputStreamingTool(this)
1223 std::vector<std::vector<std::string> >* contAttr,
1224 std::vector<std::vector<std::string> >* dbAttr,
1225 std::vector<std::vector<std::string> >* domAttr)
const {
1226 std::vector<std::string>
opt;
1227 std::string attributeName, containerName, databaseName, valueString;
1228 for (std::vector<std::string>::const_iterator iter = property.value().begin(),
1229 last = property.value().end(); iter != last; ++iter) {
1231 attributeName.clear();
1232 containerName.clear();
1233 databaseName.clear();
1234 valueString.clear();
1235 using Gaudi::Utils::AttribStringParser;
1236 for (
const AttribStringParser::Attrib& attrib : AttribStringParser (*iter)) {
1237 const std::string
tag = attrib.tag;
1238 const std::string
val = attrib.value;
1239 if (
tag ==
"DatabaseName") {
1241 }
else if (
tag ==
"ContainerName") {
1242 if (databaseName.empty()) {
1245 containerName =
val;
1247 attributeName =
tag;
1251 if (!attributeName.empty() && !valueString.empty()) {
1252 opt.push_back(attributeName);
1253 opt.push_back(valueString);
1254 if (!databaseName.empty()) {
1255 opt.push_back(databaseName);
1256 if (!containerName.empty()) {
1257 opt.push_back(containerName);
1258 if (containerName.compare(0, 6,
"TTree=") == 0) {
1259 dbAttr->push_back(
opt);
1261 contAttr->push_back(
opt);
1265 dbAttr->push_back(
opt);
1267 }
else if (domAttr != 0) {
1268 domAttr->push_back(
opt);
1272 dbAttr->push_back(
opt);
1280 unsigned long contextId,
1283 bool doClear)
const {
1284 bool retError =
false;
1286 for (std::vector<std::vector<std::string> >::
iterator iter = attr.begin(), last = attr.end();
1287 iter != last; ++iter) {
1288 if (iter->size() == 2) {
1289 const std::string&
opt = (*iter)[0];
1290 std::string
data = (*iter)[1];
1291 if (
data ==
"int" ||
data ==
"DbLonglong" ||
data ==
"double" ||
data ==
"string") {
1310 if (iter->size() == 4) {
1311 const std::string&
opt = (*iter)[0];
1312 std::string
data = (*iter)[1];
1313 const std::string&
file = (*iter)[2];
1314 const std::string& cont = (*iter)[3];
1316 || (
file[0] ==
'*' &&
file.find(
"," +
fileName +
",") == std::string::npos))) {
1317 if (
data ==
"int" ||
data ==
"DbLonglong" ||
data ==
"double" ||
data ==
"string") {
1329 (*iter)[2] +=
"," +
fileName +
",";
1342 for (std::vector<std::vector<std::string> >::
iterator iter = attr.begin(); iter != attr.end(); ) {
1343 if (iter->empty()) {
1344 iter = attr.erase(iter);
1349 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);