ATLAS Offline Software
AthenaPoolSharedIOCnvSvc Class Reference

This class provides the interface between Athena and PoolSvc.

#include <AthenaPoolSharedIOCnvSvc.h>


Public Member Functions

virtual StatusCode initialize () override
 Required of all Gaudi Services.
virtual StatusCode finalize () override
 Required of all Gaudi Services.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec, const std::string &openMode) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode commitOutput (const std::string &outputConnectionSpec, bool doCommit) override
 Implementation of IConversionSvc: Commit pending output.
virtual StatusCode disconnectOutput (const std::string &outputConnectionSpec) override
 Disconnect from the output connection.
virtual Token * registerForWrite (Placement *placement, const void *obj, const RootType &classDesc) override
virtual void setObjPtr (void *&obj, const Token *token) override
StatusCode createAddress (long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
 Create a Generic address using explicit arguments to identify a single object.
virtual StatusCode createAddress (long svcType, const CLID &clid, const std::string &refAddress, IOpaqueAddress *&refpAddress) override
 Create address from string form.
virtual StatusCode cleanUp (const std::string &connection) override
 Implement cleanUp to call the cleanUp() function of every registered IAthenaPoolCleanUp.
virtual StatusCode makeServer (int num) override
 Make this a server.
virtual StatusCode makeClient (int num) override
 Make this a client.
virtual StatusCode readData () override
 Read the next data object.
virtual StatusCode commitCatalog () override
 Commit Catalog.
StatusCode abortSharedWrClients (int client_n)
 Send abort to SharedWriter clients if the server quits on error.
virtual void handle (const Incident &incident) override
 Implementation of IIncidentListener: handler for the StoreCleared incident.
 AthenaPoolSharedIOCnvSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~AthenaPoolSharedIOCnvSvc ()=default
 Destructor.

Private Attributes

ServiceHandle< IAthenaSerializeSvc > m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
ToolHandle< IAthenaIPCTool > m_inputStreamingTool {this,"InputStreamingTool",{}}
ToolHandle< IAthenaIPCTool > m_outputStreamingTool {this,"OutputStreamingTool",{}}
bool m_streamServerActive =false
int m_metadataClient =0
Gaudi::Property< std::string > m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
 For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
Gaudi::Property< int > m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first connect/write automatically.
Gaudi::Property< int > m_streamingTechnology {this,"StreamingTechnology",-1}
 Use Streaming for selected technologies only.
Gaudi::Property< bool > m_parallelCompression {this,"ParallelCompression",true}
 Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
Gaudi::Property< std::string > m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
 Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
std::map< std::string, int > m_fileCommitCounter
 Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting {this,"FileFlushSetting",{}}
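
The flush bookkeeping behind m_fileCommitCounter and FileFlushSetting can be pictured with a small standalone sketch. It is not the service's code: the FlushPolicy struct and the shouldFlush name are hypothetical, and the sketch only mirrors the counter-modulo-interval test visible in the commitOutput() listing (which has a further clause on a line not shown there).

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for the per-file flush bookkeeping.
struct FlushPolicy {
    std::map<std::string, int> flushSetting;   // file name -> flush interval in commits
    std::map<std::string, int> commitCounter;  // file name -> commits seen so far

    // Count one commit for `file` and report whether a flush is due.
    // Files without a positive interval are never flushed by this rule.
    bool shouldFlush(const std::string& file) {
        const int count = ++commitCounter[file];
        const auto it = flushSetting.find(file);
        return it != flushSetting.end() && it->second > 0 && count % it->second == 0;
    }
};
```

With an interval of 3 for a given file, every third call for that file reports that a flush is due; files absent from the setting map are only counted.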

Friends

class SvcFactory< AthenaPoolSharedIOCnvSvc >

Detailed Description

This class provides the interface between Athena and PoolSvc.

Definition at line 25 of file AthenaPoolSharedIOCnvSvc.h.

Constructor & Destructor Documentation

◆ AthenaPoolSharedIOCnvSvc()

AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 831 of file AthenaPoolSharedIOCnvSvc.cxx.

831 :
832 base_class(name, pSvcLocator) {
833}

◆ ~AthenaPoolSharedIOCnvSvc()

virtual AthenaPoolSharedIOCnvSvc::~AthenaPoolSharedIOCnvSvc ( )
virtual default

Destructor.

Member Function Documentation

◆ abortSharedWrClients()

StatusCode AthenaPoolSharedIOCnvSvc::abortSharedWrClients ( int client_n)

Send abort to SharedWriter clients if the server quits on error.

Parameters
client_n [IN] number of the current client, -1 if there is no current client

Definition at line 805 of file AthenaPoolSharedIOCnvSvc.cxx.

806{
807 ATH_MSG_ERROR("Sending ABORT to clients");
808 // the master process will kill this process once workers abort
809 // but it could be a time-limited loop
810 StatusCode sc = StatusCode::SUCCESS;
811 while (sc.isSuccess()) {
812 if (client_n >= 0) {
813 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
814 }
815 const char* dummy;
816 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
817 while (sc.isRecoverable()) {
818 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
819 }
820 }
821 return StatusCode::FAILURE;
822}

◆ cleanUp()

StatusCode AthenaPoolSharedIOCnvSvc::cleanUp ( const std::string & connection)
override virtual

Implement cleanUp to call the cleanUp() function of every registered IAthenaPoolCleanUp.

Definition at line 673 of file AthenaPoolSharedIOCnvSvc.cxx.

673 {
674 auto pos = connection.find("?pmerge=");
675 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
676 return AthenaPoolCnvSvc::cleanUp(conn);
677}

◆ commitCatalog()

StatusCode AthenaPoolSharedIOCnvSvc::commitCatalog ( )
override virtual

Commit Catalog.

Definition at line 796 of file AthenaPoolSharedIOCnvSvc.cxx.

796 {
797 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
798 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
799 catalog->commit();
800 catalog->start();
801 return(StatusCode::SUCCESS);
802}

◆ commitOutput()

StatusCode AthenaPoolSharedIOCnvSvc::commitOutput ( const std::string & outputConnectionSpec,
bool doCommit )
override virtual

Implementation of IConversionSvc: Commit pending output.

Parameters
doCommit [IN] boolean to force full commit

Definition at line 131 of file AthenaPoolSharedIOCnvSvc.cxx.

131 {
132 // This is called after all DataObjects are converted.
133 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
134 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
135 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
136 m_outputStreamingTool->lockObject("wait").ignore();
137 if (!this->cleanUp(outputConnection).isSuccess()) {
138 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
139 return(StatusCode::FAILURE);
140 }
141 return(StatusCode::SUCCESS);
142 }
143 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
144 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
145 return(StatusCode::SUCCESS);
146 }
147 std::map<void*, RootType> commitCache;
148 std::string fileName;
149 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
150 // Clear object to get Placements for all objects in a Stream
151 const char* placementStr = nullptr;
152 int num = -1;
153 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
154 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
155 const char * matchedChars = strstr(placementStr, "[FILE=");
156 if (!matchedChars){
157 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
158 return abortSharedWrClients(num);
159 }
160 fileName = matchedChars;
161 fileName = fileName.substr(6, fileName.find(']') - 6);
162 if (!this->connectOutput(fileName).isSuccess()) {
163 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
164 return abortSharedWrClients(num);
165 }
166 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
167 bool dataHeaderSeen = false;
168 std::string dataHeaderID;
169 while (num > 0) {
170 std::string objName = "ALL";
171 if (useDetailChronoStat()) {
172 objName = placementStr; //FIXME, better descriptor
173 }
174 // StopWatch listens from here until the end of this current scope
175 {
176 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
177 std::string_view pStr = placementStr;
178 std::string::size_type cpos = pStr.find ("[CONT=");
179 if (cpos == std::string::npos) {
180 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
181 return StatusCode::FAILURE;
182 }
183 std::string tokenStr (pStr.substr(0, cpos));
184 std::string contName (pStr.substr(cpos, std::string::npos));
185 std::string::size_type cl1 = contName.find(']');
186 if (cl1 == std::string::npos) {
187 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
188 return StatusCode::FAILURE;
189 }
190 tokenStr.append(contName, cl1 + 1);
191 contName = contName.substr(6, cl1 - 6);
192
193 std::string::size_type ppos = pStr.find ("[PNAME=");
194 if (ppos == std::string::npos) {
195 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
196 return StatusCode::FAILURE;
197 }
198 std::string className (pStr.substr(ppos, std::string::npos));
199 std::string::size_type cl2 = className.find(']');
200 if (cl2 == std::string::npos) {
201 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
202 return StatusCode::FAILURE;
203 }
204 className = className.substr(7, cl2 - 7);
205 RootType classDesc = RootType::ByNameNoQuiet(className);
206 void* obj = nullptr;
207 const std::string numStr = std::to_string(num);
208 std::string::size_type len = m_metadataContainerProp.value().size();
209 bool foundContainer = false;
210 std::size_t opPos = contName.find('(');
211 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
212 foundContainer = true;
213 } else {
214 for (const auto& item: m_metadataContainersAug.value()) {
215 if (contName.compare(0, opPos, item) == 0){
216 foundContainer = true;
217 len = item.size();
218 break;
219 }
220 }
221 }
222 if (len > 0 && foundContainer && contName[len] == '(' ) {
223 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
224 // For Metadata, before moving to next client, fire file incidents
225 if (m_metadataClient != num) {
226 if (m_metadataClient != 0) {
227 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
228 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
229 memName, {}, /*endFileName=*/memName,
230 "BeginInputMemFile", "EndInputMemFile");
231 }
233 }
234 // Retrieve MetaDataSvc
235 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
236 ATH_CHECK(metadataSvc.retrieve());
237 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
238 if (sc.isRecoverable()) {
239 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
240 } else if (sc.isFailure()) {
241 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
242 return abortSharedWrClients(num);
243 }
244 } else {
245 Token readToken;
246 readToken.setOid(Token::OID_t(num, 0));
247 readToken.setAuxString("[PNAME=" + className + "]");
248 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
249 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
250 // Write object
251 if( m_oneDataHeaderForm.value() ) {
252 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
253 if( className == "DataHeaderForm_p6" ) {
254 // Pass DHForms to the converter for later writing in the correct order - do not write it now
256 "", placementWithSwn());
257 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
258 tokenStr = "";
259 } else {
260 Placement placement;
261 placement.fromString(placementStr);
262 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
263 if (token == nullptr) {
264 ATH_MSG_ERROR("Failed to write Data for: " << className);
265 return abortSharedWrClients(num);
266 }
267 tokenStr = token->toString();
268 }
269 if( className == "DataHeader_p6" ) {
270 // Found DataHeader - call the converter to update DHForm Ref
272 tokenStr, placementWithSwn());
273 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
274 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
275 return abortSharedWrClients(num);
276 }
277 } else
278 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
279 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
280 }
281 placementStr = nullptr;
282 } else {
283 // Multiple shared DataHeaderForms
284 Placement placement;
285 placement.fromString(placementStr); placementStr = nullptr;
286 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
287 if (token == nullptr) {
288 ATH_MSG_ERROR("Failed to write Data for: " << className);
289 return abortSharedWrClients(num);
290 }
291 tokenStr = token->toString();
292 if (className == "DataHeader_p6") {
293 // Found DataHeader
295 tokenStr, placement.auxString());
296 // call DH converter to add the ref to DHForm (stored earlier) and to itself
297 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
298 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
299 return abortSharedWrClients(num);
300 }
301 dataHeaderSeen = true;
302 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
303 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
304 // This is achieved by building it as "CONTID/WORKERID/DBID".
305 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
306 // WORKERID allows us to distinguish AthenaMP workers,
307 // and DBID allows us to distinguish streams.
308 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
309 } else if (dataHeaderSeen) {
310 dataHeaderSeen = false;
311 // next object after DataHeader - may be a DataHeaderForm
312 // in any case we need to call the DH converter to update the DHForm Ref
313 if (className == "DataHeaderForm_p6") {
314 // Tell DataHeaderCnv that it should use a new DHForm
316 tokenStr, dataHeaderID);
317 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
318 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
319 return abortSharedWrClients(num);
320 }
321 } else {
322 // Tell DataHeaderCnv that it should use the old DHForm
323 GenericAddress address(0, 0, "", dataHeaderID);
324 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
325 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
326 return abortSharedWrClients(num);
327 }
328 }
329 }
330 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
331 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
332 }
333 }
334 }
335 }
336 // Send Token back to Client
337 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
338 while (sc.isRecoverable()) {
339 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
340 }
341 if (!sc.isSuccess()) {
342 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
343 return abortSharedWrClients(-1);
344 }
345 }
346 sc = m_outputStreamingTool->clearObject(&placementStr, num);
347 while (sc.isRecoverable()) {
348 sc = m_outputStreamingTool->clearObject(&placementStr, num);
349 }
350 if (sc.isFailure()) {
351 // no more clients, break the loop and exit
352 num = -1;
353 }
354 }
355 if (dataHeaderSeen) {
356 // DataHeader was the last object, need to tell the converter there is no DHForm coming
357 GenericAddress address(0, 0, "", std::move(dataHeaderID));
358 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
359 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
360 return abortSharedWrClients(-1);
361 }
362 }
363 placementStr = nullptr;
364 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
365 return(StatusCode::RECOVERABLE);
366 } else if (sc.isRecoverable() || num == -1) {
367 return(StatusCode::RECOVERABLE);
368 }
369 if (sc.isFailure() || fileName.empty()) {
370 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
371 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
372 {
373 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
374 memName, {}, /*endFileName=*/memName,
375 "BeginInputMemFile", "EndInputMemFile");
376 }
377 if (sc.isFailure()) {
378 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
379 } else {
380 ATH_MSG_INFO("Failed to get Data for client: " << num);
381 }
382 return(StatusCode::FAILURE);
383 }
384 }
385 if (m_parallelCompression && !fileName.empty()) {
386 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
387 return(StatusCode::SUCCESS);
388 }
389 if (outputConnection.empty()) {
390 outputConnection = std::move(fileName);
391 } else {
392 outputConnection = outputConnectionSpec;
394 outputConnection += m_streamPortString.value();
395 }
396 }
397 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
398 const std::string baseOutputConnection = outputConnection.substr(0, merge);
399 m_fileCommitCounter[baseOutputConnection]++;
401 m_fileFlushSetting.value().contains(baseOutputConnection) &&
402 m_fileFlushSetting[baseOutputConnection] > 0 &&
403 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
404 doCommit = true;
405 ATH_MSG_DEBUG("commitOutput sending data.");
406 }
407 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
408 for (auto& [ptr, rootType] : commitCache) {
409 rootType.Destruct(ptr);
410 }
411 return(status);
412}
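
The listing above pulls several bracketed fields ("[FILE=...]", "[CONT=...]", "[PNAME=...]") out of the placement string with repeated find/substr calls. A minimal sketch of that pattern as one helper is given below. The bracketField name is hypothetical and not part of Athena, and the sketch assumes a field value never contains a closing bracket.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Extract VALUE from the first "[KEY=VALUE]" occurrence in `text`.
// Returns std::nullopt when the key or the closing bracket is missing,
// which corresponds to the two error branches per field in the listing.
std::optional<std::string> bracketField(std::string_view text, std::string_view key) {
    const std::string open = "[" + std::string(key) + "=";
    const auto start = text.find(open);
    if (start == std::string_view::npos) return std::nullopt;
    const auto valueBegin = start + open.size();
    const auto close = text.find(']', valueBegin);
    if (close == std::string_view::npos) return std::nullopt;
    return std::string(text.substr(valueBegin, close - valueBegin));
}
```

For an illustrative (made-up) placement string such as "[FILE=out.root][CONT=POOLContainer(DataHeader)][PNAME=DataHeader_p6]", calling bracketField with "CONT" would yield the container name including its parenthesised part, which is what the metadata-container check in the listing then inspects.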
TTypeAdapter RootType
Definition RootType.h:211
static InputFileIncidentGuard begin(IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Factory: fire the begin incident and return a guard whose destructor fires the matching end incident.
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:41
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:86
Token & setAuxString(std::string &&auxString)
Set auxiliary string.
Definition Token.h:94
static const DbType POOL_StorageType
Definition DbType.h:84

◆ connectOutput() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec)
override virtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec [IN] the name of the output connection specification as string.

Definition at line 94 of file AthenaPoolSharedIOCnvSvc.cxx.

94 {
95// This is called before DataObjects are being converted.
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
97 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
101 }
102 }
103 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105 return(StatusCode::SUCCESS);
106 }
107 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
108 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
111 }
113 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114 return(StatusCode::SUCCESS);
115 }
116 }
118 outputConnection += m_streamPortString.value();
119 }
120 std::size_t apend = outputConnectionSpec.find('[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
123 }
124 if (outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos) {
125 return AthenaPoolCnvSvc::connectOutput(outputConnection, "APPEND");
126 }
127 return AthenaPoolCnvSvc::connectOutput(outputConnection);
128}
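
The way the connection string is taken apart at the first '[' and reassembled with the stream port string in between can be sketched in isolation. The withPortSuffix helper below is hypothetical. In the listing the suffix is appended only under a condition that sits on a line not shown, whereas this sketch appends it unconditionally.

```cpp
#include <string>

// Insert `portSuffix` between the file part and the bracketed options of a
// connection spec, e.g. "file[opt]" -> "file<suffix>[opt]".
std::string withPortSuffix(const std::string& spec, const std::string& portSuffix) {
    const auto bracket = spec.find('[');
    std::string result = spec.substr(0, bracket);  // npos keeps the whole string
    result += portSuffix;
    if (bracket != std::string::npos) {
        result += spec.substr(bracket);            // re-append "[...]" options
    }
    return result;
}
```

Keeping the bracketed options at the very end matters because the code recognises the metadata case by searching for "[PoolContainerPrefix=...]" in the spec.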

◆ connectOutput() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec,
const std::string & openMode )
override virtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec [IN] the name of the output connection specification as string.
openMode [IN] the open mode of the file as string.

Definition at line 89 of file AthenaPoolSharedIOCnvSvc.cxx.

90 {
91 return AthenaPoolCnvSvc::connectOutput(outputConnectionSpec, openMode);
92}

◆ createAddress() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string & refAddress,
IOpaqueAddress *& refpAddress )
override virtual

Create address from string form.

Parameters
svcType [IN] service type of the address.
clid [IN] class id for the address.
refAddress [IN] string form to be converted.
refpAddress [OUT] converted address.

Definition at line 666 of file AthenaPoolSharedIOCnvSvc.cxx.

669 {
670 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
671}

◆ createAddress() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string * par,
const unsigned long * ip,
IOpaqueAddress *& refpAddress )
override

Create a Generic address using explicit arguments to identify a single object.

Parameters
svcType [IN] service type of the address.
clid [IN] class id for the address.
par [IN] string containing the database name.
ip [IN] object identifier.
refpAddress [OUT] converted address.

Definition at line 623 of file AthenaPoolSharedIOCnvSvc.cxx.

627 {
628 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
630 }
631 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
632 Token addressToken;
633 addressToken.setDb(par[0].substr(4));
634 addressToken.setCont(par[1]);
635 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
636 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
637 void* buffer = nullptr;
638 std::size_t nbytes = 0;
639 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
640 while (sc.isRecoverable()) {
641 // sleep
642 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
643 }
644 if (!sc.isSuccess()) {
645 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
646 return(StatusCode::FAILURE);
647 }
648 auto token = std::make_unique<Token>();
649 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
650 if (token->classID() == Guid::null()) {
651 token.reset();
652 }
653 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
654 if (token) {
655 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
656 return(StatusCode::SUCCESS);
657 }
658 else {
659 return(StatusCode::RECOVERABLE);
660 }
661 } else {
662 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
663 }
664}
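
The client branch above polls getObject() for as long as the status is recoverable. The retry pattern can be shown on its own with a hypothetical three-state Status enum standing in for StatusCode. The maxTries bound is an added assumption, since the listing retries without a limit and without sleeping.

```cpp
#include <functional>

// Hypothetical three-state result mirroring SUCCESS / RECOVERABLE / FAILURE.
enum class Status { Success, Recoverable, Failure };

// Call `attempt` until it stops reporting Recoverable or `maxTries` is reached.
// Returns the last status seen (Recoverable if the bound was hit, or if
// maxTries is not positive and `attempt` was never called).
Status retryWhileRecoverable(const std::function<Status()>& attempt, int maxTries) {
    Status s = Status::Recoverable;
    for (int i = 0; i < maxTries && s == Status::Recoverable; ++i) {
        s = attempt();
    }
    return s;
}
```

A bounded loop makes it possible to report a timeout to the caller instead of spinning indefinitely when the server side never answers. Whether that is desirable for the real service is a design decision outside this sketch.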
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
@ kInputStream
Definition IPoolSvc.h:41
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:72
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:67
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:135

◆ disconnectOutput()

StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput ( const std::string & outputConnectionSpec)
override virtual

Disconnect from the output connection.

Definition at line 415 of file AthenaPoolSharedIOCnvSvc.cxx.

415 {
416 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
417 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
418 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
419 return(StatusCode::SUCCESS);
420 }
421 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
423 m_streamServerActive = false;
424 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
425 return(StatusCode::SUCCESS);
426 } else {
427 m_streamServerActive = false;
428 }
429 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
430 }
432 outputConnection += m_streamPortString.value();
433 }
434 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
435}

◆ finalize()

StatusCode AthenaPoolSharedIOCnvSvc::finalize ( )
override virtual

Required of all Gaudi Services.

Definition at line 67 of file AthenaPoolSharedIOCnvSvc.cxx.

67 {
68 // Release AthenaSerializeSvc
69 if (!m_serializeSvc.empty()) {
70 if (!m_serializeSvc.release().isSuccess()) {
71 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72 }
73 }
74 // Release OutputStreamingTool (if configured)
75 if (!m_outputStreamingTool.empty()) {
76 if (!m_outputStreamingTool.release().isSuccess()) {
77 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78 }
79 }
80 // Release InputStreamingTool (if configured)
81 if (!m_inputStreamingTool.empty()) {
82 if (!m_inputStreamingTool.release().isSuccess()) {
83 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84 }
85 }
86 return this->AthenaPoolCnvSvc::finalize();
87}

◆ handle()

void AthenaPoolSharedIOCnvSvc::handle ( const Incident & incident)
override virtual

Implementation of IIncidentListener: handler for the StoreCleared incident.

Definition at line 825 of file AthenaPoolSharedIOCnvSvc.cxx.

825 {
826 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
827 m_outputStreamingTool->lockObject("release").ignore();
828 }
829}

◆ initialize()

StatusCode AthenaPoolSharedIOCnvSvc::initialize ( )
override virtual

Required of all Gaudi Services.

Definition at line 38 of file AthenaPoolSharedIOCnvSvc.cxx.

38 {
39 // Retrieve InputStreamingTool (if configured)
40 if (!m_inputStreamingTool.empty()) {
42 }
43 // Retrieve OutputStreamingTool (if configured)
44 if (!m_outputStreamingTool.empty()) {
46 if (m_makeStreamingToolClient.value() == -1) {
47 // Initialize AthenaRootSharedWriter
48 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49 ATH_CHECK(arswsvc.retrieve());
50 }
51 // Put PoolSvc into share mode to avoid duplicating catalog.
52 getPoolSvc()->setShareMode(true);
53 }
54 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55 // Retrieve AthenaSerializeSvc
56 ATH_CHECK(m_serializeSvc.retrieve());
57 }
58 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
59 long int pri = 1000;
60 if (!m_outputStreamingTool.empty()) {
61 incSvc->addListener(this, "StoreCleared", pri);
62 ATH_MSG_DEBUG("Subscribed to StoreCleared");
63 }
64 return this->AthenaPoolCnvSvc::initialize();
65}

◆ makeClient()

StatusCode AthenaPoolSharedIOCnvSvc::makeClient ( int num)
override virtual

Make this a client.

Definition at line 705 of file AthenaPoolSharedIOCnvSvc.cxx.

705 {
706 if (!m_outputStreamingTool.empty()) {
707 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
708 std::string streamPortSuffix;
709 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
710 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
711 return(StatusCode::FAILURE);
712 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
713 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
714 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
715 m_streamPortString.setValue(streamPortSuffix);
716 }
717 }
718 if (m_inputStreamingTool.empty()) {
719 return(StatusCode::SUCCESS);
720 }
721 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
722 std::string dummyStr;
723 return(m_inputStreamingTool->makeClient(num, dummyStr));
724}
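
The port-string update on lines 712 to 715 of the listing above can be pictured in isolation. The following is a minimal sketch, assuming nothing about Gaudi or the streaming tool: `resolvePortString` is a hypothetical helper that is not part of AthenaPoolSharedIOCnvSvc and reproduces only the substring test on the placeholder `localhost:0`.

```cpp
#include <string>

// Hypothetical helper (not part of Athena): returns the port string a client
// would hold after the streaming tool's makeClient() call succeeded.
//   configured - current value of the stream port property
//   fromTool   - suffix reported back by the streaming tool
std::string resolvePortString(const std::string& configured,
                              const std::string& fromTool) {
  // Same test as in the listing: only a value that still contains the
  // placeholder "localhost:0" is overwritten.
  if (configured.find("localhost:0") != std::string::npos) {
    return fromTool;
  }
  // An explicitly configured endpoint is left untouched.
  return configured;
}
```

With the default property value `?pmerge=localhost:0` the helper returns whatever the tool reported; any value without the placeholder is returned unchanged.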

◆ makeServer()

StatusCode AthenaPoolSharedIOCnvSvc::makeServer ( int num)
override virtual

Make this a server.

Definition at line 679 of file AthenaPoolSharedIOCnvSvc.cxx.

679 {
680 if (num < 0) {
681 num = -num;
683 num = num % 1024;
684 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
685 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
686 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
687 const std::string streamPortSuffix = m_streamPortString.value();
688 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
689 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
690 return(StatusCode::FAILURE);
691 }
692 // Disable PersistencySvc per output file mode, for SharedWriter Server
693 m_persSvcPerOutput.setValue(false);
694 return(StatusCode::SUCCESS);
695 }
696 return(StatusCode::RECOVERABLE);
697 }
698 if (m_inputStreamingTool.empty()) {
699 return(StatusCode::RECOVERABLE);
700 }
701 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
702 return(m_inputStreamingTool->makeServer(num, ""));
703}
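
The way `num` is interpreted in the listing above can be sketched as a small standalone function. This is an illustration under assumptions: `ServerRole`, `ServerRequest` and `classifyServerRequest` are hypothetical names, the "output" versus "input" reading is inferred only from which tool each branch touches, and the statement on the elided line 682 is not reproduced.

```cpp
// Hypothetical types, introduced only for this sketch.
enum class ServerRole { Output, Input };

struct ServerRequest {
  ServerRole role;  // which streaming tool the request is aimed at
  int num;          // value that would be handed to the tool's makeServer()
};

// Mirrors the visible arithmetic of the listing: a negative num selects the
// output branch and is negated and reduced modulo 1024; a non-negative num
// falls through to the input branch unchanged.
ServerRequest classifyServerRequest(int num) {
  if (num < 0) {
    num = -num;        // note: undefined for INT_MIN, as in any plain negation
    num = num % 1024;
    return {ServerRole::Output, num};
  }
  return {ServerRole::Input, num};
}
```

For instance, `classifyServerRequest(-1030)` selects the output branch with `num` equal to 6, while `classifyServerRequest(5)` selects the input branch with `num` equal to 5.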

◆ readData()

StatusCode AthenaPoolSharedIOCnvSvc::readData ( )
override virtual

Read the next data object.

Definition at line 726 of file AthenaPoolSharedIOCnvSvc.cxx.

726 {
727 if (m_inputStreamingTool.empty()) {
728 return(StatusCode::FAILURE);
729 }
730 const char* tokenStr = nullptr;
731 int num = -1;
732 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
733 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
734 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
735 } else {
736 return(sc);
737 }
738 // Read object instance via POOL/ROOT
739 void* instance = nullptr;
740 Token token;
741 token.fromString(tokenStr); tokenStr = nullptr;
742 if (token.classID() != Guid::null()) {
743 std::string objName = "ALL";
744 if (useDetailChronoStat()) {
745 objName = token.classID().toString();
746 }
747 // StopWatch listens from here until the end of this current scope
748 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
749 this->setObjPtr(instance, &token);
750 // Serialize object via ROOT
752 void* buffer = nullptr;
753 std::size_t nbytes = 0;
754 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
755 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
756 while (sc.isRecoverable()) {
757 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
758 }
759 delete [] static_cast<char*>(buffer); buffer = nullptr;
760 if (!sc.isSuccess()) {
761 ATH_MSG_ERROR("Could not share object for: " << token.toString());
762 return(StatusCode::FAILURE);
763 }
764 AuxDiscoverySvc auxDiscover;
765 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
766 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
767 return(StatusCode::FAILURE);
768 }
769 cltype.Destruct(instance); instance = nullptr;
770 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
771 ATH_MSG_ERROR("Could not share object for: " << token.toString());
772 return(StatusCode::FAILURE);
773 }
774 } else if (token.dbID() != Guid::null()) {
775 std::string returnToken;
776 Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
777 if( metadataToken ) {
778 returnToken = metadataToken->toString();
779 metadataToken->release(); metadataToken = nullptr;
780 } else {
781 returnToken = token.toString();
782 }
783 // Share token
784 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
785 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
786 ATH_MSG_ERROR("Could not share token for: " << token.toString());
787 return(StatusCode::FAILURE);
788 }
789 } else {
790 return(StatusCode::RECOVERABLE);
791 }
792 return(StatusCode::SUCCESS);
793}

◆ registerForWrite()

Token * AthenaPoolSharedIOCnvSvc::registerForWrite ( Placement * placement,
const void * obj,
const RootType & classDesc )
override virtual
Returns
a string token to a Data Object written to Pool
Parameters
placement [IN] pointer to the placement hint
obj [IN] pointer to the Data Object to be written to Pool
classDesc [IN] pointer to the Seal class description for the Data Object.

Definition at line 438 of file AthenaPoolSharedIOCnvSvc.cxx.

438 {
439 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
440 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
441 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
442 return(nullptr);
443 }
444 }
445 Token* token = nullptr;
446 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
447 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
448 // Lock object
449 std::string placementStr = placement->toString();
450 placementStr += "[PNAME=";
451 placementStr += classDesc.Name();
452 placementStr += ']';
453 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
454 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
455 while (sc.isRecoverable()) {
456 //usleep(100);
457 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
458 }
459 if (!sc.isSuccess()) {
460 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
461 return(nullptr);
462 }
463 // Serialize object via ROOT
464 const void* buffer = nullptr;
465 std::size_t nbytes = 0;
466 bool own = true;
467 if (classDesc.Name() == "Token") {
468 nbytes = strlen(static_cast<const char*>(obj)) + 1;
469 buffer = obj;
470 own = false;
471 } else if (classDesc.IsFundamental()) {
472 nbytes = classDesc.SizeOf();
473 buffer = obj;
474 own = false;
475 } else {
476 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
477 }
478 // Share object
479 sc = m_outputStreamingTool->putObject(buffer, nbytes);
480 while (sc.isRecoverable()) {
481 //usleep(100);
482 sc = m_outputStreamingTool->putObject(buffer, nbytes);
483 }
484 if (own) { delete [] static_cast<const char*>(buffer); }
485 buffer = nullptr;
486 if (!sc.isSuccess()) {
487 ATH_MSG_ERROR("Could not share object for: " << placementStr);
488 m_outputStreamingTool->putObject(nullptr, 0).ignore();
489 return(nullptr);
490 }
491 AuxDiscoverySvc auxDiscover;
492 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
493 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
494 m_outputStreamingTool->putObject(nullptr, 0).ignore();
495 return(nullptr);
496 }
497 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
498 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
499 return(nullptr);
500 }
501 // Get Token back from Server
502 const char* tokenStr = nullptr;
503 int num = -1;
504 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
505 while (sc.isRecoverable()) {
506 //usleep(100);
507 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
508 }
509 if (!sc.isSuccess()) {
510 ATH_MSG_ERROR("Failed to get Token");
511 return(nullptr);
512 }
513 if (!strcmp(tokenStr, "ABORT")) {
514 ATH_MSG_ERROR("Writer requested ABORT");
515 // tell the server we are leaving
516 m_outputStreamingTool->stop().ignore();
517 return nullptr;
518 }
519 Token* tempToken = new Token();
520 tempToken->fromString(tokenStr); tokenStr = nullptr;
521 tempToken->setClassID(pool::DbReflex::guid(classDesc));
522 token = tempToken; tempToken = nullptr;
523// Client Write Request
524 } else {
525 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
526 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
527 Token* tempToken = new Token();
528 tempToken->setClassID(pool::DbReflex::guid(classDesc));
529 token = tempToken; tempToken = nullptr;
530 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
531 if(placement->technology() == 0) { // No technology specified, use the default
532 placement->setTechnology(pool::DbType::getType(m_defaultContainerType).type());
533 }
534 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
535 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
536 } else {
538 placement->setFileName(placement->fileName() + m_streamPortString.value());
539 }
540 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
541 }
542 }
543 return(token);
544}
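
The listing above repeats one pattern three times (for lockObject, putObject and clearObject): call the tool, and call it again for as long as the returned status is recoverable. A minimal standalone sketch of that pattern follows. `Status` is a stand-in for Gaudi's StatusCode and `retryWhileRecoverable` is a hypothetical helper; neither exists in Athena under these names.

```cpp
#include <functional>

// Stand-in for StatusCode, reduced to the three outcomes the listing tests.
enum class Status { Success, Recoverable, Failure };

// Hypothetical helper: invokes op until it stops reporting Recoverable and
// returns the final status (Success or Failure).
Status retryWhileRecoverable(const std::function<Status()>& op) {
  Status sc = op();
  while (sc == Status::Recoverable) {
    // The listing has its usleep() calls commented out, so the original
    // loop spins without pausing; this sketch does the same.
    sc = op();
  }
  return sc;
}
```

A caller would wrap the tool call in a lambda, for example `retryWhileRecoverable([&] { return tryLock(); })` with a hypothetical `tryLock`. Whether a short sleep between attempts would be preferable depends on the IPC tool and is not settled by the listing.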

◆ setObjPtr()

void AthenaPoolSharedIOCnvSvc::setObjPtr ( void *& obj,
const Token * token )
override virtual
Parameters
obj [OUT] pointer to the Data Object.
token [IN] string token of the Data Object for which a Pool Ref is filled.

Definition at line 546 of file AthenaPoolSharedIOCnvSvc.cxx.

546 {
547 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
548 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
549 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
550 }
551 }
552 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
553 if (token->dbID() == Guid::null()) {
554 int num = token->oid().first;
555 // Get object from SHM
556 void* buffer = nullptr;
557 std::size_t nbytes = 0;
558 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
559 while (sc.isRecoverable()) {
560 //usleep(100);
561 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
562 }
563 if (!sc.isSuccess()) {
564 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
565 obj = nullptr;
566 } else {
567 ATH_MSG_DEBUG("Server deserializing " << token->toString());
568 if (token->classID() != Guid::null()) {
569 // Deserialize object
570 RootType cltype(pool::DbReflex::forGuid(token->classID()));
571 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
572 } else {
573 // Deserialize object
574 std::string className = token->auxString();
575 className = className.substr(className.find("[PNAME="));
576 className = className.substr(7, className.find(']') - 7);
577 RootType cltype(RootType::ByNameNoQuiet(className));
578 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
579 }
580 AuxDiscoverySvc auxDiscover;
581 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
582 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
583 obj = nullptr;
584 }
585 }
586 }
587 }
588 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
589 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
590 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
591 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
592 obj = nullptr;
593 } else {
594 void* buffer = nullptr;
595 std::size_t nbytes = 0;
596 StatusCode sc = StatusCode::FAILURE;
597 // StopWatch listens from here until the end of this current scope
598 {
599 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
600 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
601 while (sc.isRecoverable()) {
602 // sleep
603 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
604 }
605 }
606 if (!sc.isSuccess()) {
607 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
608 obj = nullptr;
609 } else {
610 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
611 AuxDiscoverySvc auxDiscover;
612 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
613 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
614 obj = nullptr;
615 }
616 }
617 }
618 } else if (token->dbID() != Guid::null()) {
619 AthenaPoolCnvSvc::setObjPtr(obj, token);
620 }
621}
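
Lines 574 to 576 of the listing above recover a class name from the `[PNAME=...]` tag that registerForWrite() appends to the placement string. The sketch below shows the same extraction as a standalone function. `extractPName` is a hypothetical helper, not an Athena function, and unlike the listing it returns an empty string instead of throwing when the tag is missing.

```cpp
#include <string>

// Hypothetical helper: returns the text between "[PNAME=" and the next ']',
// or an empty string if the tag or its closing bracket is absent.
std::string extractPName(const std::string& aux) {
  static const std::string tag = "[PNAME=";
  const std::string::size_type start = aux.find(tag);
  if (start == std::string::npos) {
    return "";  // no tag present
  }
  const std::string::size_type valueStart = start + tag.size();
  const std::string::size_type end = aux.find(']', valueStart);
  if (end == std::string::npos) {
    return "";  // tag was opened but never closed
  }
  return aux.substr(valueStart, end - valueStart);
}
```

Given `"abc[PNAME=MyClass]def"` the helper yields `MyClass`. The guard matters because `std::string::substr` throws `std::out_of_range` when handed `npos` as the start position, which is what the unguarded form would do on a string without the tag.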

◆ SvcFactory< AthenaPoolSharedIOCnvSvc >

friend class SvcFactory< AthenaPoolSharedIOCnvSvc >
friend

Definition at line 1 of file AthenaPoolSharedIOCnvSvc.h.

Member Data Documentation

◆ m_fileCommitCounter

std::map<std::string, int> AthenaPoolSharedIOCnvSvc::m_fileCommitCounter
private

Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.

Definition at line 136 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_fileFlushSetting

Gaudi::Property<std::map<std::string, int> > AthenaPoolSharedIOCnvSvc::m_fileFlushSetting {this,"FileFlushSetting",{}}
private

Definition at line 137 of file AthenaPoolSharedIOCnvSvc.h.

137{this,"FileFlushSetting",{}};

◆ m_inputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_inputStreamingTool {this,"InputStreamingTool",{}}
private

Definition at line 116 of file AthenaPoolSharedIOCnvSvc.h.

116{this,"InputStreamingTool",{}};

◆ m_makeStreamingToolClient

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
private

Make this instance a Streaming Client during first connect/write automatically.

Definition at line 128 of file AthenaPoolSharedIOCnvSvc.h.

128{this,"MakeStreamingToolClient",0};

◆ m_metadataClient

int AthenaPoolSharedIOCnvSvc::m_metadataClient =0
private

Definition at line 119 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainerProp

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
private

For SharedWriter: use MetadataSvc to merge data placed in this container.

Definition at line 124 of file AthenaPoolSharedIOCnvSvc.h.

124{this,"OutputMetadataContainer","MetaData"};

◆ m_metadataContainersAug

Gaudi::Property<std::vector<std::string> > AthenaPoolSharedIOCnvSvc::m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
private

Definition at line 125 of file AthenaPoolSharedIOCnvSvc.h.

125{this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"};

◆ m_outputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_outputStreamingTool {this,"OutputStreamingTool",{}}
private

Definition at line 117 of file AthenaPoolSharedIOCnvSvc.h.

117{this,"OutputStreamingTool",{}};

◆ m_parallelCompression

Gaudi::Property<bool> AthenaPoolSharedIOCnvSvc::m_parallelCompression {this,"ParallelCompression",true}
private

Use Athena object sharing for metadata only; event data is collected and sent via ROOT TMemFile.

Definition at line 132 of file AthenaPoolSharedIOCnvSvc.h.

132{this,"ParallelCompression",true};

◆ m_serializeSvc

ServiceHandle<IAthenaSerializeSvc> AthenaPoolSharedIOCnvSvc::m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
private

Definition at line 115 of file AthenaPoolSharedIOCnvSvc.h.

115{this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"};

◆ m_streamingTechnology

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_streamingTechnology {this,"StreamingTechnology",-1}
private

Use Streaming for selected technologies only.

Definition at line 130 of file AthenaPoolSharedIOCnvSvc.h.

130{this,"StreamingTechnology",-1};

◆ m_streamPortString

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
private

Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".

Definition at line 134 of file AthenaPoolSharedIOCnvSvc.h.

134{this,"StreamPortString","?pmerge=localhost:0"};
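
The `?pmerge=<host>:<port>` form documented above can be taken apart with a few string operations. The sketch below is an illustration only: `MergeEndpoint` and `parsePortString` are hypothetical names, and how ROOT or Athena actually interpret the string once it is appended to the output file name (line 538 of registerForWrite()) is not shown on this page.

```cpp
#include <string>

// Hypothetical result type for this sketch.
struct MergeEndpoint {
  std::string host;
  int port;
  bool valid;  // false if the input did not match "?pmerge=<host>:<port>"
};

// Hypothetical parser for the documented "?pmerge=<host>:<port>" form.
MergeEndpoint parsePortString(const std::string& s) {
  static const std::string prefix = "?pmerge=";
  MergeEndpoint ep{"", 0, false};
  if (s.compare(0, prefix.size(), prefix) != 0) {
    return ep;  // wrong or missing prefix
  }
  const std::string::size_type colon = s.rfind(':');
  if (colon == std::string::npos || colon < prefix.size() || colon + 1 >= s.size()) {
    return ep;  // no host/port separator, or empty port
  }
  const std::string portText = s.substr(colon + 1);
  if (portText.size() > 5) {
    return ep;  // longer than any valid port number
  }
  for (char c : portText) {
    if (c < '0' || c > '9') {
      return ep;  // port must be all digits
    }
  }
  ep.host = s.substr(prefix.size(), colon - prefix.size());
  ep.port = std::stoi(portText);
  ep.valid = true;
  return ep;
}
```

Applied to the default value `?pmerge=localhost:0`, this yields host `localhost` and port `0`, the placeholder that makeClient() looks for before replacing the string.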

◆ m_streamServerActive

bool AthenaPoolSharedIOCnvSvc::m_streamServerActive =false
private

Definition at line 118 of file AthenaPoolSharedIOCnvSvc.h.


The documentation for this class was generated from the following files: