12 #include "GaudiKernel/ClassID.h"
13 #include "GaudiKernel/FileIncident.h"
14 #include "GaudiKernel/GenericAddress.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IOpaqueAddress.h"
51 getPoolSvc()->setShareMode(
true);
60 incSvc->addListener(
this,
"StoreCleared", pri);
89 const std::string& ) {
95 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
98 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
99 return(StatusCode::FAILURE);
104 return(StatusCode::SUCCESS);
108 ATH_MSG_DEBUG(
"connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
109 return(StatusCode::SUCCESS);
113 return(StatusCode::SUCCESS);
119 std::size_t apend = outputConnectionSpec.find(
'[');
120 if (apend != std::string::npos) {
121 outputConnection += outputConnectionSpec.substr(apend);
129 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
133 if (!this->cleanUp(outputConnection).isSuccess()) {
135 return(StatusCode::FAILURE);
137 return(StatusCode::SUCCESS);
140 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
141 return(StatusCode::SUCCESS);
143 std::map<void*, RootType> commitCache;
147 const char* placementStr =
nullptr;
150 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 &&
num > 0) {
151 const char * matchedChars = strstr(placementStr,
"[FILE=");
152 if (not matchedChars){
163 bool dataHeaderSeen =
false;
164 std::string dataHeaderID;
166 std::string objName =
"ALL";
167 if (useDetailChronoStat()) {
168 objName = placementStr;
173 std::string_view pStr = placementStr;
174 std::string::size_type cpos = pStr.find (
"[CONT=");
175 if (cpos == std::string::npos) {
176 ATH_MSG_ERROR(
"No CONT field in placement string: " << pStr);
177 return StatusCode::FAILURE;
179 std::string tokenStr (pStr.substr(0, cpos));
180 std::string contName (pStr.substr(cpos, std::string::npos));
181 std::string::size_type cl1 = contName.find(
']');
182 if (cl1 == std::string::npos) {
183 ATH_MSG_ERROR(
"Missing close bracket after CONT field in placement string: " << pStr);
184 return StatusCode::FAILURE;
186 tokenStr.append(contName, cl1 + 1);
187 contName = contName.substr(6, cl1 - 6);
189 std::string::size_type ppos = pStr.find (
"[PNAME=");
190 if (ppos == std::string::npos) {
191 ATH_MSG_ERROR(
"No PNAME field in placement string: " << pStr);
192 return StatusCode::FAILURE;
194 std::string
className (pStr.substr(ppos, std::string::npos));
195 std::string::size_type cl2 =
className.find(
']');
196 if (cl2 == std::string::npos) {
197 ATH_MSG_ERROR(
"Missing close bracket after PNAME field in placement string: " << pStr);
198 return StatusCode::FAILURE;
203 std::ostringstream oss2;
204 oss2 << std::dec <<
num;
206 bool foundContainer =
false;
207 std::size_t opPos = contName.find(
'(');
209 foundContainer =
true;
212 if (contName.compare(0, opPos,
item) == 0){
213 foundContainer =
true;
219 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
224 std::ostringstream oss1;
226 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
227 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
228 incSvc->fireIncident(beginInputIncident);
229 FileIncident endInputIncident(
name(),
"EndInputMemFile", std::move(memName));
230 incSvc->fireIncident(endInputIncident);
237 sc = metadataSvc->shmProxy(std::string(pStr) +
"[NUM=" + oss2.str() +
"]");
238 if (
sc.isRecoverable()) {
240 }
else if (
sc.isFailure()) {
251 if( m_oneDataHeaderForm.value() ) {
252 auto placementWithSwn = [&] {
return std::format(
"{}[SWN={}]", placementStr,
num); };
256 "", placementWithSwn());
257 DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).ignore();
263 if (token ==
nullptr) {
272 tokenStr, placementWithSwn());
273 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
279 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
281 placementStr =
nullptr;
285 placement.
fromString(placementStr); placementStr =
nullptr;
287 if (token ==
nullptr) {
297 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
301 dataHeaderSeen =
true;
309 }
else if (dataHeaderSeen) {
310 dataHeaderSeen =
false;
316 tokenStr, dataHeaderID);
317 if (!DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
318 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
323 GenericAddress
address(0, 0,
"", dataHeaderID);
324 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
331 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
338 while (
sc.isRecoverable()) {
341 if (!
sc.isSuccess()) {
347 while (
sc.isRecoverable()) {
350 if (
sc.isFailure()) {
355 if (dataHeaderSeen) {
357 GenericAddress
address(0, 0,
"", std::move(dataHeaderID));
358 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
363 placementStr =
nullptr;
364 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
365 return(StatusCode::RECOVERABLE);
366 }
else if (
sc.isRecoverable() ||
num == -1) {
367 return(StatusCode::RECOVERABLE);
371 std::ostringstream oss1;
373 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
374 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
375 incSvc->fireIncident(beginInputIncident);
376 FileIncident endInputIncident(
name(),
"EndInputMemFile", memName);
377 incSvc->fireIncident(endInputIncident);
378 if (
sc.isFailure()) {
379 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
383 return(StatusCode::FAILURE);
387 ATH_MSG_DEBUG(
"commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
388 return(StatusCode::SUCCESS);
390 if (outputConnection.empty()) {
391 outputConnection = std::move(
fileName);
393 outputConnection = outputConnectionSpec;
407 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
410 return(StatusCode::SUCCESS);
416 return(StatusCode::SUCCESS);
432 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
436 Token* token =
nullptr;
440 std::string placementStr = placement->
toString();
441 placementStr +=
"[PNAME=";
442 placementStr += classDesc.
Name();
446 while (
sc.isRecoverable()) {
450 if (!
sc.isSuccess()) {
455 const void*
buffer =
nullptr;
456 std::size_t nbytes = 0;
458 if (classDesc.
Name() ==
"Token") {
459 nbytes = strlen(
static_cast<const char*
>(
obj)) + 1;
463 nbytes = classDesc.
SizeOf();
471 while (
sc.isRecoverable()) {
475 if (own) {
delete []
static_cast<const char*
>(
buffer); }
477 if (!
sc.isSuccess()) {
478 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
484 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
493 const char* tokenStr =
nullptr;
496 while (
sc.isRecoverable()) {
500 if (!
sc.isSuccess()) {
504 if (!strcmp(tokenStr,
"ABORT")) {
511 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
513 token = tempToken; tempToken =
nullptr;
517 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
520 token = tempToken; tempToken =
nullptr;
523 token = getPoolSvc()->registerForWrite(placement,
obj, classDesc);
537 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
542 int num = token->
oid().first;
545 std::size_t nbytes = 0;
547 while (
sc.isRecoverable()) {
551 if (!
sc.isSuccess()) {
583 std::size_t nbytes = 0;
589 while (
sc.isRecoverable()) {
594 if (!
sc.isSuccess()) {
613 const std::string*
par,
614 const unsigned long*
ip,
615 IOpaqueAddress*& refpAddress) {
618 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
619 return(StatusCode::FAILURE);
624 addressToken.
setDb(
par[0].substr(4));
629 return(StatusCode::FAILURE);
632 std::size_t nbytes = 0;
634 while (
sc.isRecoverable()) {
638 if (!
sc.isSuccess()) {
640 return(StatusCode::FAILURE);
642 auto token = std::make_unique<Token>();
643 token->fromString(
static_cast<const char*
>(
buffer));
buffer =
nullptr;
650 return(StatusCode::SUCCESS);
653 return(StatusCode::RECOVERABLE);
662 const std::string& refAddress,
663 IOpaqueAddress*& refpAddress) {
678 return(StatusCode::FAILURE);
681 m_persSvcPerOutput.setValue(
false);
682 return(StatusCode::SUCCESS);
684 return(StatusCode::RECOVERABLE);
687 return(StatusCode::RECOVERABLE);
696 std::string streamPortSuffix;
699 return(StatusCode::FAILURE);
702 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
707 return(StatusCode::SUCCESS);
710 std::string dummyStr;
716 return(StatusCode::FAILURE);
718 const char* tokenStr =
nullptr;
721 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 &&
num > 0) {
729 token.
fromString(tokenStr); tokenStr =
nullptr;
731 std::string objName =
"ALL";
732 if (useDetailChronoStat()) {
741 std::size_t nbytes = 0;
744 while (
sc.isRecoverable()) {
748 if (!
sc.isSuccess()) {
750 return(StatusCode::FAILURE);
755 return(StatusCode::FAILURE);
760 return(StatusCode::FAILURE);
763 std::string returnToken;
765 if (metadataToken !=
nullptr) {
766 returnToken = metadataToken->
toString();
770 delete metadataToken; metadataToken =
nullptr;
775 return(StatusCode::FAILURE);
778 return(StatusCode::RECOVERABLE);
780 return(StatusCode::SUCCESS);
789 return(StatusCode::SUCCESS);
799 while (
sc.isSuccess()) {
805 while (
sc.isRecoverable()) {
809 return StatusCode::FAILURE;
820 base_class(
name, pSvcLocator) {