 |
ATLAS Offline Software
|
Go to the documentation of this file.
12 #include "GaudiKernel/ClassID.h"
13 #include "GaudiKernel/FileIncident.h"
14 #include "GaudiKernel/GenericAddress.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IOpaqueAddress.h"
49 getPoolSvc()->setShareMode(
true);
58 incSvc->addListener(
this,
"StoreCleared", pri);
87 const std::string& ) {
93 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
96 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
97 return(StatusCode::FAILURE);
102 return(StatusCode::SUCCESS);
106 ATH_MSG_DEBUG(
"connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
107 return(StatusCode::SUCCESS);
111 return(StatusCode::SUCCESS);
117 std::size_t apend = outputConnectionSpec.find(
'[');
118 if (apend != std::string::npos) {
119 outputConnection += outputConnectionSpec.substr(apend);
127 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
131 if (!this->cleanUp(outputConnection).isSuccess()) {
133 return(StatusCode::FAILURE);
135 return(StatusCode::SUCCESS);
138 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
139 return(StatusCode::SUCCESS);
141 std::map<void*, RootType> commitCache;
145 const char* placementStr =
nullptr;
148 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 &&
num > 0) {
149 const char * matchedChars = strstr(placementStr,
"[FILE=");
150 if (not matchedChars){
161 bool dataHeaderSeen =
false;
162 std::string dataHeaderID;
164 std::string objName =
"ALL";
165 if (useDetailChronoStat()) {
166 std::string objName(placementStr);
171 std::string tokenStr = placementStr;
172 std::string contName = strstr(placementStr,
"[CONT=");
173 tokenStr.erase(tokenStr.find(
"[CONT="));
174 tokenStr.append(contName, contName.find(
']') + 1);
175 contName = contName.substr(6, contName.find(
']') - 6);
176 std::string
className = strstr(placementStr,
"[PNAME=");
180 std::ostringstream oss2;
181 oss2 << std::dec <<
num;
183 bool foundContainer =
false;
184 std::size_t pPos = contName.find(
'(');
186 foundContainer =
true;
189 if (contName.compare(0, pPos,
item) == 0){
190 foundContainer =
true;
196 if (len > 0 && foundContainer && contName[len] ==
'(' ) {
201 std::ostringstream oss1;
203 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
204 FileIncident beginInputIncident(
name(),
"BeginInputFile", memName);
205 incSvc->fireIncident(beginInputIncident);
206 FileIncident endInputIncident(
name(),
"EndInputFile", memName);
207 incSvc->fireIncident(endInputIncident);
214 sc = metadataSvc->shmProxy(std::string(placementStr) +
"[NUM=" + oss2.str() +
"]");
215 if (
sc.isRecoverable()) {
217 }
else if (
sc.isFailure()) {
228 if( m_oneDataHeaderForm.value() ) {
229 auto placementWithSwn = [&] {
return std::format(
"{}[SWN={}]", placementStr,
num); };
233 "", placementWithSwn());
234 DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).ignore();
240 if (token ==
nullptr) {
249 tokenStr, placementWithSwn());
250 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
256 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
258 placementStr =
nullptr;
262 placement.
fromString(placementStr); placementStr =
nullptr;
264 if (token ==
nullptr) {
274 if (!DHcnv->updateRep(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
278 dataHeaderSeen =
true;
286 }
else if (dataHeaderSeen) {
287 dataHeaderSeen =
false;
293 tokenStr, dataHeaderID);
294 if (!DHcnv->updateRepRefs(&
address,
static_cast<DataObject*
>(
obj)).isSuccess()) {
295 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
300 GenericAddress
address(0, 0,
"", dataHeaderID);
301 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
308 commitCache.insert(std::pair<void*, RootType>(
obj, classDesc));
315 while (
sc.isRecoverable()) {
318 if (!
sc.isSuccess()) {
324 while (
sc.isRecoverable()) {
327 if (
sc.isFailure()) {
332 if (dataHeaderSeen) {
334 GenericAddress
address(0, 0,
"", dataHeaderID);
335 if (!DHcnv->updateRepRefs(&
address,
nullptr).isSuccess()) {
340 placementStr =
nullptr;
341 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
342 return(StatusCode::RECOVERABLE);
343 }
else if (
sc.isRecoverable() ||
num == -1) {
344 return(StatusCode::RECOVERABLE);
348 std::ostringstream oss1;
350 std::string memName =
"SHM[NUM=" + oss1.str() +
"]";
351 FileIncident beginInputIncident(
name(),
"BeginInputFile", memName);
352 incSvc->fireIncident(beginInputIncident);
353 FileIncident endInputIncident(
name(),
"EndInputFile", memName);
354 incSvc->fireIncident(endInputIncident);
355 if (
sc.isFailure()) {
356 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
360 return(StatusCode::FAILURE);
364 ATH_MSG_DEBUG(
"commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
365 return(StatusCode::SUCCESS);
367 if (outputConnection.empty()) {
370 outputConnection = outputConnectionSpec;
384 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find(
'['));
387 return(StatusCode::SUCCESS);
393 return(StatusCode::SUCCESS);
409 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
413 Token* token =
nullptr;
417 std::string placementStr = placement->
toString();
418 placementStr +=
"[PNAME=";
419 placementStr += classDesc.
Name();
423 while (
sc.isRecoverable()) {
427 if (!
sc.isSuccess()) {
432 const void*
buffer =
nullptr;
433 std::size_t nbytes = 0;
435 if (classDesc.
Name() ==
"Token") {
436 nbytes = strlen(
static_cast<const char*
>(
obj)) + 1;
440 nbytes = classDesc.
SizeOf();
448 while (
sc.isRecoverable()) {
452 if (own) {
delete []
static_cast<const char*
>(
buffer); }
454 if (!
sc.isSuccess()) {
455 ATH_MSG_ERROR(
"Could not share object for: " << placementStr);
461 ATH_MSG_ERROR(
"Could not share dynamic aux store for: " << placementStr);
470 const char* tokenStr =
nullptr;
473 while (
sc.isRecoverable()) {
477 if (!
sc.isSuccess()) {
481 if (!strcmp(tokenStr,
"ABORT")) {
488 tempToken->
fromString(tokenStr); tokenStr =
nullptr;
490 token = tempToken; tempToken =
nullptr;
494 ATH_MSG_DEBUG(
"registerForWrite SKIPPED for uninitialized server, Placement = " << placement->
toString());
497 token = tempToken; tempToken =
nullptr;
500 token = getPoolSvc()->registerForWrite(placement,
obj, classDesc);
514 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
519 int num = token->
oid().first;
522 std::size_t nbytes = 0;
524 while (
sc.isRecoverable()) {
528 if (!
sc.isSuccess()) {
560 std::size_t nbytes = 0;
566 while (
sc.isRecoverable()) {
571 if (!
sc.isSuccess()) {
590 const std::string*
par,
591 const unsigned long*
ip,
592 IOpaqueAddress*& refpAddress) {
595 ATH_MSG_ERROR(
"Could not make AthenaPoolSharedIOCnvSvc a Share Client");
596 return(StatusCode::FAILURE);
604 const std::string& refAddress,
605 IOpaqueAddress*& refpAddress) {
620 return(StatusCode::FAILURE);
623 m_persSvcPerOutput.setValue(
false);
624 return(StatusCode::SUCCESS);
626 return(StatusCode::RECOVERABLE);
629 return(StatusCode::RECOVERABLE);
638 std::string streamPortSuffix;
641 return(StatusCode::FAILURE);
644 ATH_MSG_DEBUG(
"makeClient: Setting conversion service port suffix to " << streamPortSuffix);
649 return(StatusCode::SUCCESS);
652 std::string dummyStr;
658 return(StatusCode::FAILURE);
660 const char* tokenStr =
nullptr;
663 if (
sc.isSuccess() && tokenStr !=
nullptr && strlen(tokenStr) > 0 &&
num > 0) {
671 token.
fromString(tokenStr); tokenStr =
nullptr;
673 std::string objName =
"ALL";
674 if (useDetailChronoStat()) {
683 std::size_t nbytes = 0;
686 while (
sc.isRecoverable()) {
690 if (!
sc.isSuccess()) {
692 return(StatusCode::FAILURE);
697 return(StatusCode::FAILURE);
702 return(StatusCode::FAILURE);
705 std::string returnToken;
707 if (metadataToken !=
nullptr) {
708 returnToken = metadataToken->
toString();
712 delete metadataToken; metadataToken =
nullptr;
717 return(StatusCode::FAILURE);
720 return(StatusCode::RECOVERABLE);
722 return(StatusCode::SUCCESS);
731 return(StatusCode::SUCCESS);
741 while (
sc.isSuccess()) {
747 while (
sc.isRecoverable()) {
751 return StatusCode::FAILURE;
762 base_class(
name, pSvcLocator) {
JetConstituentVector::iterator iterator
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
virtual StatusCode makeServer(int num) override
Make this a server.
This file contains the class definition for the AuxDiscoverySvc class.
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
static const Guid & null()
NULL-Guid: static class method.
This class holds all the necessary information to guide the writing of an object in a physical place.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
virtual StatusCode initialize() override
Required of all Gaudi Services.
virtual StatusCode readData() override
Read the next data object.
const std::string & containerName() const
Access container name.
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
const std::string & contID() const
Access container identifier.
bool m_streamServerActive
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
const std::string & auxString() const
Access auxiliary string.
const Guid & dbID() const
Access database identifier.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
const std::string toString() const
Automatic conversion to string representation.
void commit()
Save catalog to file.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
const Guid & classID() const
Access database identifier.
virtual void setObjPtr(void *&obj, const Token *token) override
#define ATH_MSG_VERBOSE(x)
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
StringProperty m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
This class provides a token that identifies in a unique way objects on the persistent storage.
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
Token & fromString(const std::string &from)
Build from the string representation of a token.
Token & setClassID(const Guid &cl_id)
Access database identifier.
virtual StatusCode commitCatalog() override
Commit Catalog.
int technology() const
Access technology type.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
virtual StatusCode finalize() override
Required of all Gaudi Services.
::StatusCode StatusCode
StatusCode definition for legacy code.
IntegerProperty m_streamingTechnology
Use Streaming for selected technologies only.
Default, invalid implementation of ClassID_traits.
const std::string & fileName() const
Access file name.
Placement & setFileName(const std::string &fileName)
Set file name.
StringArrayProperty m_metadataContainersAug
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
uint32_t CLID
The Class ID type.
virtual StatusCode finalize() override
Required of all Gaudi Services.
const std::string & auxString() const
Access auxiliary string.
std::string Name(unsigned int mod=Reflex::SCOPED) const
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
virtual void setObjPtr(void *&obj, const Token *token) override
IntegerProperty m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
virtual const std::string toString() const
Retrieve the string representation of the token.
virtual StatusCode makeClient(int num) override
Make this a client.
Token & setOid(const OID_t &oid)
Set object identifier.
const std::string toString() const
Retrieve the string representation of the placement.
Bool_t IsFundamental() const
#define ATH_MSG_WARNING(x)
const OID_t & oid() const
Access object identifier.
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
void Destruct(void *place) const
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
virtual StatusCode initialize() override
Required of all Gaudi Services.
#define ATLAS_THREAD_SAFE
This file contains the class definition for the Placement class (migrated from POOL).
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
This file contains the class definition for the Token class (migrated from POOL).
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
StringProperty m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
BooleanProperty m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.