12 #include "GaudiKernel/ClassID.h"
13 #include "GaudiKernel/FileIncident.h"
14 #include "GaudiKernel/GenericAddress.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IOpaqueAddress.h"
52 getPoolSvc()->setShareMode(
true);
61 incSvc->addListener(
this,
"StoreCleared", pri);
90 const std::string& ) {
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
99 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
105 return(StatusCode::SUCCESS);
110 return(StatusCode::SUCCESS);
114 return(StatusCode::SUCCESS);
120 std::size_t apend = outputConnectionSpec.find(
'[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
134 if (!this->cleanUp(outputConnection).isSuccess()) {
136 return(StatusCode::FAILURE);
138 return(StatusCode::SUCCESS);
141 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
144 std::map<void*, RootType> commitCache;
148 const char* placementStr =
nullptr;
151 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 &&
num > 0) {
152 const char * matchedChars = strstr(placementStr,
"[FILE=");
164 bool dataHeaderSeen =
false;
165 std::string dataHeaderID;
167 std::string objName =
"ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr;
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find (
"[CONT=");
176 if (cpos == std::string::npos) {
178 return StatusCode::FAILURE;
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(
']');
183 if (cl1 == std::string::npos) {
185 return StatusCode::FAILURE;
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
190 std::string::size_type ppos = pStr.find (
"[PNAME=");
191 if (ppos == std::string::npos) {
193 return StatusCode::FAILURE;
195 std::string
className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 =
className.find(
']');
197 if (cl2 == std::string::npos) {
199 return StatusCode::FAILURE;
206 bool foundContainer =
false;
207 std::size_t opPos = contName.find(
'(');
209 foundContainer =
true;
212 if (contName.compare(0, opPos,
item) == 0){
213 foundContainer =
true;
219 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
225 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
226 incSvc->fireIncident(beginInputIncident);
227 FileIncident endInputIncident(
name(),
"EndInputMemFile", std::move(memName));
228 incSvc->fireIncident(endInputIncident);
235 sc = metadataSvc->shmProxy(
std::format(
"{}[NUM={}]", pStr, numStr));
236 if (
sc.isRecoverable()) {
238 }
else if (
sc.isFailure()) {
249 if( m_oneDataHeaderForm.value() ) {
250 auto placementWithSwn = [&] {
return std::format(
"{}[SWN={}]", placementStr,
num); };
254 "", placementWithSwn());
255 DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).ignore();
261 if (token ==
nullptr) {
270 tokenStr, placementWithSwn());
271 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
277 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
279 placementStr =
nullptr;
283 placement.
fromString(placementStr); placementStr =
nullptr;
285 if (token ==
nullptr) {
295 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
299 dataHeaderSeen =
true;
307 }
else if (dataHeaderSeen) {
308 dataHeaderSeen =
false;
314 tokenStr, dataHeaderID);
315 if (!DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
316 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
321 GenericAddress
address(0, 0,
"", dataHeaderID);
322 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
329 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
336 while (
sc.isRecoverable()) {
339 if (!
sc.isSuccess()) {
345 while (
sc.isRecoverable()) {
348 if (
sc.isFailure()) {
353 if (dataHeaderSeen) {
355 GenericAddress
address(0, 0,
"", std::move(dataHeaderID));
356 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
361 placementStr =
nullptr;
362 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
363 return(StatusCode::RECOVERABLE);
364 }
else if (
sc.isRecoverable() ||
num == -1) {
365 return(StatusCode::RECOVERABLE);
370 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
371 incSvc->fireIncident(beginInputIncident);
372 FileIncident endInputIncident(
name(),
"EndInputMemFile", memName);
373 incSvc->fireIncident(endInputIncident);
374 if (
sc.isFailure()) {
375 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
379 return(StatusCode::FAILURE);
384 return(StatusCode::SUCCESS);
386 if (outputConnection.empty()) {
387 outputConnection = std::move(
fileName);
389 outputConnection = outputConnectionSpec;
403 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
406 return(StatusCode::SUCCESS);
412 return(StatusCode::SUCCESS);
428 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
432 Token* token =
nullptr;
436 std::string placementStr = placement->
toString();
437 placementStr +=
"[PNAME=";
438 placementStr += classDesc.
Name();
442 while (
sc.isRecoverable()) {
446 if (!
sc.isSuccess()) {
451 const void*
buffer =
nullptr;
452 std::size_t nbytes = 0;
454 if (classDesc.
Name() ==
"Token") {
455 nbytes = strlen(
static_cast<const char*
>(
obj)) + 1;
459 nbytes = classDesc.
SizeOf();
467 while (
sc.isRecoverable()) {
471 if (own) {
delete []
static_cast<const char*
>(
buffer); }
473 if (!
sc.isSuccess()) {
474 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
480 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
489 const char* tokenStr =
nullptr;
492 while (
sc.isRecoverable()) {
496 if (!
sc.isSuccess()) {
500 if (!strcmp(tokenStr,
"ABORT")) {
507 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
509 token = tempToken; tempToken =
nullptr;
513 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
516 token = tempToken; tempToken =
nullptr;
519 token = getPoolSvc()->registerForWrite(placement,
obj, classDesc);
533 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
538 int num = token->
oid().first;
541 std::size_t nbytes = 0;
543 while (
sc.isRecoverable()) {
547 if (!
sc.isSuccess()) {
579 std::size_t nbytes = 0;
585 while (
sc.isRecoverable()) {
590 if (!
sc.isSuccess()) {
609 const std::string*
par,
610 const unsigned long*
ip,
611 IOpaqueAddress*& refpAddress) {
617 addressToken.
setDb(
par[0].substr(4));
622 std::size_t nbytes = 0;
624 while (
sc.isRecoverable()) {
628 if (!
sc.isSuccess()) {
630 return(StatusCode::FAILURE);
632 auto token = std::make_unique<Token>();
633 token->fromString(
static_cast<const char*
>(
buffer));
buffer =
nullptr;
640 return(StatusCode::SUCCESS);
643 return(StatusCode::RECOVERABLE);
652 const std::string& refAddress,
653 IOpaqueAddress*& refpAddress) {
668 return(StatusCode::FAILURE);
671 m_persSvcPerOutput.setValue(
false);
672 return(StatusCode::SUCCESS);
674 return(StatusCode::RECOVERABLE);
677 return(StatusCode::RECOVERABLE);
686 std::string streamPortSuffix;
689 return(StatusCode::FAILURE);
692 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
697 return(StatusCode::SUCCESS);
700 std::string dummyStr;
706 return(StatusCode::FAILURE);
708 const char* tokenStr =
nullptr;
711 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 &&
num > 0) {
719 token.
fromString(tokenStr); tokenStr =
nullptr;
721 std::string objName =
"ALL";
722 if (useDetailChronoStat()) {
731 std::size_t nbytes = 0;
734 while (
sc.isRecoverable()) {
738 if (!
sc.isSuccess()) {
740 return(StatusCode::FAILURE);
745 return(StatusCode::FAILURE);
750 return(StatusCode::FAILURE);
753 std::string returnToken;
755 if (metadataToken !=
nullptr) {
756 returnToken = metadataToken->
toString();
760 delete metadataToken; metadataToken =
nullptr;
765 return(StatusCode::FAILURE);
768 return(StatusCode::RECOVERABLE);
770 return(StatusCode::SUCCESS);
779 return(StatusCode::SUCCESS);
789 while (
sc.isSuccess()) {
795 while (
sc.isRecoverable()) {
799 return StatusCode::FAILURE;
810 base_class(
name, pSvcLocator) {