12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
52 getPoolSvc()->setShareMode(
true);
61 incSvc->addListener(
this,
"StoreCleared", pri);
64 return this->AthenaPoolCnvSvc::initialize();
86 return this->AthenaPoolCnvSvc::finalize();
90 const std::string& ) {
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
99 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
105 return(StatusCode::SUCCESS);
109 ATH_MSG_DEBUG(std::format(
"connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
114 return(StatusCode::SUCCESS);
120 std::size_t apend = outputConnectionSpec.find(
'[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
124 return AthenaPoolCnvSvc::connectOutput(outputConnection);
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
134 if (!this->
cleanUp(outputConnection).isSuccess()) {
136 return(StatusCode::FAILURE);
138 return(StatusCode::SUCCESS);
141 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
144 std::map<void*, RootType> commitCache;
145 std::string fileName;
148 const char* placementStr =
nullptr;
151 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 && num > 0) {
152 const char * matchedChars = strstr(placementStr,
"[FILE=");
154 ATH_MSG_ERROR(std::format(
"No matching filename in {}", placementStr));
157 fileName = matchedChars;
158 fileName = fileName.substr(6, fileName.find(
']') - 6);
160 ATH_MSG_ERROR(std::format(
"Failed to connectOutput for {}", fileName));
164 bool dataHeaderSeen =
false;
165 std::string dataHeaderID;
167 std::string objName =
"ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr;
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find (
"[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format(
"No CONT field in placement string: {}", pStr));
178 return StatusCode::FAILURE;
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(
']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format(
"Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
190 std::string::size_type ppos = pStr.find (
"[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format(
"No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(
']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format(
"Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
201 className = className.substr(7, cl2 - 7);
204 const std::string numStr = std::to_string(num);
206 bool foundContainer =
false;
207 std::size_t opPos = contName.find(
'(');
209 foundContainer =
true;
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer =
true;
219 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
226 memName, {}, memName,
227 "BeginInputMemFile",
"EndInputMemFile");
234 sc = metadataSvc->shmProxy(std::format(
"{}[NUM={}]", pStr, numStr));
235 if (
sc.isRecoverable()) {
237 }
else if (
sc.isFailure()) {
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] {
return std::format(
"{}[SWN={}]", placementStr, num); };
250 if( className ==
"DataHeaderForm_p6" ) {
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address,
static_cast<DataObject*
>(obj)).ignore();
260 if (token ==
nullptr) {
264 tokenStr = token->toString();
266 if( className ==
"DataHeader_p6" ) {
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address,
static_cast<DataObject*
>(obj)).isSuccess()) {
275 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
278 placementStr =
nullptr;
282 placement.
fromString(placementStr); placementStr =
nullptr;
284 if (token ==
nullptr) {
288 tokenStr = token->toString();
289 if (className ==
"DataHeader_p6") {
294 if (!DHcnv->updateRep(&address,
static_cast<DataObject*
>(obj)).isSuccess()) {
298 dataHeaderSeen =
true;
305 dataHeaderID = std::format(
"{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 }
else if (dataHeaderSeen) {
307 dataHeaderSeen =
false;
310 if (className ==
"DataHeaderForm_p6") {
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address,
static_cast<DataObject*
>(obj)).isSuccess()) {
315 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
320 GenericAddress address(0, 0,
"", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address,
nullptr).isSuccess()) {
327 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
335 while (
sc.isRecoverable()) {
338 if (!
sc.isSuccess()) {
344 while (
sc.isRecoverable()) {
347 if (
sc.isFailure()) {
352 if (dataHeaderSeen) {
354 GenericAddress address(0, 0,
"", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address,
nullptr).isSuccess()) {
360 placementStr =
nullptr;
361 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 }
else if (
sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
366 if (
sc.isFailure() || fileName.empty()) {
371 memName, {}, memName,
372 "BeginInputMemFile",
"EndInputMemFile");
374 if (
sc.isFailure()) {
375 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
379 return(StatusCode::FAILURE);
383 ATH_MSG_DEBUG(std::format(
"commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
389 outputConnection = outputConnectionSpec;
394 std::size_t
merge = outputConnection.find(
"?pmerge=");
395 const std::string baseOutputConnection = outputConnection.substr(0,
merge);
404 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
405 for (
auto& [ptr, rootType] : commitCache) {
406 rootType.Destruct(ptr);
413 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
416 return(StatusCode::SUCCESS);
422 return(StatusCode::SUCCESS);
431 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec +
m_streamPortString.value());
438 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
442 Token* token =
nullptr;
446 std::string placementStr = placement->
toString();
447 placementStr +=
"[PNAME=";
448 placementStr += classDesc.
Name();
452 while (
sc.isRecoverable()) {
456 if (!
sc.isSuccess()) {
461 const void* buffer =
nullptr;
462 std::size_t nbytes = 0;
464 if (classDesc.
Name() ==
"Token") {
465 nbytes = strlen(
static_cast<const char*
>(obj)) + 1;
469 nbytes = classDesc.
SizeOf();
477 while (
sc.isRecoverable()) {
481 if (own) {
delete []
static_cast<const char*
>(buffer); }
483 if (!
sc.isSuccess()) {
484 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
490 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
499 const char* tokenStr =
nullptr;
502 while (
sc.isRecoverable()) {
506 if (!
sc.isSuccess()) {
510 if (!strcmp(tokenStr,
"ABORT")) {
517 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
519 token = tempToken; tempToken =
nullptr;
523 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
526 token = tempToken; tempToken =
nullptr;
529 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
534 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
543 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
548 int num = token->
oid().first;
550 void* buffer =
nullptr;
551 std::size_t nbytes = 0;
553 while (
sc.isRecoverable()) {
557 if (!
sc.isSuccess()) {
565 obj =
m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer =
nullptr;
568 std::string className = token->
auxString();
569 className = className.substr(className.find(
"[PNAME="));
570 className = className.substr(7, className.find(
']') - 7);
572 obj =
m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer =
nullptr;
588 void* buffer =
nullptr;
589 std::size_t nbytes = 0;
590 StatusCode
sc = StatusCode::FAILURE;
595 while (
sc.isRecoverable()) {
600 if (!
sc.isSuccess()) {
613 AthenaPoolCnvSvc::setObjPtr(obj, token);
619 const std::string* par,
620 const unsigned long* ip,
621 IOpaqueAddress*& refpAddress) {
627 addressToken.
setDb(par[0].substr(4));
631 void* buffer =
nullptr;
632 std::size_t nbytes = 0;
634 while (
sc.isRecoverable()) {
638 if (!
sc.isSuccess()) {
640 return(StatusCode::FAILURE);
642 auto token = std::make_unique<Token>();
643 token->fromString(
static_cast<const char*
>(buffer)); buffer =
nullptr;
650 return(StatusCode::SUCCESS);
653 return(StatusCode::RECOVERABLE);
656 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
662 const std::string& refAddress,
663 IOpaqueAddress*& refpAddress) {
664 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
668 auto pos = connection.find(
"?pmerge=");
669 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
670 return AthenaPoolCnvSvc::cleanUp(conn);
684 return(StatusCode::FAILURE);
687 m_persSvcPerOutput.setValue(
false);
688 return(StatusCode::SUCCESS);
690 return(StatusCode::RECOVERABLE);
693 return(StatusCode::RECOVERABLE);
702 std::string streamPortSuffix;
705 return(StatusCode::FAILURE);
708 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
713 return(StatusCode::SUCCESS);
716 std::string dummyStr;
722 return(StatusCode::FAILURE);
724 const char* tokenStr =
nullptr;
727 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 && num > 0) {
728 ATH_MSG_DEBUG(
"readData: " << tokenStr <<
", for client: " << num);
735 token.
fromString(tokenStr); tokenStr =
nullptr;
737 std::string objName =
"ALL";
738 if (useDetailChronoStat()) {
746 void* buffer =
nullptr;
747 std::size_t nbytes = 0;
750 while (
sc.isRecoverable()) {
753 delete []
static_cast<char*
>(buffer); buffer =
nullptr;
754 if (!
sc.isSuccess()) {
756 return(StatusCode::FAILURE);
761 return(StatusCode::FAILURE);
766 return(StatusCode::FAILURE);
769 std::string returnToken;
771 if( metadataToken ) {
772 returnToken = metadataToken->
toString();
773 metadataToken->
release(); metadataToken =
nullptr;
781 return(StatusCode::FAILURE);
784 return(StatusCode::RECOVERABLE);
786 return(StatusCode::SUCCESS);
795 return(StatusCode::SUCCESS);
804 StatusCode
sc = StatusCode::SUCCESS;
805 while (
sc.isSuccess()) {
811 while (
sc.isRecoverable()) {
815 return StatusCode::FAILURE;
826 base_class(name, pSvcLocator) {
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
uint32_t CLID
The Class ID type.
This file contains the class definition for the Placement class (migrated from POOL).
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual void setObjPtr(void *&obj, const Token *token) override
bool m_streamServerActive
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
This class holds all the necessary information to guide the writing of an object in a physical place.
const std::string & auxString() const
Access auxiliary string.
const std::string & containerName() const
Access container name.
Placement & setFileName(const std::string &fileName)
Set file name.
const std::string toString() const
Retrieve the string representation of the placement.
const std::string & fileName() const
Access file name.
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Bool_t IsFundamental() const
std::string Name(unsigned int mod=Reflex::SCOPED) const
void Destruct(void *place) const
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
const std::string & auxString() const
Access auxiliary string.
Token & setCont(const std::string &cnt)
Set container name.
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Token & setDb(const Guid &db)
Set database name.
const std::string & contID() const
Access container identifier.
const Guid & classID() const
Access database identifier.
Token & setClassID(const Guid &cl_id)
Access database identifier.
virtual const std::string toString() const
Retrieve the string representation of the token.
int technology() const
Access technology type.
int release()
Release token: Decrease reference count and eventually delete.
Token & setOid(const OID_t &oid)
Set object identifier.
const OID_t & oid() const
Access object identifier.
Token & fromString(const std::string_view from)
Build from the string representation of a token.
const Guid & dbID() const
Access database identifier.
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
void commit()
Save catalog to file.
static const DbType POOL_StorageType
static constexpr CLID ID()