ATLAS Offline Software
AthenaPoolSharedIOCnvSvc Class Reference

This class provides the interface between Athena and PoolSvc.

#include <AthenaPoolSharedIOCnvSvc.h>

Inheritance diagram for AthenaPoolSharedIOCnvSvc:
Collaboration diagram for AthenaPoolSharedIOCnvSvc:

Public Member Functions

virtual StatusCode initialize () override
 Required of all Gaudi Services.
virtual StatusCode finalize () override
 Required of all Gaudi Services.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec, const std::string &openMode) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode commitOutput (const std::string &outputConnectionSpec, bool doCommit) override
 Implementation of IConversionSvc: Commit pending output.
virtual StatusCode disconnectOutput (const std::string &outputConnectionSpec) override
 Disconnect from the output connection.
virtual Token * registerForWrite (Placement *placement, const void *obj, const RootType &classDesc) override
virtual void setObjPtr (void *&obj, const Token *token) override
StatusCode createAddress (long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
 Create a Generic address using explicit arguments to identify a single object.
virtual StatusCode createAddress (long svcType, const CLID &clid, const std::string &refAddress, IOpaqueAddress *&refpAddress) override
 Create address from string form.
virtual StatusCode makeServer (int num) override
 Make this a server.
virtual StatusCode makeClient (int num) override
 Make this a client.
virtual StatusCode readData () override
 Read the next data object.
virtual StatusCode commitCatalog () override
 Commit Catalog.
StatusCode abortSharedWrClients (int client_n)
 Send abort to SharedWriter clients if the server quits on error.
virtual void handle (const Incident &incident) override
 Implementation of IIncidentListener: Handle for EndEvent incident.
 AthenaPoolSharedIOCnvSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~AthenaPoolSharedIOCnvSvc ()=default
 Destructor.

Private Attributes

ServiceHandle< IAthenaSerializeSvc > m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
ToolHandle< IAthenaIPCTool > m_inputStreamingTool {this,"InputStreamingTool",{}}
ToolHandle< IAthenaIPCTool > m_outputStreamingTool {this,"OutputStreamingTool",{}}
bool m_streamServerActive =false
int m_metadataClient =0
Gaudi::Property< std::string > m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
 For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
Gaudi::Property< int > m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first connect/write automatically.
Gaudi::Property< int > m_streamingTechnology {this,"StreamingTechnology",-1}
 Use Streaming for selected technologies only.
Gaudi::Property< bool > m_parallelCompression {this,"ParallelCompression",true}
 Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
Gaudi::Property< std::string > m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
 Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
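
The StreamPortString property documented above has the form "?pmerge=<host>:<port>". As a rough illustration of that format only, the following sketch splits such a suffix into host and port. The names MergeEndpoint and parseStreamPort are hypothetical and are not part of Athena; how the service or ROOT actually interprets the suffix is not shown on this page.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper type, not part of Athena: the host/port pair carried
// by a "?pmerge=<host>:<port>" suffix.
struct MergeEndpoint {
  std::string host;
  int port = 0;
};

// Returns the endpoint if `suffix` starts with "?pmerge=" and has a
// non-empty host followed by ':' and a decimal port in [0, 65535].
// Returns std::nullopt for anything else.
std::optional<MergeEndpoint> parseStreamPort(std::string_view suffix) {
  constexpr std::string_view prefix = "?pmerge=";
  if (suffix.substr(0, prefix.size()) != prefix) return std::nullopt;
  suffix.remove_prefix(prefix.size());

  const std::size_t colon = suffix.rfind(':');
  if (colon == std::string_view::npos || colon == 0 ||
      colon + 1 == suffix.size()) {
    return std::nullopt;
  }

  int port = 0;
  for (char c : suffix.substr(colon + 1)) {
    if (c < '0' || c > '9') return std::nullopt;  // digits only
    port = port * 10 + (c - '0');
    if (port > 65535) return std::nullopt;        // outside the TCP port range
  }
  return MergeEndpoint{std::string(suffix.substr(0, colon)), port};
}
```

With the default value "?pmerge=localhost:0" this yields host "localhost" and port 0. In the makeClient() listing, a value containing "localhost:0" is replaced by the suffix returned from the output streaming tool.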

Friends

class SvcFactory< AthenaPoolSharedIOCnvSvc >

Detailed Description

This class provides the interface between Athena and PoolSvc.

Definition at line 25 of file AthenaPoolSharedIOCnvSvc.h.

Constructor & Destructor Documentation

◆ AthenaPoolSharedIOCnvSvc()

AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 809 of file AthenaPoolSharedIOCnvSvc.cxx.

809 :
810 base_class(name, pSvcLocator) {
811}

◆ ~AthenaPoolSharedIOCnvSvc()

virtual AthenaPoolSharedIOCnvSvc::~AthenaPoolSharedIOCnvSvc ( )
virtual default

Destructor.

Member Function Documentation

◆ abortSharedWrClients()

StatusCode AthenaPoolSharedIOCnvSvc::abortSharedWrClients ( int client_n)

Send abort to SharedWriter clients if the server quits on error.

Parameters
client_n [IN] number of the current client, -1 if there is no current client

Definition at line 783 of file AthenaPoolSharedIOCnvSvc.cxx.

784{
785 ATH_MSG_ERROR("Sending ABORT to clients");
786 // the master process will kill this process once workers abort
787 // but it could be a time-limited loop
788 StatusCode sc = StatusCode::SUCCESS;
789 while (sc.isSuccess()) {
790 if (client_n >= 0) {
791 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
792 }
793 const char* dummy;
794 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
795 while (sc.isRecoverable()) {
796 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
797 }
798 }
799 return StatusCode::FAILURE;
800}
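
The listing above retries clearObject() for as long as it returns a recoverable status, and its own comment notes that the loop "could be a time-limited loop". The following is a minimal sketch of such a bounded retry, assuming a three-state status. Status and retryWhileRecoverable are stand-ins invented for this example; they are not Gaudi's StatusCode or any Athena API.

```cpp
#include <functional>

// Stand-in for a status type with the three states used in the listing.
enum class Status { Success, Recoverable, Failure };

// Calls `op` until it stops returning Recoverable or until `maxAttempts`
// calls have been made. Returns the last status seen, so a Recoverable
// result means the attempt budget ran out (or was not positive).
Status retryWhileRecoverable(const std::function<Status()>& op,
                             int maxAttempts) {
  Status s = Status::Recoverable;
  for (int attempt = 0;
       attempt < maxAttempts && s == Status::Recoverable; ++attempt) {
    s = op();
  }
  return s;
}
```

A caller can then distinguish three outcomes: the operation succeeded, it failed outright, or it was still recoverable when the budget was exhausted.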

◆ commitCatalog()

StatusCode AthenaPoolSharedIOCnvSvc::commitCatalog ( )
override virtual

Commit Catalog.

Definition at line 774 of file AthenaPoolSharedIOCnvSvc.cxx.

774 {
775 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
776 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
777 catalog->commit();
778 catalog->start();
779 return(StatusCode::SUCCESS);
780}

◆ commitOutput()

StatusCode AthenaPoolSharedIOCnvSvc::commitOutput ( const std::string & outputConnectionSpec,
bool doCommit )
override virtual

Implementation of IConversionSvc: Commit pending output.

Parameters
doCommit [IN] boolean to force full commit

Definition at line 128 of file AthenaPoolSharedIOCnvSvc.cxx.

128 {
129 // This is called after all DataObjects are converted.
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
131 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
132 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
133 m_outputStreamingTool->lockObject("wait").ignore();
134 if (!this->cleanUp(outputConnection).isSuccess()) {
135 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
136 return(StatusCode::FAILURE);
137 }
138 return(StatusCode::SUCCESS);
139 }
140 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
141 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
143 }
144 std::map<void*, RootType> commitCache;
145 std::string fileName;
146 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
147 // Clear object to get Placements for all objects in a Stream
148 const char* placementStr = nullptr;
149 int num = -1;
150 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
151 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
152 const char * matchedChars = strstr(placementStr, "[FILE=");
153 if (!matchedChars){
154 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
155 return abortSharedWrClients(num);
156 }
157 fileName = matchedChars;
158 fileName = fileName.substr(6, fileName.find(']') - 6);
159 if (!this->connectOutput(fileName).isSuccess()) {
160 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
161 return abortSharedWrClients(num);
162 }
163 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
164 bool dataHeaderSeen = false;
165 std::string dataHeaderID;
166 while (num > 0) {
167 std::string objName = "ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr; //FIXME, better descriptor
170 }
171 // StopWatch listens from here until the end of this current scope
172 {
173 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find ("[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
178 return StatusCode::FAILURE;
179 }
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
186 }
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
189
190 std::string::size_type ppos = pStr.find ("[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
194 }
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
200 }
201 className = className.substr(7, cl2 - 7);
202 RootType classDesc = RootType::ByNameNoQuiet(className);
203 void* obj = nullptr;
204 const std::string numStr = std::to_string(num);
205 std::string::size_type len = m_metadataContainerProp.value().size();
206 bool foundContainer = false;
207 std::size_t opPos = contName.find('(');
208 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
209 foundContainer = true;
210 } else {
211 for (const auto& item: m_metadataContainersAug.value()) {
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer = true;
214 len = item.size();
215 break;
216 }
217 }
218 }
219 if (len > 0 && foundContainer && contName[len] == '(' ) {
220 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
221 // For Metadata, before moving to next client, fire file incidents
222 if (m_metadataClient != num) {
223 if (m_metadataClient != 0) {
224 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
225 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
226 incSvc->fireIncident(beginInputIncident);
227 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
228 incSvc->fireIncident(endInputIncident);
229 }
231 }
232 // Retrieve MetaDataSvc
233 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
234 ATH_CHECK(metadataSvc.retrieve());
235 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
236 if (sc.isRecoverable()) {
237 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
238 } else if (sc.isFailure()) {
239 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
240 return abortSharedWrClients(num);
241 }
242 } else {
243 Token readToken;
244 readToken.setOid(Token::OID_t(num, 0));
245 readToken.setAuxString("[PNAME=" + className + "]");
246 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
247 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
248 // Write object
249 if( m_oneDataHeaderForm.value() ) {
250 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
251 if( className == "DataHeaderForm_p6" ) {
252 // Pass DHForms to the converter for later writing in the correct order - do not write it now
254 "", placementWithSwn());
255 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
256 tokenStr = "";
257 } else {
258 Placement placement;
259 placement.fromString(placementStr);
260 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
261 if (token == nullptr) {
262 ATH_MSG_ERROR("Failed to write Data for: " << className);
263 return abortSharedWrClients(num);
264 }
265 tokenStr = token->toString();
266 }
267 if( className == "DataHeader_p6" ) {
268 // Found DataHeader - call the converter to update DHForm Ref
270 tokenStr, placementWithSwn());
271 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
272 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
273 return abortSharedWrClients(num);
274 }
275 } else
276 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
277 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
278 }
279 placementStr = nullptr;
280 } else {
281 // Multiple shared DataHeaderForms
282 Placement placement;
283 placement.fromString(placementStr); placementStr = nullptr;
284 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
285 if (token == nullptr) {
286 ATH_MSG_ERROR("Failed to write Data for: " << className);
287 return abortSharedWrClients(num);
288 }
289 tokenStr = token->toString();
290 if (className == "DataHeader_p6") {
291 // Found DataHeader
293 tokenStr, placement.auxString());
294 // call DH converter to add the ref to DHForm (stored earlier) and to itself
295 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
296 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
297 return abortSharedWrClients(num);
298 }
299 dataHeaderSeen = true;
300 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
301 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
302 // This is achieved by building it as "CONTID/WORKERID/DBID".
303 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
304 // WORKERID allows us to distinguish AthenaMP workers,
305 // and DBID allows us to distinguish streams.
306 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
307 } else if (dataHeaderSeen) {
308 dataHeaderSeen = false;
309 // next object after DataHeader - may be a DataHeaderForm
310 // in any case we need to call the DH converter to update the DHForm Ref
311 if (className == "DataHeaderForm_p6") {
312 // Tell DataHeaderCnv that it should use a new DHForm
314 tokenStr, dataHeaderID);
315 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
316 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
317 return abortSharedWrClients(num);
318 }
319 } else {
320 // Tell DataHeaderCnv that it should use the old DHForm
321 GenericAddress address(0, 0, "", dataHeaderID);
322 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
323 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
324 return abortSharedWrClients(num);
325 }
326 }
327 }
328 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
329 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
330 }
331 }
332 }
333 }
334 // Send Token back to Client
335 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
336 while (sc.isRecoverable()) {
337 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
338 }
339 if (!sc.isSuccess()) {
340 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
341 return abortSharedWrClients(-1);
342 }
343 }
344 sc = m_outputStreamingTool->clearObject(&placementStr, num);
345 while (sc.isRecoverable()) {
346 sc = m_outputStreamingTool->clearObject(&placementStr, num);
347 }
348 if (sc.isFailure()) {
349 // no more clients, break the loop and exit
350 num = -1;
351 }
352 }
353 if (dataHeaderSeen) {
354 // DataHeader was the last object, need to tell the converter there is no DHForm coming
355 GenericAddress address(0, 0, "", std::move(dataHeaderID));
356 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
357 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
358 return abortSharedWrClients(-1);
359 }
360 }
361 placementStr = nullptr;
362 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
363 return(StatusCode::RECOVERABLE);
364 } else if (sc.isRecoverable() || num == -1) {
365 return(StatusCode::RECOVERABLE);
366 }
367 if (sc.isFailure() || fileName.empty()) {
368 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
369 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
370 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
371 incSvc->fireIncident(beginInputIncident);
372 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
373 incSvc->fireIncident(endInputIncident);
374 if (sc.isFailure()) {
375 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
376 } else {
377 ATH_MSG_INFO("Failed to get Data for client: " << num);
378 }
379 return(StatusCode::FAILURE);
380 }
381 }
382 if (m_parallelCompression && !fileName.empty()) {
383 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
385 }
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
388 } else {
389 outputConnection = outputConnectionSpec;
391 outputConnection += m_streamPortString.value();
392 }
393 }
394 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
395 for (auto& [ptr, rootType] : commitCache) {
396 rootType.Destruct(ptr);
397 }
398 return(status);
399}
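
The server branch of the listing above pulls bracketed fields such as [FILE=...], [CONT=...] and [PNAME=...] out of the placement string, and reports an error when a field or its closing bracket is missing. A minimal sketch of that extraction follows. The helper name extractField is hypothetical, and the sample string used in the note after the code is made up for illustration; it is not taken from a real job.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Returns the text between "[KEY=" and the next ']' in `placement`.
// Returns std::nullopt if the key is absent or the bracket is not closed,
// the two error cases the listing checks for.
std::optional<std::string> extractField(std::string_view placement,
                                        std::string_view key) {
  std::string open = "[";
  open += key;
  open += '=';

  const std::size_t start = placement.find(open);
  if (start == std::string_view::npos) return std::nullopt;

  const std::size_t valueBegin = start + open.size();
  const std::size_t close = placement.find(']', valueBegin);
  if (close == std::string_view::npos) return std::nullopt;

  return std::string(placement.substr(valueBegin, close - valueBegin));
}
```

For a made-up string "[FILE=out.pool.root][CONT=POOLContainer(DataHeader)][PNAME=DataHeader_p6]", extractField(s, "CONT") gives "POOLContainer(DataHeader)" and extractField(s, "SWN") gives std::nullopt. The listing itself works with fixed offsets (6 for "[CONT=" and "[FILE=", 7 for "[PNAME="); the sketch derives the offset from the key instead.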

◆ connectOutput() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec)
override virtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec [IN] the name of the output connection specification as string.

Definition at line 94 of file AthenaPoolSharedIOCnvSvc.cxx.

94 {
95// This is called before DataObjects are being converted.
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
97 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
101 }
102 }
103 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105 return(StatusCode::SUCCESS);
106 }
107 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
108 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
111 }
113 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114 return(StatusCode::SUCCESS);
115 }
116 }
118 outputConnection += m_streamPortString.value();
119 }
120 std::size_t apend = outputConnectionSpec.find('[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
123 }
124 return AthenaPoolCnvSvc::connectOutput(outputConnection);
125}
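
The listing above cuts the connection specification at the first '[', appends the stream port string to the file part, and then re-attaches the bracketed options. A minimal sketch of that string handling follows. The function name withPortSuffix is hypothetical, and because the condition guarding the suffix is not visible in the listing, the sketch appends it unconditionally.

```cpp
#include <string>

// Inserts `portSuffix` between the file part of `spec` and its trailing
// "[...]" options: "file[opt]" becomes "file" + portSuffix + "[opt]".
// A spec without '[' simply gets the suffix appended.
std::string withPortSuffix(const std::string& spec,
                           const std::string& portSuffix) {
  const std::size_t bracket = spec.find('[');
  std::string result = spec.substr(0, bracket);  // npos keeps the whole string
  result += portSuffix;
  if (bracket != std::string::npos) {
    result += spec.substr(bracket);              // re-attach "[...]" options
  }
  return result;
}
```

For example, withPortSuffix("out.root[PoolContainerPrefix=MetaData]", "?pmerge=localhost:0") returns "out.root?pmerge=localhost:0[PoolContainerPrefix=MetaData]".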

◆ connectOutput() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec,
const std::string & openMode )
override virtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec [IN] the name of the output connection specification as string.
openMode [IN] the open mode of the file as string.

Definition at line 89 of file AthenaPoolSharedIOCnvSvc.cxx.

90 {
91 return(connectOutput(outputConnectionSpec));
92}

◆ createAddress() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string & refAddress,
IOpaqueAddress *& refpAddress )
override virtual

Create address from string form.

Parameters
svcType [IN] service type of the address.
clid [IN] class id for the address.
refAddress [IN] string form to be converted.
refpAddress [OUT] converted address.

Definition at line 650 of file AthenaPoolSharedIOCnvSvc.cxx.

653 {
654 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
655}

◆ createAddress() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string * par,
const unsigned long * ip,
IOpaqueAddress *& refpAddress )
override

Create a Generic address using explicit arguments to identify a single object.

Parameters
svcType [IN] service type of the address.
clid [IN] class id for the address.
par [IN] string containing the database name.
ip [IN] object identifier.
refpAddress [OUT] converted address.

Definition at line 607 of file AthenaPoolSharedIOCnvSvc.cxx.

611 {
612 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
614 }
615 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
616 Token addressToken;
617 addressToken.setDb(par[0].substr(4));
618 addressToken.setCont(par[1]);
619 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
620 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
621 void* buffer = nullptr;
622 std::size_t nbytes = 0;
623 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
624 while (sc.isRecoverable()) {
625 // sleep
626 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
627 }
628 if (!sc.isSuccess()) {
629 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
630 return(StatusCode::FAILURE);
631 }
632 auto token = std::make_unique<Token>();
633 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
634 if (token->classID() == Guid::null()) {
635 token.reset();
636 }
637 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
638 if (token) {
639 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
640 return(StatusCode::SUCCESS);
641 }
642 else {
643 return(StatusCode::RECOVERABLE);
644 }
645 } else {
646 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
647 }
648}

◆ disconnectOutput()

StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput ( const std::string & outputConnectionSpec)
override virtual

Disconnect from the output connection.

Definition at line 402 of file AthenaPoolSharedIOCnvSvc.cxx.

402 {
403 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
404 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
405 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
406 return(StatusCode::SUCCESS);
407 }
408 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
410 m_streamServerActive = false;
411 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
412 return(StatusCode::SUCCESS);
413 } else {
414 m_streamServerActive = false;
415 }
416 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
417 }
419 outputConnection += m_streamPortString.value();
420 }
421 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
422}

◆ finalize()

StatusCode AthenaPoolSharedIOCnvSvc::finalize ( )
override virtual

Required of all Gaudi Services.

Definition at line 67 of file AthenaPoolSharedIOCnvSvc.cxx.

67 {
68 // Release AthenaSerializeSvc
69 if (!m_serializeSvc.empty()) {
70 if (!m_serializeSvc.release().isSuccess()) {
71 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72 }
73 }
74 // Release OutputStreamingTool (if configured)
75 if (!m_outputStreamingTool.empty()) {
76 if (!m_outputStreamingTool.release().isSuccess()) {
77 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78 }
79 }
80 // Release InputStreamingTool (if configured)
81 if (!m_inputStreamingTool.empty()) {
82 if (!m_inputStreamingTool.release().isSuccess()) {
83 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84 }
85 }
86 return this->AthenaPoolCnvSvc::finalize();
87}

◆ handle()

void AthenaPoolSharedIOCnvSvc::handle ( const Incident & incident)
override virtual

Implementation of IIncidentListener: Handle for EndEvent incident.

Definition at line 803 of file AthenaPoolSharedIOCnvSvc.cxx.

803 {
804 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
805 m_outputStreamingTool->lockObject("release").ignore();
806 }
807}

◆ initialize()

StatusCode AthenaPoolSharedIOCnvSvc::initialize ( )
override virtual

Required of all Gaudi Services.

Definition at line 38 of file AthenaPoolSharedIOCnvSvc.cxx.

38 {
39 // Retrieve InputStreamingTool (if configured)
40 if (!m_inputStreamingTool.empty()) {
42 }
43 // Retrieve OutputStreamingTool (if configured)
44 if (!m_outputStreamingTool.empty()) {
46 if (m_makeStreamingToolClient.value() == -1) {
47 // Initialize AthenaRootSharedWriter
48 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49 ATH_CHECK(arswsvc.retrieve());
50 }
51 // Put PoolSvc into share mode to avoid duplicating catalog.
52 getPoolSvc()->setShareMode(true);
53 }
54 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55 // Retrieve AthenaSerializeSvc
56 ATH_CHECK(m_serializeSvc.retrieve());
57 }
58 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
59 long int pri = 1000;
60 if (!m_outputStreamingTool.empty()) {
61 incSvc->addListener(this, "StoreCleared", pri);
62 ATH_MSG_DEBUG("Subscribed to StoreCleared");
63 }
64 return this->AthenaPoolCnvSvc::initialize();
65}

◆ makeClient()

StatusCode AthenaPoolSharedIOCnvSvc::makeClient ( int num)
override virtual

Make this a client.

Definition at line 683 of file AthenaPoolSharedIOCnvSvc.cxx.

683 {
684 if (!m_outputStreamingTool.empty()) {
685 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
686 std::string streamPortSuffix;
687 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
688 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
689 return(StatusCode::FAILURE);
690 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
691 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
692 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
693 m_streamPortString.setValue(streamPortSuffix);
694 }
695 }
696 if (m_inputStreamingTool.empty()) {
697 return(StatusCode::SUCCESS);
698 }
699 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
700 std::string dummyStr;
701 return(m_inputStreamingTool->makeClient(num, dummyStr));
702}

◆ makeServer()

StatusCode AthenaPoolSharedIOCnvSvc::makeServer ( int num)
override virtual

Make this a server.

Definition at line 657 of file AthenaPoolSharedIOCnvSvc.cxx.

657 {
658 if (num < 0) {
659 num = -num;
661 num = num % 1024;
662 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
663 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
664 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
665 const std::string streamPortSuffix = m_streamPortString.value();
666 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
667 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
668 return(StatusCode::FAILURE);
669 }
670 // Disable PersistencySvc per output file mode, for SharedWriter Server
671 m_persSvcPerOutput.setValue(false);
672 return(StatusCode::SUCCESS);
673 }
674 return(StatusCode::RECOVERABLE);
675 }
676 if (m_inputStreamingTool.empty()) {
677 return(StatusCode::RECOVERABLE);
678 }
679 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
680 return(m_inputStreamingTool->makeServer(num, ""));
681}
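
The number handling in makeServer() can be read as a small normalization step. The following is a minimal sketch under stated assumptions, not the Athena implementation: `ServerRequest` and `normalizeServerNum` are hypothetical names, and source line 660, which does not appear in the listing, is ignored.

```cpp
#include <cassert>

// Hypothetical result type: which streaming tool is addressed and with what number.
struct ServerRequest {
  bool forOutput;  // true: output streaming tool, false: input streaming tool
  int num;         // number handed to the tool's makeServer()
};

// Mirrors the visible branches of makeServer(): a negative num addresses the
// output tool with its magnitude reduced modulo 1024; any other num is passed
// unchanged to the input tool. INT_MIN is not handled (negation would overflow).
ServerRequest normalizeServerNum(int num) {
  if (num < 0) {
    return {true, (-num) % 1024};
  }
  return {false, num};
}
```

In the listing, the output branch additionally returns StatusCode::RECOVERABLE when the output tool is missing or already a server; the sketch leaves that decision to the caller.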

◆ readData()

StatusCode AthenaPoolSharedIOCnvSvc::readData ( )
override virtual

Read the next data object.

Definition at line 704 of file AthenaPoolSharedIOCnvSvc.cxx.

704 {
705 if (m_inputStreamingTool.empty()) {
706 return(StatusCode::FAILURE);
707 }
708 const char* tokenStr = nullptr;
709 int num = -1;
710 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
711 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
712 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
713 } else {
714 return(sc);
715 }
716 // Read object instance via POOL/ROOT
717 void* instance = nullptr;
718 Token token;
719 token.fromString(tokenStr); tokenStr = nullptr;
720 if (token.classID() != Guid::null()) {
721 std::string objName = "ALL";
722 if (useDetailChronoStat()) {
723 objName = token.classID().toString();
724 }
725 // StopWatch listens from here until the end of this current scope
726 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
727 this->setObjPtr(instance, &token);
728 // Serialize object via ROOT
729 RootType cltype(pool::DbReflex::forGuid(token.classID()));
730 void* buffer = nullptr;
731 std::size_t nbytes = 0;
732 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
733 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
734 while (sc.isRecoverable()) {
735 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
736 }
737 delete [] static_cast<char*>(buffer); buffer = nullptr;
738 if (!sc.isSuccess()) {
739 ATH_MSG_ERROR("Could not share object for: " << token.toString());
740 return(StatusCode::FAILURE);
741 }
742 AuxDiscoverySvc auxDiscover;
743 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
744 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
745 return(StatusCode::FAILURE);
746 }
747 cltype.Destruct(instance); instance = nullptr;
748 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
749 ATH_MSG_ERROR("Could not share object for: " << token.toString());
750 return(StatusCode::FAILURE);
751 }
752 } else if (token.dbID() != Guid::null()) {
753 std::string returnToken;
754 const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
755 if (metadataToken != nullptr) {
756 returnToken = metadataToken->toString();
757 } else {
758 returnToken = token.toString();
759 }
760 delete metadataToken; metadataToken = nullptr;
761 // Share token
762 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
763 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
764 ATH_MSG_ERROR("Could not share token for: " << token.toString());
765 return(StatusCode::FAILURE);
766 }
767 } else {
768 return(StatusCode::RECOVERABLE);
769 }
770 return(StatusCode::SUCCESS);
771}
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access class identifier.
Definition Token.h:73
const OID_t & oid() const
Access object identifier.
Definition Token.h:81
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
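
readData() repeats putObject() for as long as the tool reports a recoverable status. A minimal sketch of that retry pattern follows, assuming a hypothetical `Status` enum and `MockTool` class in place of `StatusCode` and `IAthenaIPCTool`. Unlike the listing, which loops without a limit, the sketch caps the number of attempts; the cap is an addition for illustration only.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical three-state status, standing in for Gaudi's StatusCode.
enum class Status { Success, Recoverable, Failure };

// Hypothetical IPC tool whose buffer is busy for the first few calls.
class MockTool {
public:
  explicit MockTool(int busyCalls) : m_busy(busyCalls) {}

  Status putObject(const void* /*buffer*/, std::size_t /*nbytes*/) {
    ++m_calls;
    if (m_busy > 0) {
      --m_busy;
      return Status::Recoverable;  // buffer not free yet, caller may retry
    }
    return Status::Success;
  }

  int calls() const { return m_calls; }

private:
  int m_busy;
  int m_calls = 0;
};

// Call op() until it stops reporting Recoverable or maxAttempts is reached.
// Returns the last status seen, which may still be Recoverable.
template <typename Op>
Status retryWhileRecoverable(Op op, int maxAttempts) {
  Status sc = op();
  for (int attempt = 1; sc == Status::Recoverable && attempt < maxAttempts; ++attempt) {
    sc = op();
  }
  return sc;
}
```

A caller would wrap the tool call in a lambda, for example `retryWhileRecoverable([&] { return tool.putObject(buf, n); }, 10)`, and treat a final `Recoverable` as a timeout.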

◆ registerForWrite()

Token * AthenaPoolSharedIOCnvSvc::registerForWrite ( Placement * placement,
const void * obj,
const RootType & classDesc )
override virtual
Returns
a string token to a Data Object written to Pool
Parameters
placement [IN] pointer to the placement hint
obj [IN] pointer to the Data Object to be written to Pool
classDesc [IN] reference to the Seal class description for the Data Object.

Definition at line 425 of file AthenaPoolSharedIOCnvSvc.cxx.

425 {
426 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
427 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
428 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
429 return(nullptr);
430 }
431 }
432 Token* token = nullptr;
433 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
434 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
435 // Lock object
436 std::string placementStr = placement->toString();
437 placementStr += "[PNAME=";
438 placementStr += classDesc.Name();
439 placementStr += ']';
440 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
441 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
442 while (sc.isRecoverable()) {
443 //usleep(100);
444 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
445 }
446 if (!sc.isSuccess()) {
447 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
448 return(nullptr);
449 }
450 // Serialize object via ROOT
451 const void* buffer = nullptr;
452 std::size_t nbytes = 0;
453 bool own = true;
454 if (classDesc.Name() == "Token") {
455 nbytes = strlen(static_cast<const char*>(obj)) + 1;
456 buffer = obj;
457 own = false;
458 } else if (classDesc.IsFundamental()) {
459 nbytes = classDesc.SizeOf();
460 buffer = obj;
461 own = false;
462 } else {
463 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
464 }
465 // Share object
466 sc = m_outputStreamingTool->putObject(buffer, nbytes);
467 while (sc.isRecoverable()) {
468 //usleep(100);
469 sc = m_outputStreamingTool->putObject(buffer, nbytes);
470 }
471 if (own) { delete [] static_cast<const char*>(buffer); }
472 buffer = nullptr;
473 if (!sc.isSuccess()) {
474 ATH_MSG_ERROR("Could not share object for: " << placementStr);
475 m_outputStreamingTool->putObject(nullptr, 0).ignore();
476 return(nullptr);
477 }
478 AuxDiscoverySvc auxDiscover;
479 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
480 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
481 m_outputStreamingTool->putObject(nullptr, 0).ignore();
482 return(nullptr);
483 }
484 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
485 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
486 return(nullptr);
487 }
488 // Get Token back from Server
489 const char* tokenStr = nullptr;
490 int num = -1;
491 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
492 while (sc.isRecoverable()) {
493 //usleep(100);
494 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
495 }
496 if (!sc.isSuccess()) {
497 ATH_MSG_ERROR("Failed to get Token");
498 return(nullptr);
499 }
500 if (!strcmp(tokenStr, "ABORT")) {
501 ATH_MSG_ERROR("Writer requested ABORT");
502 // tell the server we are leaving
503 m_outputStreamingTool->stop().ignore();
504 return nullptr;
505 }
506 Token* tempToken = new Token();
507 tempToken->fromString(tokenStr); tokenStr = nullptr;
508 tempToken->setClassID(pool::DbReflex::guid(classDesc));
509 token = tempToken; tempToken = nullptr;
510// Client Write Request
511 } else {
512 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
513 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
514 Token* tempToken = new Token();
515 tempToken->setClassID(pool::DbReflex::guid(classDesc));
516 token = tempToken; tempToken = nullptr;
517 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
518 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
519 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
520 } else {
522 placement->setFileName(placement->fileName() + m_streamPortString.value());
523 }
524 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
525 }
526 }
527 return(token);
528}
#define ATH_MSG_VERBOSE(x)
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & containerName() const
Access container name.
Definition Placement.h:32
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:30
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
const std::string & fileName() const
Access file name.
Definition Placement.h:28
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
size_t SizeOf() const
Definition RootType.cxx:765
Token & setClassID(const Guid &cl_id)
Set class identifier.
Definition Token.h:75
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
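
registerForWrite() tracks with a boolean `own` flag whether the buffer it shares was borrowed from the caller (token strings, fundamental types) or freshly allocated by the serializer. The sketch below shows one alternative way to express that distinction with `std::unique_ptr`. It is an illustration, not the Athena code: `Payload`, `borrowTokenString` and `serializeCopy` are hypothetical, and `serializeCopy` merely copies bytes in place of a real ROOT serialization.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>

// A byte range plus optional ownership of the underlying heap buffer.
struct Payload {
  const char* data = nullptr;
  std::size_t nbytes = 0;
  std::unique_ptr<char[]> owned;  // non-null only when the payload must free the buffer
};

// Borrow a NUL-terminated string: the terminator is counted, nothing is copied.
Payload borrowTokenString(const char* str) {
  Payload p;
  p.data = str;
  p.nbytes = std::strlen(str) + 1;
  return p;
}

// Hypothetical serializer stand-in: copies the object's bytes into a new buffer
// that the returned payload owns and releases automatically.
Payload serializeCopy(const void* obj, std::size_t size) {
  Payload p;
  p.owned = std::make_unique<char[]>(size);
  std::memcpy(p.owned.get(), obj, size);
  p.data = p.owned.get();
  p.nbytes = size;
  return p;
}
```

With this shape, every early return after the buffer is created releases an owned buffer without an explicit `delete []`, while borrowed buffers are left untouched.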

◆ setObjPtr()

void AthenaPoolSharedIOCnvSvc::setObjPtr ( void *& obj,
const Token * token )
override virtual
Parameters
obj [OUT] pointer to the Data Object.
token [IN] string token of the Data Object for which a Pool Ref is filled.

Definition at line 530 of file AthenaPoolSharedIOCnvSvc.cxx.

530 {
531 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
532 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
533 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
534 }
535 }
536 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
537 if (token->dbID() == Guid::null()) {
538 int num = token->oid().first;
539 // Get object from SHM
540 void* buffer = nullptr;
541 std::size_t nbytes = 0;
542 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
543 while (sc.isRecoverable()) {
544 //usleep(100);
545 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
546 }
547 if (!sc.isSuccess()) {
548 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
549 obj = nullptr;
550 } else {
551 ATH_MSG_DEBUG("Server deserializing " << token->toString());
552 if (token->classID() != Guid::null()) {
553 // Deserialize object
554 RootType cltype(pool::DbReflex::forGuid(token->classID()));
555 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
556 } else {
557 // Deserialize object
558 std::string className = token->auxString();
559 className = className.substr(className.find("[PNAME="));
560 className = className.substr(7, className.find(']') - 7);
561 RootType cltype(RootType::ByNameNoQuiet(className));
562 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
563 }
564 AuxDiscoverySvc auxDiscover;
565 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
566 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
567 obj = nullptr;
568 }
569 }
570 }
571 }
572 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
573 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
574 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
575 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
576 obj = nullptr;
577 } else {
578 void* buffer = nullptr;
579 std::size_t nbytes = 0;
580 StatusCode sc = StatusCode::FAILURE;
581 // StopWatch listens from here until the end of this current scope
582 {
583 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
584 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
585 while (sc.isRecoverable()) {
586 // sleep
587 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
588 }
589 }
590 if (!sc.isSuccess()) {
591 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
592 obj = nullptr;
593 } else {
594 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
595 AuxDiscoverySvc auxDiscover;
596 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
597 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
598 obj = nullptr;
599 }
600 }
601 }
602 } else if (token->dbID() != Guid::null()) {
603 AthenaPoolCnvSvc::setObjPtr(obj, token);
604 }
605}
virtual void setObjPtr(void *&obj, const Token *token) override
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:91
int technology() const
Access technology type.
Definition Token.h:77
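
The client appends a `[PNAME=<class name>]` field to the placement string in registerForWrite(), and the server recovers the class name from the token's auxiliary string in setObjPtr(). A minimal sketch of that round trip follows. The helper names `appendPName` and `extractPName` are hypothetical, as is the example placement string in the usage note. Unlike the listing, the sketch reports a missing field instead of calling `substr` on `npos`.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Append the persistent class name field, as done on the client side.
std::string appendPName(std::string placement, const std::string& className) {
  placement += "[PNAME=";
  placement += className;
  placement += ']';
  return placement;
}

// Extract the text between "[PNAME=" and the next ']'.
// Returns std::nullopt when the field is absent or unterminated.
std::optional<std::string> extractPName(const std::string& aux) {
  static const std::string key = "[PNAME=";
  const std::string::size_type start = aux.find(key);
  if (start == std::string::npos) {
    return std::nullopt;
  }
  const std::string::size_type valueStart = start + key.size();
  const std::string::size_type end = aux.find(']', valueStart);
  if (end == std::string::npos) {
    return std::nullopt;
  }
  return aux.substr(valueStart, end - valueStart);
}
```

For example, `extractPName(appendPName("[FILE=out.root]", "xAOD::EventInfo"))` yields `xAOD::EventInfo`. A class name that itself contains `]` would be truncated, both here and in the listing.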

◆ SvcFactory< AthenaPoolSharedIOCnvSvc >

friend class SvcFactory< AthenaPoolSharedIOCnvSvc >
friend

Definition at line 1 of file AthenaPoolSharedIOCnvSvc.h.

Member Data Documentation

◆ m_inputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_inputStreamingTool {this,"InputStreamingTool",{}}
private

Definition at line 113 of file AthenaPoolSharedIOCnvSvc.h.

113{this,"InputStreamingTool",{}};

◆ m_makeStreamingToolClient

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
private

Automatically make this instance a Streaming Client during the first connect/write.

Definition at line 125 of file AthenaPoolSharedIOCnvSvc.h.

125{this,"MakeStreamingToolClient",0};

◆ m_metadataClient

int AthenaPoolSharedIOCnvSvc::m_metadataClient =0
private

Definition at line 116 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainerProp

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
private

For SharedWriter: data placed in this container is merged using MetadataSvc.

Definition at line 121 of file AthenaPoolSharedIOCnvSvc.h.

121{this,"OutputMetadataContainer","MetaData"};

◆ m_metadataContainersAug

Gaudi::Property<std::vector<std::string> > AthenaPoolSharedIOCnvSvc::m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
private

Definition at line 122 of file AthenaPoolSharedIOCnvSvc.h.

122{this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"};

◆ m_outputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_outputStreamingTool {this,"OutputStreamingTool",{}}
private

Definition at line 114 of file AthenaPoolSharedIOCnvSvc.h.

114{this,"OutputStreamingTool",{}};

◆ m_parallelCompression

Gaudi::Property<bool> AthenaPoolSharedIOCnvSvc::m_parallelCompression {this,"ParallelCompression",true}
private

Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.

Definition at line 129 of file AthenaPoolSharedIOCnvSvc.h.

129{this,"ParallelCompression",true};
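
In registerForWrite(), this flag combines with the OutputMetadataContainer property to decide whether a client sends an object through Athena object sharing. A minimal sketch of that condition, assuming a hypothetical free function `usesObjectSharing`; the container names in the usage note are made up for illustration.

```cpp
#include <cassert>
#include <string>

// True when the object goes through Athena object sharing:
// either parallel compression is off, or the container name
// starts with the configured metadata container prefix.
bool usesObjectSharing(bool parallelCompression,
                       const std::string& containerName,
                       const std::string& metadataPrefix) {
  return !parallelCompression ||
         containerName.compare(0, metadataPrefix.size(), metadataPrefix) == 0;
}
```

With the defaults (`true` and `"MetaData"`), a container named `MetaData(Foo)` is shared as an object, while one named `CollectionTree(Foo)` is not.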

◆ m_serializeSvc

ServiceHandle<IAthenaSerializeSvc> AthenaPoolSharedIOCnvSvc::m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
private

Definition at line 112 of file AthenaPoolSharedIOCnvSvc.h.

112{this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"};

◆ m_streamingTechnology

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_streamingTechnology {this,"StreamingTechnology",-1}
private

Use Streaming for selected technologies only.

Definition at line 127 of file AthenaPoolSharedIOCnvSvc.h.

127{this,"StreamingTechnology",-1};
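
setObjPtr() consults this property before requesting an object from the input streaming tool. The check can be sketched as follows; `streamsTechnology` is a hypothetical name and the technology values in the usage note are arbitrary placeholders.

```cpp
#include <cassert>

// A negative configured value (the default, -1) streams every technology;
// otherwise only tokens with exactly the configured technology are streamed.
bool streamsTechnology(int configuredTechnology, int tokenTechnology) {
  return configuredTechnology < 0 || tokenTechnology == configuredTechnology;
}
```

For instance, `streamsTechnology(-1, 7)` is true, while `streamsTechnology(5, 7)` is false.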

◆ m_streamPortString

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
private

Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".

Definition at line 131 of file AthenaPoolSharedIOCnvSvc.h.

131{this,"StreamPortString","?pmerge=localhost:0"};
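
Two uses of this property are visible in the listings: registerForWrite() appends it to the placement file name, and makeClient() overwrites it with the suffix returned by the streaming tool while it still contains `localhost:0`. A minimal sketch of both string operations, with hypothetical helper names; the file name and port number in the usage note are invented examples.

```cpp
#include <cassert>
#include <string>

// Append the stream port suffix to a file name, as done before delegating
// the write to the base class.
std::string withStreamPort(const std::string& fileName, const std::string& streamPortString) {
  return fileName + streamPortString;
}

// True while the property still holds the placeholder port. This is a
// substring test, so a port that merely starts with 0 would also match.
bool isPlaceholderPort(const std::string& streamPortString) {
  return streamPortString.find("localhost:0") != std::string::npos;
}
```

For example, `withStreamPort("out.root", "?pmerge=localhost:0")` gives `out.root?pmerge=localhost:0`, and `isPlaceholderPort("?pmerge=localhost:1095")` is false.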

◆ m_streamServerActive

bool AthenaPoolSharedIOCnvSvc::m_streamServerActive =false
private

Definition at line 115 of file AthenaPoolSharedIOCnvSvc.h.


The documentation for this class was generated from the following files: