ATLAS Offline Software
AthenaPoolSharedIOCnvSvc Class Reference

This class provides the interface between Athena and PoolSvc.

#include <AthenaPoolSharedIOCnvSvc.h>


Public Member Functions

virtual StatusCode initialize () override
 Required of all Gaudi Services.
virtual StatusCode finalize () override
 Required of all Gaudi Services.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec, const std::string &openMode) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode commitOutput (const std::string &outputConnectionSpec, bool doCommit) override
 Implementation of IConversionSvc: Commit pending output.
virtual StatusCode disconnectOutput (const std::string &outputConnectionSpec) override
 Disconnect from the output connection.
virtual Token * registerForWrite (Placement *placement, const void *obj, const RootType &classDesc) override
virtual void setObjPtr (void *&obj, const Token *token) override
StatusCode createAddress (long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
 Create a Generic address using explicit arguments to identify a single object.
virtual StatusCode createAddress (long svcType, const CLID &clid, const std::string &refAddress, IOpaqueAddress *&refpAddress) override
 Create address from string form.
virtual StatusCode cleanUp (const std::string &connection) override
 Implement cleanUp to call the cleanUp() function of every registered IAthenaPoolCleanUp.
virtual StatusCode makeServer (int num) override
 Make this a server.
virtual StatusCode makeClient (int num) override
 Make this a client.
virtual StatusCode readData () override
 Read the next data object.
virtual StatusCode commitCatalog () override
 Commit Catalog.
StatusCode abortSharedWrClients (int client_n)
 Send abort to SharedWriter clients if the server quits on error.
virtual void handle (const Incident &incident) override
 Implementation of IIncidentListener: incident handler (the implementation listed on this page reacts to StoreCleared).
 AthenaPoolSharedIOCnvSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~AthenaPoolSharedIOCnvSvc ()=default
 Destructor.

Private Attributes

ServiceHandle< IAthenaSerializeSvc > m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
ToolHandle< IAthenaIPCTool > m_inputStreamingTool {this,"InputStreamingTool",{}}
ToolHandle< IAthenaIPCTool > m_outputStreamingTool {this,"OutputStreamingTool",{}}
bool m_streamServerActive =false
int m_metadataClient =0
Gaudi::Property< std::string > m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
 For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
Gaudi::Property< int > m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first connect/write automatically.
Gaudi::Property< int > m_streamingTechnology {this,"StreamingTechnology",-1}
 Use Streaming for selected technologies only.
Gaudi::Property< bool > m_parallelCompression {this,"ParallelCompression",true}
 Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
Gaudi::Property< std::string > m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
 Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
std::map< std::string, int > m_fileCommitCounter
 Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting {this,"FileFlushSetting",{}}
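
The interplay between FileFlushSetting and m_fileCommitCounter can be sketched as below. This is a minimal illustration and not the ATLAS implementation: FlushTracker and shouldFlush are hypothetical names, and the sketch assumes that a full commit is forced on every N-th commit of a file whose flush setting N is positive. The first line of the corresponding condition in commitOutput() is not visible in the listing on this page, so any extra condition there is not modelled.

```cpp
#include <map>
#include <string>
#include <utility>

// Hypothetical helper (not part of Athena): counts commits per output file
// and reports when the configured flush interval has been reached.
class FlushTracker {
public:
  explicit FlushTracker(std::map<std::string, int> flushSetting)
    : m_flushSetting(std::move(flushSetting)) {}

  // Record one commit for `file`; return true if this commit should flush.
  bool shouldFlush(const std::string& file) {
    const int count = ++m_commitCounter[file];
    const auto it = m_flushSetting.find(file);
    if (it == m_flushSetting.end() || it->second <= 0) {
      return false;  // no positive interval configured for this file
    }
    return count % it->second == 0;
  }

private:
  std::map<std::string, int> m_flushSetting;   // file name -> flush interval
  std::map<std::string, int> m_commitCounter;  // file name -> commits so far
};
```

With an interval of 3 for a file, the third, sixth, ninth, ... commit of that file would request a flush; files without a setting never do.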

Friends

class SvcFactory< AthenaPoolSharedIOCnvSvc >

Detailed Description

This class provides the interface between Athena and PoolSvc.

Definition at line 25 of file AthenaPoolSharedIOCnvSvc.h.

Constructor & Destructor Documentation

◆ AthenaPoolSharedIOCnvSvc()

AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 825 of file AthenaPoolSharedIOCnvSvc.cxx.

825 :
826 base_class(name, pSvcLocator) {
827}

◆ ~AthenaPoolSharedIOCnvSvc()

virtual AthenaPoolSharedIOCnvSvc::~AthenaPoolSharedIOCnvSvc ( )
virtual default

Destructor.

Member Function Documentation

◆ abortSharedWrClients()

StatusCode AthenaPoolSharedIOCnvSvc::abortSharedWrClients ( int client_n)

Send abort to SharedWriter clients if the server quits on error.

Parameters
client_n [IN] number of the current client, -1 if there is no current client.

Definition at line 799 of file AthenaPoolSharedIOCnvSvc.cxx.

800{
801 ATH_MSG_ERROR("Sending ABORT to clients");
802 // the master process will kill this process once workers abort
803 // but it could be a time-limited loop
804 StatusCode sc = StatusCode::SUCCESS;
805 while (sc.isSuccess()) {
806 if (client_n >= 0) {
807 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
808 }
809 const char* dummy;
810 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
811 while (sc.isRecoverable()) {
812 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
813 }
814 }
815 return StatusCode::FAILURE;
816}

◆ cleanUp()

StatusCode AthenaPoolSharedIOCnvSvc::cleanUp ( const std::string & connection)
override virtual

Implement cleanUp to call the cleanUp() function of every registered IAthenaPoolCleanUp.

Definition at line 667 of file AthenaPoolSharedIOCnvSvc.cxx.

667 {
668 auto pos = connection.find("?pmerge=");
669 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
670 return AthenaPoolCnvSvc::cleanUp(conn);
671}
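
The string handling in the listing above can be tried in isolation. The sketch below is a standalone illustration; stripPmerge is a hypothetical name that does not exist in Athena, and the file names in the usage note are made up.

```cpp
#include <string>

// Hypothetical helper: return the connection string without a trailing
// "?pmerge=<host>:<port>" extension; strings without it are returned as-is.
std::string stripPmerge(const std::string& connection) {
  const auto pos = connection.find("?pmerge=");
  return (pos == std::string::npos) ? connection : connection.substr(0, pos);
}
```

For example, stripPmerge("out.root?pmerge=localhost:0") yields "out.root", so the base-class cleanUp() would see the plain file connection.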

◆ commitCatalog()

StatusCode AthenaPoolSharedIOCnvSvc::commitCatalog ( )
override virtual

Commit Catalog.

Definition at line 790 of file AthenaPoolSharedIOCnvSvc.cxx.

790 {
791 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
792 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
793 catalog->commit();
794 catalog->start();
795 return(StatusCode::SUCCESS);
796}

◆ commitOutput()

StatusCode AthenaPoolSharedIOCnvSvc::commitOutput ( const std::string & outputConnectionSpec,
bool doCommit )
override virtual

Implementation of IConversionSvc: Commit pending output.

Parameters
doCommit [IN] boolean to force full commit.

Definition at line 128 of file AthenaPoolSharedIOCnvSvc.cxx.

128 {
129 // This is called after all DataObjects are converted.
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
131 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
132 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
133 m_outputStreamingTool->lockObject("wait").ignore();
134 if (!this->cleanUp(outputConnection).isSuccess()) {
135 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
136 return(StatusCode::FAILURE);
137 }
138 return(StatusCode::SUCCESS);
139 }
140 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
141 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
143 }
144 std::map<void*, RootType> commitCache;
145 std::string fileName;
146 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
147 // Clear object to get Placements for all objects in a Stream
148 const char* placementStr = nullptr;
149 int num = -1;
150 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
151 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
152 const char * matchedChars = strstr(placementStr, "[FILE=");
153 if (!matchedChars){
154 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
155 return abortSharedWrClients(num);
156 }
157 fileName = matchedChars;
158 fileName = fileName.substr(6, fileName.find(']') - 6);
159 if (!this->connectOutput(fileName).isSuccess()) {
160 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
161 return abortSharedWrClients(num);
162 }
163 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
164 bool dataHeaderSeen = false;
165 std::string dataHeaderID;
166 while (num > 0) {
167 std::string objName = "ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr; //FIXME, better descriptor
170 }
171 // StopWatch listens from here until the end of this current scope
172 {
173 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find ("[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
178 return StatusCode::FAILURE;
179 }
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
186 }
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
189
190 std::string::size_type ppos = pStr.find ("[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
194 }
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
200 }
201 className = className.substr(7, cl2 - 7);
202 RootType classDesc = RootType::ByNameNoQuiet(className);
203 void* obj = nullptr;
204 const std::string numStr = std::to_string(num);
205 std::string::size_type len = m_metadataContainerProp.value().size();
206 bool foundContainer = false;
207 std::size_t opPos = contName.find('(');
208 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
209 foundContainer = true;
210 } else {
211 for (const auto& item: m_metadataContainersAug.value()) {
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer = true;
214 len = item.size();
215 break;
216 }
217 }
218 }
219 if (len > 0 && foundContainer && contName[len] == '(' ) {
220 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
221 // For Metadata, before moving to next client, fire file incidents
222 if (m_metadataClient != num) {
223 if (m_metadataClient != 0) {
224 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
225 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
226 memName, {}, /*endFileName=*/memName,
227 "BeginInputMemFile", "EndInputMemFile");
228 }
230 }
231 // Retrieve MetaDataSvc
232 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
233 ATH_CHECK(metadataSvc.retrieve());
234 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
235 if (sc.isRecoverable()) {
236 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
237 } else if (sc.isFailure()) {
238 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
239 return abortSharedWrClients(num);
240 }
241 } else {
242 Token readToken;
243 readToken.setOid(Token::OID_t(num, 0));
244 readToken.setAuxString("[PNAME=" + className + "]");
245 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
246 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
247 // Write object
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251 // Pass DHForms to the converter for later writing in the correct order - do not write it now
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
255 tokenStr = "";
256 } else {
257 Placement placement;
258 placement.fromString(placementStr);
259 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
260 if (token == nullptr) {
261 ATH_MSG_ERROR("Failed to write Data for: " << className);
262 return abortSharedWrClients(num);
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267 // Found DataHeader - call the converter to update DHForm Ref
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
271 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
272 return abortSharedWrClients(num);
273 }
274 } else
275 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280 // Multiple shared DataHeaderForms
281 Placement placement;
282 placement.fromString(placementStr); placementStr = nullptr;
283 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
284 if (token == nullptr) {
285 ATH_MSG_ERROR("Failed to write Data for: " << className);
286 return abortSharedWrClients(num);
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290 // Found DataHeader
292 tokenStr, placement.auxString());
293 // call DH converter to add the ref to DHForm (stored earlier) and to itself
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
295 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
296 return abortSharedWrClients(num);
297 }
298 dataHeaderSeen = true;
299 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
300 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
301 // This is achieved by building it as "CONTID/WORKERID/DBID".
302 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
303 // WORKERID allows us to distinguish AthenaMP workers,
304 // and DBID allows us to distinguish streams.
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308 // next object after DataHeader - may be a DataHeaderForm
309 // in any case we need to call the DH converter to update the DHForm Ref
310 if (className == "DataHeaderForm_p6") {
311 // Tell DataHeaderCnv that it should use a new DHForm
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
316 return abortSharedWrClients(num);
317 }
318 } else {
319 // Tell DataHeaderCnv that it should use the old DHForm
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
322 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
323 return abortSharedWrClients(num);
324 }
325 }
326 }
327 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333 // Send Token back to Client
334 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
335 while (sc.isRecoverable()) {
336 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
337 }
338 if (!sc.isSuccess()) {
339 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
340 return abortSharedWrClients(-1);
341 }
342 }
343 sc = m_outputStreamingTool->clearObject(&placementStr, num);
344 while (sc.isRecoverable()) {
345 sc = m_outputStreamingTool->clearObject(&placementStr, num);
346 }
347 if (sc.isFailure()) {
348 // no more clients, break the loop and exit
349 num = -1;
350 }
351 }
352 if (dataHeaderSeen) {
353 // DataHeader was the last object, need to tell the converter there is no DHForm coming
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
356 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
357 return abortSharedWrClients(-1);
358 }
359 }
360 placementStr = nullptr;
361 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 } else if (sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
366 if (sc.isFailure() || fileName.empty()) {
367 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
368 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
369 {
370 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
371 memName, {}, /*endFileName=*/memName,
372 "BeginInputMemFile", "EndInputMemFile");
373 }
374 if (sc.isFailure()) {
375 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
376 } else {
377 ATH_MSG_INFO("Failed to get Data for client: " << num);
378 }
379 return(StatusCode::FAILURE);
380 }
381 }
382 if (m_parallelCompression && !fileName.empty()) {
383 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
385 }
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
388 } else {
389 outputConnection = outputConnectionSpec;
391 outputConnection += m_streamPortString.value();
392 }
393 }
394 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
395 const std::string baseOutputConnection = outputConnection.substr(0, merge);
396 m_fileCommitCounter[baseOutputConnection]++;
398 m_fileFlushSetting.value().contains(baseOutputConnection) &&
399 m_fileFlushSetting[baseOutputConnection] > 0 &&
400 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
401 doCommit = true;
402 ATH_MSG_DEBUG("commitOutput sending data.");
403 }
404 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
405 for (auto& [ptr, rootType] : commitCache) {
406 rootType.Destruct(ptr);
407 }
408 return(status);
409}
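
The server branch of the listing above repeatedly pulls bracketed fields such as [FILE=...], [CONT=...] and [PNAME=...] out of the placement string. A minimal sketch of that extraction is given below. placementField is a hypothetical helper, not an Athena function, and the sketch only covers the lookup of a single field; it does not reproduce the token string assembly or the error handling of commitOutput().

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper: return the value of the first "[KEY=value]" field in
// a placement string, or std::nullopt if the field or its closing bracket
// is missing.
std::optional<std::string> placementField(std::string_view placement,
                                          std::string_view key) {
  const std::string open = "[" + std::string(key) + "=";
  const auto start = placement.find(open);
  if (start == std::string_view::npos) {
    return std::nullopt;  // field not present
  }
  const auto valueStart = start + open.size();
  const auto close = placement.find(']', valueStart);
  if (close == std::string_view::npos) {
    return std::nullopt;  // missing close bracket
  }
  return std::string(placement.substr(valueStart, close - valueStart));
}
```

The two std::nullopt cases correspond to the "No ... field" and "Missing close bracket" errors reported in the listing.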

◆ connectOutput() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec)
override virtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec [IN] the name of the output connection specification as string.

Definition at line 94 of file AthenaPoolSharedIOCnvSvc.cxx.

94 {
95// This is called before DataObjects are being converted.
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
97 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
101 }
102 }
103 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105 return(StatusCode::SUCCESS);
106 }
107 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
108 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
111 }
113 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114 return(StatusCode::SUCCESS);
115 }
116 }
118 outputConnection += m_streamPortString.value();
119 }
120 std::size_t apend = outputConnectionSpec.find('[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
123 }
124 return AthenaPoolCnvSvc::connectOutput(outputConnection);
125}
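
The end of the listing above rebuilds the connection string: the file part before the first '[' receives the stream port string, and the bracketed options are re-attached afterwards. The sketch below illustrates that ordering. withPortSuffix is a hypothetical name, and because the condition guarding the suffix (source line 117) is not visible in the listing, the sketch assumes the suffix is always appended.

```cpp
#include <string>

// Hypothetical helper mirroring the string handling in connectOutput():
// "file[options]" + suffix  ->  "file" + suffix + "[options]".
// Assumption: the suffix is appended unconditionally.
std::string withPortSuffix(const std::string& spec,
                           const std::string& portSuffix) {
  const auto bracket = spec.find('[');
  std::string connection = spec.substr(0, bracket);  // file part only
  connection += portSuffix;
  if (bracket != std::string::npos) {
    connection += spec.substr(bracket);  // re-attach the "[...]" options
  }
  return connection;
}
```

With the default StreamPortString, "out.root[PoolContainerPrefix=MetaData]" would become "out.root?pmerge=localhost:0[PoolContainerPrefix=MetaData]".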

◆ connectOutput() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec,
const std::string & openMode )
override virtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec [IN] the name of the output connection specification as string.
openMode [IN] the open mode of the file as string.

Definition at line 89 of file AthenaPoolSharedIOCnvSvc.cxx.

90 {
91 return(connectOutput(outputConnectionSpec));
92}

◆ createAddress() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string & refAddress,
IOpaqueAddress *& refpAddress )
override virtual

Create address from string form.

Parameters
svcType [IN] service type of the address.
clid [IN] class id for the address.
refAddress [IN] string form to be converted.
refpAddress [OUT] converted address.

Definition at line 660 of file AthenaPoolSharedIOCnvSvc.cxx.

663 {
664 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
665}

◆ createAddress() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string * par,
const unsigned long * ip,
IOpaqueAddress *& refpAddress )
override

Create a Generic address using explicit arguments to identify a single object.

Parameters
svcType [IN] service type of the address.
clid [IN] class id for the address.
par [IN] string containing the database name.
ip [IN] object identifier.
refpAddress [OUT] converted address.

Definition at line 617 of file AthenaPoolSharedIOCnvSvc.cxx.

621 {
622 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
624 }
625 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
626 Token addressToken;
627 addressToken.setDb(par[0].substr(4));
628 addressToken.setCont(par[1]);
629 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
630 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
631 void* buffer = nullptr;
632 std::size_t nbytes = 0;
633 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
634 while (sc.isRecoverable()) {
635 // sleep
636 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
637 }
638 if (!sc.isSuccess()) {
639 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
640 return(StatusCode::FAILURE);
641 }
642 auto token = std::make_unique<Token>();
643 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
644 if (token->classID() == Guid::null()) {
645 token.reset();
646 }
647 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
648 if (token) {
649 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
650 return(StatusCode::SUCCESS);
651 }
652 else {
653 return(StatusCode::RECOVERABLE);
654 }
655 } else {
656 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
657 }
658}
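
In client mode the listing above polls getObject() for as long as the returned status is recoverable. The retry pattern can be sketched as follows. Status and retryWhileRecoverable are hypothetical stand-ins, not Gaudi or Athena types, and the maxTries bound is an addition of this sketch: the loop in the listing has no such limit.

```cpp
#include <functional>

// Stand-in for the three StatusCode outcomes used on this page.
enum class Status { Success, Recoverable, Failure };

// Call `attempt` until it returns something other than Recoverable, or until
// `maxTries` attempts have been made (the bound is an assumption of this
// sketch; the source loop is unbounded).
Status retryWhileRecoverable(const std::function<Status()>& attempt,
                             int maxTries) {
  Status sc = Status::Recoverable;
  for (int i = 0; i < maxTries && sc == Status::Recoverable; ++i) {
    sc = attempt();
  }
  return sc;
}
```

A caller would treat a final Recoverable result as "data not available yet" and Failure as a hard error, in the same way createAddress() distinguishes RECOVERABLE from FAILURE.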

◆ disconnectOutput()

StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput ( const std::string & outputConnectionSpec)
override virtual

Disconnect from the output connection.

Definition at line 412 of file AthenaPoolSharedIOCnvSvc.cxx.

412 {
413 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
414 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
415 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
416 return(StatusCode::SUCCESS);
417 }
418 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
420 m_streamServerActive = false;
421 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
422 return(StatusCode::SUCCESS);
423 } else {
424 m_streamServerActive = false;
425 }
426 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
427 }
429 outputConnection += m_streamPortString.value();
430 }
431 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
432}
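
connectOutput(), commitOutput() and disconnectOutput() all begin with the same client-side test: a client returns early unless parallel compression is on and the connection does not carry the metadata container prefix. The sketch below isolates that decision. clientDefersToServer is a hypothetical name, and the check that an output streaming tool is configured is folded into the isClient argument.

```cpp
#include <string>

// Hypothetical predicate: true when a client takes the early-return branch
// instead of calling the base-class implementation itself.
// `isClient` stands for "an output streaming tool is configured and reports
// isClient()".
bool clientDefersToServer(bool isClient, bool parallelCompression,
                          const std::string& spec,
                          const std::string& metadataContainer) {
  const bool isMetadata =
      spec.find("[PoolContainerPrefix=" + metadataContainer + "]") !=
      std::string::npos;
  return isClient && (!parallelCompression || isMetadata);
}
```

With ParallelCompression enabled, only connections tagged with the metadata container prefix take the early return; with it disabled, every client connection does.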

◆ finalize()

StatusCode AthenaPoolSharedIOCnvSvc::finalize ( )
override virtual

Required of all Gaudi Services.

Definition at line 67 of file AthenaPoolSharedIOCnvSvc.cxx.

67 {
68 // Release AthenaSerializeSvc
69 if (!m_serializeSvc.empty()) {
70 if (!m_serializeSvc.release().isSuccess()) {
71 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72 }
73 }
74 // Release OutputStreamingTool (if configured)
75 if (!m_outputStreamingTool.empty()) {
76 if (!m_outputStreamingTool.release().isSuccess()) {
77 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78 }
79 }
80 // Release InputStreamingTool (if configured)
81 if (!m_inputStreamingTool.empty()) {
82 if (!m_inputStreamingTool.release().isSuccess()) {
83 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84 }
85 }
86 return this->AthenaPoolCnvSvc::finalize();
87}

◆ handle()

void AthenaPoolSharedIOCnvSvc::handle ( const Incident & incident)
override virtual

Implementation of IIncidentListener: incident handler. The implementation listed below reacts to the StoreCleared incident.

Definition at line 819 of file AthenaPoolSharedIOCnvSvc.cxx.

819 {
820 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
821 m_outputStreamingTool->lockObject("release").ignore();
822 }
823}

◆ initialize()

StatusCode AthenaPoolSharedIOCnvSvc::initialize ( )
override virtual

Required of all Gaudi Services.

Definition at line 38 of file AthenaPoolSharedIOCnvSvc.cxx.

38 {
39 // Retrieve InputStreamingTool (if configured)
40 if (!m_inputStreamingTool.empty()) {
42 }
43 // Retrieve OutputStreamingTool (if configured)
44 if (!m_outputStreamingTool.empty()) {
46 if (m_makeStreamingToolClient.value() == -1) {
47 // Initialize AthenaRootSharedWriter
48 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49 ATH_CHECK(arswsvc.retrieve());
50 }
51 // Put PoolSvc into share mode to avoid duplicating catalog.
52 getPoolSvc()->setShareMode(true);
53 }
54 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55 // Retrieve AthenaSerializeSvc
56 ATH_CHECK(m_serializeSvc.retrieve());
57 }
58 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
59 long int pri = 1000;
60 if (!m_outputStreamingTool.empty()) {
61 incSvc->addListener(this, "StoreCleared", pri);
62 ATH_MSG_DEBUG("Subscribed to StoreCleared");
63 }
64 return this->AthenaPoolCnvSvc::initialize();
65}

◆ makeClient()

StatusCode AthenaPoolSharedIOCnvSvc::makeClient ( int num)
override virtual

Make this a client.

Definition at line 699 of file AthenaPoolSharedIOCnvSvc.cxx.

699 {
700 if (!m_outputStreamingTool.empty()) {
701 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
702 std::string streamPortSuffix;
703 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
704 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
705 return(StatusCode::FAILURE);
706 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
707 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
708 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
709 m_streamPortString.setValue(streamPortSuffix);
710 }
711 }
712 if (m_inputStreamingTool.empty()) {
713 return(StatusCode::SUCCESS);
714 }
715 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
716 std::string dummyStr;
717 return(m_inputStreamingTool->makeClient(num, dummyStr));
718}
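The port handling in makeClient() amounts to a string substitution: if the configured port string still contains the placeholder "localhost:0", it is replaced by the suffix the streaming tool reports. A minimal sketch under that reading follows. The helper names are hypothetical, the port number is invented, and the format of the suffix returned by the tool is an assumption.

```cpp
#include <cassert>
#include <string>

// Mirrors the check in makeClient(): a port string that still contains the
// placeholder "localhost:0" is replaced by the suffix reported by the tool.
std::string resolvePortString(const std::string& current, const std::string& suffixFromTool) {
    if (current.find("localhost:0") != std::string::npos) {
        return suffixFromTool;
    }
    return current;
}

// Appends the port string to a file name, as one branch of registerForWrite() does.
std::string fileNameWithPort(const std::string& fileName, const std::string& portString) {
    return fileName + portString;
}

void demoPortString() {
    // "?pmerge=localhost:0" is the default property value; port 34567 is invented.
    assert(resolvePortString("?pmerge=localhost:0", "?pmerge=localhost:34567")
           == "?pmerge=localhost:34567");
    // An explicitly configured host and port is left untouched.
    assert(resolvePortString("?pmerge=somehost:9090", "?pmerge=localhost:34567")
           == "?pmerge=somehost:9090");
    assert(fileNameWithPort("out.pool.root", "?pmerge=localhost:34567")
           == "out.pool.root?pmerge=localhost:34567");
}
```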

◆ makeServer()

StatusCode AthenaPoolSharedIOCnvSvc::makeServer ( int num)
overridevirtual

Make this a server.

Definition at line 673 of file AthenaPoolSharedIOCnvSvc.cxx.

673 {
674 if (num < 0) {
675 num = -num;
677 num = num % 1024;
678 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
679 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
680 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
681 const std::string streamPortSuffix = m_streamPortString.value();
682 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
683 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
684 return(StatusCode::FAILURE);
685 }
686 // Disable PersistencySvc per output file mode, for SharedWriter Server
687 m_persSvcPerOutput.setValue(false);
688 return(StatusCode::SUCCESS);
689 }
690 return(StatusCode::RECOVERABLE);
691 }
692 if (m_inputStreamingTool.empty()) {
693 return(StatusCode::RECOVERABLE);
694 }
695 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
696 return(m_inputStreamingTool->makeServer(num, ""));
697}
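In the listing above, the sign of `num` selects the side being addressed: a negative value is handled in the output streaming branch after the sign is dropped and the value is reduced modulo 1024, while a non-negative value falls through to the input streaming tool unchanged. The sketch below isolates that arithmetic. `ServerRequest` and `classifyServerNum` are hypothetical names, and the statement elided from the listing is not modelled.

```cpp
#include <cassert>

// Hypothetical result type: which side is addressed and with which number.
struct ServerRequest {
    bool outputSide;  // true when the caller passed a negative number
    int num;
};

ServerRequest classifyServerNum(int num) {
    if (num < 0) {
        // Drop the sign and reduce modulo 1024, as in the listing.
        // Negating INT_MIN would overflow; this sketch does not guard against it.
        return {true, (-num) % 1024};
    }
    return {false, num};
}

void demoServerNum() {
    assert(classifyServerNum(-5).outputSide && classifyServerNum(-5).num == 5);
    assert(classifyServerNum(-1030).num == 6);   // 1030 % 1024
    assert(!classifyServerNum(3).outputSide && classifyServerNum(3).num == 3);
}
```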

◆ readData()

StatusCode AthenaPoolSharedIOCnvSvc::readData ( )
overridevirtual

Read the next data object.

Definition at line 720 of file AthenaPoolSharedIOCnvSvc.cxx.

720 {
721 if (m_inputStreamingTool.empty()) {
722 return(StatusCode::FAILURE);
723 }
724 const char* tokenStr = nullptr;
725 int num = -1;
726 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
727 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
728 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
729 } else {
730 return(sc);
731 }
732 // Read object instance via POOL/ROOT
733 void* instance = nullptr;
734 Token token;
735 token.fromString(tokenStr); tokenStr = nullptr;
736 if (token.classID() != Guid::null()) {
737 std::string objName = "ALL";
738 if (useDetailChronoStat()) {
739 objName = token.classID().toString();
740 }
741 // StopWatch listens from here until the end of this current scope
742 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
743 this->setObjPtr(instance, &token);
744 // Serialize object via ROOT
746 void* buffer = nullptr;
747 std::size_t nbytes = 0;
748 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
749 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
750 while (sc.isRecoverable()) {
751 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
752 }
753 delete [] static_cast<char*>(buffer); buffer = nullptr;
754 if (!sc.isSuccess()) {
755 ATH_MSG_ERROR("Could not share object for: " << token.toString());
756 return(StatusCode::FAILURE);
757 }
758 AuxDiscoverySvc auxDiscover;
759 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
760 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
761 return(StatusCode::FAILURE);
762 }
763 cltype.Destruct(instance); instance = nullptr;
764 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
765 ATH_MSG_ERROR("Could not share object for: " << token.toString());
766 return(StatusCode::FAILURE);
767 }
768 } else if (token.dbID() != Guid::null()) {
769 std::string returnToken;
770 Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
771 if( metadataToken ) {
772 returnToken = metadataToken->toString();
773 metadataToken->release(); metadataToken = nullptr;
774 } else {
775 returnToken = token.toString();
776 }
777 // Share token
778 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
779 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
780 ATH_MSG_ERROR("Could not share token for: " << token.toString());
781 return(StatusCode::FAILURE);
782 }
783 } else {
784 return(StatusCode::RECOVERABLE);
785 }
786 return(StatusCode::SUCCESS);
787}
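The listing repeats putObject() for as long as the returned status is recoverable. The pattern can be sketched in isolation as below. `Status` and `retryWhileRecoverable` are hypothetical stand-ins for StatusCode and the inline loops, and the attempt limit is an addition of this sketch: the loops in the listing are unbounded.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for the three StatusCode outcomes used in the listing.
enum class Status { Success, Recoverable, Failure };

// Calls op until it returns something other than Recoverable, or until
// maxAttempts calls have been made (the bound is an assumption of this sketch).
Status retryWhileRecoverable(const std::function<Status()>& op, int maxAttempts) {
    assert(maxAttempts > 0);
    Status sc = op();
    for (int attempt = 1; sc == Status::Recoverable && attempt < maxAttempts; ++attempt) {
        sc = op();
    }
    return sc;
}

void demoRetry() {
    int calls = 0;
    // Busy twice, then succeeds on the third call.
    const Status sc = retryWhileRecoverable(
        [&calls]() { return ++calls < 3 ? Status::Recoverable : Status::Success; }, 10);
    assert(sc == Status::Success);
    assert(calls == 3);
}
```

A bounded variant like this trades the guarantee of eventual delivery for protection against spinning forever if the peer never frees the shared slot; whether that trade is appropriate for the shared I/O service is not something the listing settles.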

◆ registerForWrite()

Token * AthenaPoolSharedIOCnvSvc::registerForWrite ( Placement * placement,
const void * obj,
const RootType & classDesc )
overridevirtual
Returns
    a token for the Data Object written to Pool, or nullptr on failure
Parameters
    placement  [IN] pointer to the placement hint
    obj        [IN] pointer to the Data Object to be written to Pool
    classDesc  [IN] class description (RootType) for the Data Object

Definition at line 435 of file AthenaPoolSharedIOCnvSvc.cxx.

435 {
436 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
437 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
438 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
439 return(nullptr);
440 }
441 }
442 Token* token = nullptr;
443 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
444 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
445 // Lock object
446 std::string placementStr = placement->toString();
447 placementStr += "[PNAME=";
448 placementStr += classDesc.Name();
449 placementStr += ']';
450 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
451 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
452 while (sc.isRecoverable()) {
453 //usleep(100);
454 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
455 }
456 if (!sc.isSuccess()) {
457 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
458 return(nullptr);
459 }
460 // Serialize object via ROOT
461 const void* buffer = nullptr;
462 std::size_t nbytes = 0;
463 bool own = true;
464 if (classDesc.Name() == "Token") {
465 nbytes = strlen(static_cast<const char*>(obj)) + 1;
466 buffer = obj;
467 own = false;
468 } else if (classDesc.IsFundamental()) {
469 nbytes = classDesc.SizeOf();
470 buffer = obj;
471 own = false;
472 } else {
473 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
474 }
475 // Share object
476 sc = m_outputStreamingTool->putObject(buffer, nbytes);
477 while (sc.isRecoverable()) {
478 //usleep(100);
479 sc = m_outputStreamingTool->putObject(buffer, nbytes);
480 }
481 if (own) { delete [] static_cast<const char*>(buffer); }
482 buffer = nullptr;
483 if (!sc.isSuccess()) {
484 ATH_MSG_ERROR("Could not share object for: " << placementStr);
485 m_outputStreamingTool->putObject(nullptr, 0).ignore();
486 return(nullptr);
487 }
488 AuxDiscoverySvc auxDiscover;
489 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
490 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
491 m_outputStreamingTool->putObject(nullptr, 0).ignore();
492 return(nullptr);
493 }
494 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
495 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
496 return(nullptr);
497 }
498 // Get Token back from Server
499 const char* tokenStr = nullptr;
500 int num = -1;
501 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
502 while (sc.isRecoverable()) {
503 //usleep(100);
504 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
505 }
506 if (!sc.isSuccess()) {
507 ATH_MSG_ERROR("Failed to get Token");
508 return(nullptr);
509 }
510 if (!strcmp(tokenStr, "ABORT")) {
511 ATH_MSG_ERROR("Writer requested ABORT");
512 // tell the server we are leaving
513 m_outputStreamingTool->stop().ignore();
514 return nullptr;
515 }
516 Token* tempToken = new Token();
517 tempToken->fromString(tokenStr); tokenStr = nullptr;
518 tempToken->setClassID(pool::DbReflex::guid(classDesc));
519 token = tempToken; tempToken = nullptr;
520// Client Write Request
521 } else {
522 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
523 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
524 Token* tempToken = new Token();
525 tempToken->setClassID(pool::DbReflex::guid(classDesc));
526 token = tempToken; tempToken = nullptr;
527 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
528 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
529 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
530 } else {
532 placement->setFileName(placement->fileName() + m_streamPortString.value());
533 }
534 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
535 }
536 }
537 return(token);
538}
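The client side of the listing appends a `[PNAME=<class name>]` tag to the placement string before requesting the lock, and the server side of setObjPtr() recovers the class name from the token's auxiliary string with two substr() calls. The round trip can be sketched as follows. Both helper names are hypothetical, and the example placement string and class name are invented for illustration. Unlike the listing, the parser returns an empty string when the tag is missing instead of assuming it is present.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Appends the class-name tag in the same form registerForWrite() builds it.
std::string tagPlacement(const std::string& placement, const std::string& className) {
    return placement + "[PNAME=" + className + "]";
}

// Recovers the class name from a tagged string; empty if no complete tag is found.
std::string classNameFromTag(const std::string& aux) {
    const std::string key = "[PNAME=";
    const std::size_t start = aux.find(key);
    if (start == std::string::npos) {
        return {};
    }
    const std::size_t end = aux.find(']', start);
    if (end == std::string::npos) {
        return {};
    }
    return aux.substr(start + key.size(), end - start - key.size());
}

void demoPnameTag() {
    const std::string tagged = tagPlacement("[CNT=EventInfo]", "xAOD::EventInfo_v1");
    assert(tagged == "[CNT=EventInfo][PNAME=xAOD::EventInfo_v1]");
    assert(classNameFromTag(tagged) == "xAOD::EventInfo_v1");
    assert(classNameFromTag("[CNT=EventInfo]").empty());
}
```

The round trip only works for class names that do not themselves contain `]`, since the first closing bracket after the key ends the tag.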

◆ setObjPtr()

void AthenaPoolSharedIOCnvSvc::setObjPtr ( void *& obj,
const Token * token )
overridevirtual
Parameters
    obj    [OUT] pointer to the Data Object.
    token  [IN] string token of the Data Object for which a Pool Ref is filled.

Definition at line 540 of file AthenaPoolSharedIOCnvSvc.cxx.

540 {
541 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
542 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
543 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
544 }
545 }
546 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
547 if (token->dbID() == Guid::null()) {
548 int num = token->oid().first;
549 // Get object from SHM
550 void* buffer = nullptr;
551 std::size_t nbytes = 0;
552 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
553 while (sc.isRecoverable()) {
554 //usleep(100);
555 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
556 }
557 if (!sc.isSuccess()) {
558 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
559 obj = nullptr;
560 } else {
561 ATH_MSG_DEBUG("Server deserializing " << token->toString());
562 if (token->classID() != Guid::null()) {
563 // Deserialize object
564 RootType cltype(pool::DbReflex::forGuid(token->classID()));
565 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
566 } else {
567 // Deserialize object
568 std::string className = token->auxString();
569 className = className.substr(className.find("[PNAME="));
570 className = className.substr(7, className.find(']') - 7);
571 RootType cltype(RootType::ByNameNoQuiet(className));
572 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
573 }
574 AuxDiscoverySvc auxDiscover;
575 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
576 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
577 obj = nullptr;
578 }
579 }
580 }
581 }
582 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
583 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
584 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
585 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
586 obj = nullptr;
587 } else {
588 void* buffer = nullptr;
589 std::size_t nbytes = 0;
590 StatusCode sc = StatusCode::FAILURE;
591 // StopWatch listens from here until the end of this current scope
592 {
593 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
594 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
595 while (sc.isRecoverable()) {
596 // sleep
597 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
598 }
599 }
600 if (!sc.isSuccess()) {
601 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
602 obj = nullptr;
603 } else {
604 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
605 AuxDiscoverySvc auxDiscover;
606 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
607 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
608 obj = nullptr;
609 }
610 }
611 }
612 } else if (token->dbID() != Guid::null()) {
613 AthenaPoolCnvSvc::setObjPtr(obj, token);
614 }
615}

◆ SvcFactory< AthenaPoolSharedIOCnvSvc >

friend class SvcFactory< AthenaPoolSharedIOCnvSvc >
friend

Definition at line 1 of file AthenaPoolSharedIOCnvSvc.h.

Member Data Documentation

◆ m_fileCommitCounter

std::map<std::string, int> AthenaPoolSharedIOCnvSvc::m_fileCommitCounter
private

Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.

Definition at line 136 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_fileFlushSetting

Gaudi::Property<std::map<std::string, int> > AthenaPoolSharedIOCnvSvc::m_fileFlushSetting {this,"FileFlushSetting",{}}
private

Definition at line 137 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_inputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_inputStreamingTool {this,"InputStreamingTool",{}}
private

Definition at line 116 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_makeStreamingToolClient

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
private

Make this instance a Streaming Client during first connect/write automatically.

Definition at line 128 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataClient

int AthenaPoolSharedIOCnvSvc::m_metadataClient =0
private

Definition at line 119 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainerProp

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
private

For SharedWriter: name prefix of the container whose data is merged via MetadataSvc.

Definition at line 124 of file AthenaPoolSharedIOCnvSvc.h.
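In registerForWrite() this property is used as a prefix: the start of the placement's container name is compared with the property value, and when parallel compression is enabled only matching containers are sent through the streaming tool. A minimal sketch of that test follows. The helper names and the container names are invented for illustration, and the sketch assumes the service is already an output client.

```cpp
#include <cassert>
#include <string>

// True when containerName starts with prefix (same compare() call as the listing).
bool hasContainerPrefix(const std::string& containerName, const std::string& prefix) {
    return containerName.compare(0, prefix.size(), prefix) == 0;
}

// Mirrors the second half of the client condition in registerForWrite().
bool sharesViaStreamingTool(bool parallelCompression,
                            const std::string& containerName,
                            const std::string& prefix) {
    return !parallelCompression || hasContainerPrefix(containerName, prefix);
}

void demoMetadataPrefix() {
    assert(hasContainerPrefix("MetaData(Stream)", "MetaData"));
    assert(hasContainerPrefix("MetaDataHdr", "MetaData"));   // any name with the prefix matches
    assert(!hasContainerPrefix("Meta", "MetaData"));          // shorter than the prefix
    assert(!sharesViaStreamingTool(true, "CollectionTree", "MetaData"));
    assert(sharesViaStreamingTool(false, "CollectionTree", "MetaData"));
}
```

Because the match is on a prefix, an empty property value would match every container name.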

◆ m_metadataContainersAug

Gaudi::Property<std::vector<std::string> > AthenaPoolSharedIOCnvSvc::m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
private

Definition at line 125 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_outputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_outputStreamingTool {this,"OutputStreamingTool",{}}
private

Definition at line 117 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_parallelCompression

Gaudi::Property<bool> AthenaPoolSharedIOCnvSvc::m_parallelCompression {this,"ParallelCompression",true}
private

Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.

Definition at line 132 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_serializeSvc

ServiceHandle<IAthenaSerializeSvc> AthenaPoolSharedIOCnvSvc::m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
private

Definition at line 115 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamingTechnology

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_streamingTechnology {this,"StreamingTechnology",-1}
private

Use Streaming for selected technologies only.

Definition at line 130 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamPortString

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
private

Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".

Definition at line 134 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamServerActive

bool AthenaPoolSharedIOCnvSvc::m_streamServerActive =false
private

Definition at line 118 of file AthenaPoolSharedIOCnvSvc.h.


The documentation for this class was generated from the following files: