12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
52 getPoolSvc()->setShareMode(
true);
61 incSvc->addListener(
this,
"StoreCleared", pri);
64 return this->AthenaPoolCnvSvc::initialize();
86 return this->AthenaPoolCnvSvc::finalize();
90 const std::string& openMode) {
91 return AthenaPoolCnvSvc::connectOutput(outputConnectionSpec, openMode);
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
99 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
105 return(StatusCode::SUCCESS);
109 ATH_MSG_DEBUG(std::format(
"connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
114 return(StatusCode::SUCCESS);
120 std::size_t apend = outputConnectionSpec.find(
'[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
124 if (outputConnectionSpec.find(
"[PoolContainerPrefix=" +
m_metadataContainerProp.value() +
"]") != std::string::npos) {
125 return AthenaPoolCnvSvc::connectOutput(outputConnection,
"APPEND");
127 return AthenaPoolCnvSvc::connectOutput(outputConnection);
133 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
137 if (!this->
cleanUp(outputConnection).isSuccess()) {
139 return(StatusCode::FAILURE);
141 return(StatusCode::SUCCESS);
144 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
145 return(StatusCode::SUCCESS);
147 std::map<void*, RootType> commitCache;
148 std::string fileName;
151 const char* placementStr =
nullptr;
154 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 && num > 0) {
155 const char * matchedChars = strstr(placementStr,
"[FILE=");
157 ATH_MSG_ERROR(std::format(
"No matching filename in {}", placementStr));
160 fileName = matchedChars;
161 fileName = fileName.substr(6, fileName.find(
']') - 6);
163 ATH_MSG_ERROR(std::format(
"Failed to connectOutput for {}", fileName));
167 bool dataHeaderSeen =
false;
168 std::string dataHeaderID;
170 std::string objName =
"ALL";
171 if (useDetailChronoStat()) {
172 objName = placementStr;
177 std::string_view pStr = placementStr;
178 std::string::size_type cpos = pStr.find (
"[CONT=");
179 if (cpos == std::string::npos) {
180 ATH_MSG_ERROR(std::format(
"No CONT field in placement string: {}", pStr));
181 return StatusCode::FAILURE;
183 std::string tokenStr (pStr.substr(0, cpos));
184 std::string contName (pStr.substr(cpos, std::string::npos));
185 std::string::size_type cl1 = contName.find(
']');
186 if (cl1 == std::string::npos) {
187 ATH_MSG_ERROR(std::format(
"Missing close bracket after CONT field in placement string: {}", pStr));
188 return StatusCode::FAILURE;
190 tokenStr.append(contName, cl1 + 1);
191 contName = contName.substr(6, cl1 - 6);
193 std::string::size_type ppos = pStr.find (
"[PNAME=");
194 if (ppos == std::string::npos) {
195 ATH_MSG_ERROR(std::format(
"No PNAME field in placement string: {}", pStr));
196 return StatusCode::FAILURE;
198 std::string className (pStr.substr(ppos, std::string::npos));
199 std::string::size_type cl2 = className.find(
']');
200 if (cl2 == std::string::npos) {
201 ATH_MSG_ERROR(std::format(
"Missing close bracket after PNAME field in placement string: {}", pStr));
202 return StatusCode::FAILURE;
204 className = className.substr(7, cl2 - 7);
207 const std::string numStr = std::to_string(num);
209 bool foundContainer =
false;
210 std::size_t opPos = contName.find(
'(');
212 foundContainer =
true;
215 if (contName.compare(0, opPos, item) == 0){
216 foundContainer =
true;
222 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
229 memName, {}, memName,
230 "BeginInputMemFile",
"EndInputMemFile");
237 sc = metadataSvc->shmProxy(std::format(
"{}[NUM={}]", pStr, numStr));
238 if (
sc.isRecoverable()) {
240 }
else if (
sc.isFailure()) {
251 if( m_oneDataHeaderForm.value() ) {
252 auto placementWithSwn = [&] {
return std::format(
"{}[SWN={}]", placementStr, num); };
253 if( className ==
"DataHeaderForm_p6" ) {
256 "", placementWithSwn());
257 DHcnv->updateRepRefs(&address,
static_cast<DataObject*
>(obj)).ignore();
263 if (token ==
nullptr) {
267 tokenStr = token->toString();
269 if( className ==
"DataHeader_p6" ) {
272 tokenStr, placementWithSwn());
273 if (!DHcnv->updateRep(&address,
static_cast<DataObject*
>(obj)).isSuccess()) {
278 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
279 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
281 placementStr =
nullptr;
285 placement.
fromString(placementStr); placementStr =
nullptr;
287 if (token ==
nullptr) {
291 tokenStr = token->toString();
292 if (className ==
"DataHeader_p6") {
297 if (!DHcnv->updateRep(&address,
static_cast<DataObject*
>(obj)).isSuccess()) {
301 dataHeaderSeen =
true;
308 dataHeaderID = std::format(
"{}/{}/{}", token->contID(), numStr, token->dbID().toString());
309 }
else if (dataHeaderSeen) {
310 dataHeaderSeen =
false;
313 if (className ==
"DataHeaderForm_p6") {
316 tokenStr, dataHeaderID);
317 if (!DHcnv->updateRepRefs(&address,
static_cast<DataObject*
>(obj)).isSuccess()) {
318 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
323 GenericAddress address(0, 0,
"", dataHeaderID);
324 if (!DHcnv->updateRepRefs(&address,
nullptr).isSuccess()) {
330 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
331 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
338 while (
sc.isRecoverable()) {
341 if (!
sc.isSuccess()) {
347 while (
sc.isRecoverable()) {
350 if (
sc.isFailure()) {
355 if (dataHeaderSeen) {
357 GenericAddress address(0, 0,
"", std::move(dataHeaderID));
358 if (!DHcnv->updateRepRefs(&address,
nullptr).isSuccess()) {
363 placementStr =
nullptr;
364 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
365 return(StatusCode::RECOVERABLE);
366 }
else if (
sc.isRecoverable() || num == -1) {
367 return(StatusCode::RECOVERABLE);
369 if (
sc.isFailure() || fileName.empty()) {
374 memName, {}, memName,
375 "BeginInputMemFile",
"EndInputMemFile");
377 if (
sc.isFailure()) {
378 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
382 return(StatusCode::FAILURE);
386 ATH_MSG_DEBUG(std::format(
"commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
387 return(StatusCode::SUCCESS);
389 if (outputConnection.empty()) {
390 outputConnection = std::move(fileName);
392 outputConnection = outputConnectionSpec;
397 std::size_t
merge = outputConnection.find(
"?pmerge=");
398 const std::string baseOutputConnection = outputConnection.substr(0,
merge);
407 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
408 for (
auto& [ptr, rootType] : commitCache) {
409 rootType.Destruct(ptr);
416 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
419 return(StatusCode::SUCCESS);
425 return(StatusCode::SUCCESS);
434 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec +
m_streamPortString.value());
441 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
445 Token* token =
nullptr;
449 std::string placementStr = placement->
toString();
450 placementStr +=
"[PNAME=";
451 placementStr += classDesc.
Name();
455 while (
sc.isRecoverable()) {
459 if (!
sc.isSuccess()) {
464 const void* buffer =
nullptr;
465 std::size_t nbytes = 0;
467 if (classDesc.
Name() ==
"Token") {
468 nbytes = strlen(
static_cast<const char*
>(obj)) + 1;
472 nbytes = classDesc.
SizeOf();
480 while (
sc.isRecoverable()) {
484 if (own) {
delete []
static_cast<const char*
>(buffer); }
486 if (!
sc.isSuccess()) {
487 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
493 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
502 const char* tokenStr =
nullptr;
505 while (
sc.isRecoverable()) {
509 if (!
sc.isSuccess()) {
513 if (!strcmp(tokenStr,
"ABORT")) {
520 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
522 token = tempToken; tempToken =
nullptr;
526 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
529 token = tempToken; tempToken =
nullptr;
535 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
540 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
549 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
554 int num = token->
oid().first;
556 void* buffer =
nullptr;
557 std::size_t nbytes = 0;
559 while (
sc.isRecoverable()) {
563 if (!
sc.isSuccess()) {
571 obj =
m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer =
nullptr;
574 std::string className = token->
auxString();
575 className = className.substr(className.find(
"[PNAME="));
576 className = className.substr(7, className.find(
']') - 7);
578 obj =
m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer =
nullptr;
594 void* buffer =
nullptr;
595 std::size_t nbytes = 0;
596 StatusCode
sc = StatusCode::FAILURE;
601 while (
sc.isRecoverable()) {
606 if (!
sc.isSuccess()) {
619 AthenaPoolCnvSvc::setObjPtr(obj, token);
625 const std::string* par,
626 const unsigned long* ip,
627 IOpaqueAddress*& refpAddress) {
633 addressToken.
setDb(par[0].substr(4));
637 void* buffer =
nullptr;
638 std::size_t nbytes = 0;
640 while (
sc.isRecoverable()) {
644 if (!
sc.isSuccess()) {
646 return(StatusCode::FAILURE);
648 auto token = std::make_unique<Token>();
649 token->fromString(
static_cast<const char*
>(buffer)); buffer =
nullptr;
656 return(StatusCode::SUCCESS);
659 return(StatusCode::RECOVERABLE);
662 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
668 const std::string& refAddress,
669 IOpaqueAddress*& refpAddress) {
670 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
674 auto pos = connection.find(
"?pmerge=");
675 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
676 return AthenaPoolCnvSvc::cleanUp(conn);
690 return(StatusCode::FAILURE);
693 m_persSvcPerOutput.setValue(
false);
694 return(StatusCode::SUCCESS);
696 return(StatusCode::RECOVERABLE);
699 return(StatusCode::RECOVERABLE);
708 std::string streamPortSuffix;
711 return(StatusCode::FAILURE);
714 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
719 return(StatusCode::SUCCESS);
722 std::string dummyStr;
728 return(StatusCode::FAILURE);
730 const char* tokenStr =
nullptr;
733 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 && num > 0) {
734 ATH_MSG_DEBUG(
"readData: " << tokenStr <<
", for client: " << num);
741 token.
fromString(tokenStr); tokenStr =
nullptr;
743 std::string objName =
"ALL";
744 if (useDetailChronoStat()) {
752 void* buffer =
nullptr;
753 std::size_t nbytes = 0;
756 while (
sc.isRecoverable()) {
759 delete []
static_cast<char*
>(buffer); buffer =
nullptr;
760 if (!
sc.isSuccess()) {
762 return(StatusCode::FAILURE);
767 return(StatusCode::FAILURE);
772 return(StatusCode::FAILURE);
775 std::string returnToken;
777 if( metadataToken ) {
778 returnToken = metadataToken->
toString();
779 metadataToken->
release(); metadataToken =
nullptr;
787 return(StatusCode::FAILURE);
790 return(StatusCode::RECOVERABLE);
792 return(StatusCode::SUCCESS);
801 return(StatusCode::SUCCESS);
810 StatusCode
sc = StatusCode::SUCCESS;
811 while (
sc.isSuccess()) {
817 while (
sc.isRecoverable()) {
821 return StatusCode::FAILURE;
832 base_class(name, pSvcLocator) {
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
uint32_t CLID
The Class ID type.
This file contains the class definition for the Placement class (migrated from POOL).
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call the cleanUp() function of every registered IAthenaPoolCleanUp.
virtual void setObjPtr(void *&obj, const Token *token) override
bool m_streamServerActive
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handler for EndEvent incidents.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect from the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
This class holds all the necessary information to guide the writing of an object in a physical place.
const std::string & auxString() const
Access auxiliary string.
const std::string & containerName() const
Access container name.
Placement & setFileName(const std::string &fileName)
Set file name.
const std::string toString() const
Retrieve the string representation of the placement.
Placement & setTechnology(int technology)
Set technology type.
const std::string & fileName() const
Access file name.
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
int technology() const
Access technology type.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Bool_t IsFundamental() const
std::string Name(unsigned int mod=Reflex::SCOPED) const
void Destruct(void *place) const
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
const std::string & auxString() const
Access auxiliary string.
Token & setCont(const std::string &cnt)
Set container name.
Token & setDb(const Guid &db)
Set database identifier.
const std::string & contID() const
Access container identifier.
const Guid & classID() const
Access class identifier.
Token & setClassID(const Guid &cl_id)
Set class identifier.
virtual const std::string toString() const
Retrieve the string representation of the token.
int technology() const
Access technology type.
int release()
Release token: Decrease reference count and eventually delete.
Token & setOid(const OID_t &oid)
Set object identifier.
const OID_t & oid() const
Access object identifier.
Token & fromString(const std::string_view from)
Build from the string representation of a token.
const Guid & dbID() const
Access database identifier.
Token & setAuxString(std::string &&auxString)
Set auxiliary string.
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
static DbType getType(const std::string &name)
Access known storage type object by name.
void commit()
Save catalog to file.
static const DbType POOL_StorageType
static constexpr CLID ID()