ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolSharedIOCnvSvc Class Reference

This class provides the interface between Athena and PoolSvc. More...

#include <AthenaPoolSharedIOCnvSvc.h>

Inheritance diagram for AthenaPoolSharedIOCnvSvc:
Collaboration diagram for AthenaPoolSharedIOCnvSvc:

Public Member Functions

virtual StatusCode initialize () override
 Required of all Gaudi Services.
virtual StatusCode finalize () override
 Required of all Gaudi Services.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec, const std::string &openMode) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode connectOutput (const std::string &outputConnectionSpec) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode.
virtual StatusCode commitOutput (const std::string &outputConnectionSpec, bool doCommit) override
 Implementation of IConversionSvc: Commit pending output.
virtual StatusCode disconnectOutput (const std::string &outputConnectionSpec) override
 Disconnect to the output connection.
virtual TokenregisterForWrite (Placement *placement, const void *obj, const RootType &classDesc) override
virtual void setObjPtr (void *&obj, const Token *token) override
StatusCode createAddress (long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
 Create a Generic address using explicit arguments to identify a single object.
virtual StatusCode createAddress (long svcType, const CLID &clid, const std::string &refAddress, IOpaqueAddress *&refpAddress) override
 Create address from string form.
virtual StatusCode decodeOutputSpec (std::string &connectionSpec, int &outputTech) const override
 Extract/deduce the DB technology from the connection string/file specification.
virtual StatusCode cleanUp (const std::string &connection) override
 Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual StatusCode makeServer (int num) override
 Make this a server.
virtual StatusCode makeClient (int num) override
 Make this a client.
virtual StatusCode readData () override
 Read the next data object.
virtual StatusCode commitCatalog () override
 Commit Catalog.
StatusCode abortSharedWrClients (int client_n)
 Send abort to SharedWriter clients if the server quits on error.
virtual void handle (const Incident &incident) override
 Implementation of IIncidentListener: Handle for EndEvent incidence.
 AthenaPoolSharedIOCnvSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~AthenaPoolSharedIOCnvSvc ()=default
 Destructor.

Private Attributes

ServiceHandle< IAthenaSerializeSvcm_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
ToolHandle< IAthenaIPCToolm_inputStreamingTool {this,"InputStreamingTool",{}}
ToolHandle< IAthenaIPCToolm_outputStreamingTool {this,"OutputStreamingTool",{}}
bool m_streamServerActive =false
int m_metadataClient =0
Gaudi::Property< std::string > m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
 For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
Gaudi::Property< int > m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first connect/write automatically.
Gaudi::Property< int > m_streamingTechnology {this,"StreamingTechnology",-1}
 Use Streaming for selected technologies only.
Gaudi::Property< bool > m_parallelCompression {this,"ParallelCompression",true}
 Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Gaudi::Property< std::string > m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
 Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
std::map< std::string, int > m_fileCommitCounter
 Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting {this,"FileFlushSetting",{}}

Friends

class SvcFactory< AthenaPoolSharedIOCnvSvc >

Detailed Description

This class provides the interface between Athena and PoolSvc.

Definition at line 25 of file AthenaPoolSharedIOCnvSvc.h.

Constructor & Destructor Documentation

◆ AthenaPoolSharedIOCnvSvc()

AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 840 of file AthenaPoolSharedIOCnvSvc.cxx.

840 :
841 base_class(name, pSvcLocator) {
842}

◆ ~AthenaPoolSharedIOCnvSvc()

virtual AthenaPoolSharedIOCnvSvc::~AthenaPoolSharedIOCnvSvc ( )
virtualdefault

Destructor.

Member Function Documentation

◆ abortSharedWrClients()

StatusCode AthenaPoolSharedIOCnvSvc::abortSharedWrClients ( int client_n)

Send abort to SharedWriter clients if the server quits on error.

Parameters
client_n[IN] number of the current client, -1 if no current

Definition at line 814 of file AthenaPoolSharedIOCnvSvc.cxx.

815{
816 ATH_MSG_ERROR("Sending ABORT to clients");
817 // the master process will kill this process once workers abort
818 // but it could be a time-limited loop
819 StatusCode sc = StatusCode::SUCCESS;
820 while (sc.isSuccess()) {
821 if (client_n >= 0) {
822 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
823 }
824 const char* dummy;
825 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
826 while (sc.isRecoverable()) {
827 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
828 }
829 }
830 return StatusCode::FAILURE;
831}
#define ATH_MSG_ERROR(x)
static Double_t sc
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ cleanUp()

StatusCode AthenaPoolSharedIOCnvSvc::cleanUp ( const std::string & connection)
overridevirtual

Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.

Definition at line 682 of file AthenaPoolSharedIOCnvSvc.cxx.

682 {
683 auto pos = connection.find("?pmerge=");
684 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
685 return AthenaPoolCnvSvc::cleanUp(conn);
686}
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.

◆ commitCatalog()

StatusCode AthenaPoolSharedIOCnvSvc::commitCatalog ( )
overridevirtual

Commit Catalog.

Definition at line 805 of file AthenaPoolSharedIOCnvSvc.cxx.

805 {
806 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
807 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
808 catalog->commit();
809 catalog->start();
810 return(StatusCode::SUCCESS);
811}
#define ATLAS_THREAD_SAFE

◆ commitOutput()

StatusCode AthenaPoolSharedIOCnvSvc::commitOutput ( const std::string & outputConnectionSpec,
bool doCommit )
overridevirtual

Implementation of IConversionSvc: Commit pending output.

Parameters
doCommit[IN] boolean to force full commit

Definition at line 127 of file AthenaPoolSharedIOCnvSvc.cxx.

127 {
128 // This is called after all DataObjects are converted.
129 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
130 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
131 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
132 m_outputStreamingTool->lockObject("wait").ignore();
133 if (!this->cleanUp(outputConnection).isSuccess()) {
134 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
135 return(StatusCode::FAILURE);
136 }
137 return(StatusCode::SUCCESS);
138 }
139 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
140 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
141 return(StatusCode::SUCCESS);
142 }
143 std::map<void*, RootType> commitCache;
144 std::string fileName;
145 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
146 // Clear object to get Placements for all objects in a Stream
147 const char* placementStr = nullptr;
148 int num = -1;
149 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
150 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
151 const char * matchedChars = strstr(placementStr, "[FILE=");
152 if (!matchedChars){
153 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
154 return abortSharedWrClients(num);
155 }
156 fileName = matchedChars;
157 fileName = fileName.substr(6, fileName.find(']') - 6);
158 if (!this->connectOutput(fileName).isSuccess()) {
159 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
160 return abortSharedWrClients(num);
161 }
162 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
163 bool dataHeaderSeen = false;
164 std::string dataHeaderID;
165 while (num > 0) {
166 std::string objName = "ALL";
167 if (useDetailChronoStat()) {
168 objName = placementStr; //FIXME, better descriptor
169 }
170 // StopWatch listens from here until the end of this current scope
171 {
172 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
173 std::string_view pStr = placementStr;
174 std::string::size_type cpos = pStr.find ("[CONT=");
175 if (cpos == std::string::npos) {
176 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
177 return StatusCode::FAILURE;
178 }
179 std::string tokenStr (pStr.substr(0, cpos));
180 std::string contName (pStr.substr(cpos, std::string::npos));
181 std::string::size_type cl1 = contName.find(']');
182 if (cl1 == std::string::npos) {
183 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
184 return StatusCode::FAILURE;
185 }
186 tokenStr.append(contName, cl1 + 1);
187 contName = contName.substr(6, cl1 - 6);
188
189 std::string::size_type ppos = pStr.find ("[PNAME=");
190 if (ppos == std::string::npos) {
191 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
192 return StatusCode::FAILURE;
193 }
194 std::string className (pStr.substr(ppos, std::string::npos));
195 std::string::size_type cl2 = className.find(']');
196 if (cl2 == std::string::npos) {
197 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
198 return StatusCode::FAILURE;
199 }
200 className = className.substr(7, cl2 - 7);
201 RootType classDesc = RootType::ByNameNoQuiet(className);
202 void* obj = nullptr;
203 const std::string numStr = std::to_string(num);
204 std::string::size_type len = m_metadataContainerProp.value().size();
205 bool foundContainer = false;
206 std::size_t opPos = contName.find('(');
207 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
208 foundContainer = true;
209 } else {
210 for (const auto& item: m_metadataContainersAug.value()) {
211 if (contName.compare(0, opPos, item) == 0){
212 foundContainer = true;
213 len = item.size();
214 break;
215 }
216 }
217 }
218 if (len > 0 && foundContainer && contName[len] == '(' ) {
219 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
220 // For Metadata, before moving to next client, fire file incidents
221 if (m_metadataClient != num) {
222 if (m_metadataClient != 0) {
223 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
224 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
225 incSvc->fireIncident(beginInputIncident);
226 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
227 incSvc->fireIncident(endInputIncident);
228 }
230 }
231 // Retrieve MetaDataSvc
232 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
233 ATH_CHECK(metadataSvc.retrieve());
234 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
235 if (sc.isRecoverable()) {
236 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
237 } else if (sc.isFailure()) {
238 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
239 return abortSharedWrClients(num);
240 }
241 } else {
242 Token readToken;
243 readToken.setOid(Token::OID_t(num, 0));
244 readToken.setAuxString("[PNAME=" + className + "]");
245 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
246 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
247 // Write object
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251 // Pass DHForms to the converter for later writing in the correct order - do not write it now
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
255 tokenStr = "";
256 } else {
257 Placement placement;
258 placement.fromString(placementStr);
259 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
260 if (token == nullptr) {
261 ATH_MSG_ERROR("Failed to write Data for: " << className);
262 return abortSharedWrClients(num);
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267 // Found DataHeader - call the converter to update DHForm Ref
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
271 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
272 return abortSharedWrClients(num);
273 }
274 } else
275 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280 // Multiple shared DataHeaderForms
281 Placement placement;
282 placement.fromString(placementStr); placementStr = nullptr;
283 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
284 if (token == nullptr) {
285 ATH_MSG_ERROR("Failed to write Data for: " << className);
286 return abortSharedWrClients(num);
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290 // Found DataHeader
292 tokenStr, placement.auxString());
293 // call DH converter to add the ref to DHForm (stored earlier) and to itself
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
295 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
296 return abortSharedWrClients(num);
297 }
298 dataHeaderSeen = true;
299 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
300 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
301 // This is achieved by building it as "CONTID/WORKERID/DBID".
302 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
303 // WORKERID allows us to distinguish AthenaMP workers,
304 // and DBID allows us to distinguish streams.
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308 // next object after DataHeader - may be a DataHeaderForm
309 // in any case we need to call the DH converter to update the DHForm Ref
310 if (className == "DataHeaderForm_p6") {
311 // Tell DataHeaderCnv that it should use a new DHForm
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
316 return abortSharedWrClients(num);
317 }
318 } else {
319 // Tell DataHeaderCnv that it should use the old DHForm
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
322 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
323 return abortSharedWrClients(num);
324 }
325 }
326 }
327 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333 // Send Token back to Client
334 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
335 while (sc.isRecoverable()) {
336 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
337 }
338 if (!sc.isSuccess()) {
339 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
340 return abortSharedWrClients(-1);
341 }
342 }
343 sc = m_outputStreamingTool->clearObject(&placementStr, num);
344 while (sc.isRecoverable()) {
345 sc = m_outputStreamingTool->clearObject(&placementStr, num);
346 }
347 if (sc.isFailure()) {
348 // no more clients, break the loop and exit
349 num = -1;
350 }
351 }
352 if (dataHeaderSeen) {
353 // DataHeader was the last object, need to tell the converter there is no DHForm coming
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
356 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
357 return abortSharedWrClients(-1);
358 }
359 }
360 placementStr = nullptr;
361 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 } else if (sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
366 if (sc.isFailure() || fileName.empty()) {
367 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
368 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
369 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
370 incSvc->fireIncident(beginInputIncident);
371 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
372 incSvc->fireIncident(endInputIncident);
373 if (sc.isFailure()) {
374 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
375 } else {
376 ATH_MSG_INFO("Failed to get Data for client: " << num);
377 }
378 return(StatusCode::FAILURE);
379 }
380 }
381 if (m_parallelCompression && !fileName.empty()) {
382 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
383 return(StatusCode::SUCCESS);
384 }
385 if (outputConnection.empty()) {
386 outputConnection = std::move(fileName);
387 } else {
388 outputConnection = outputConnectionSpec;
390 outputConnection += m_streamPortString.value();
391 }
392 }
393 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
394 const std::string baseOutputConnection = outputConnection.substr(0, merge);
395 m_fileCommitCounter[baseOutputConnection]++;
397 m_fileFlushSetting.value().contains(baseOutputConnection) &&
398 m_fileFlushSetting[baseOutputConnection] > 0 &&
399 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
400 doCommit = true;
401 ATH_MSG_DEBUG("commitOutput sending data.");
402 }
403 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
404 for (auto& [ptr, rootType] : commitCache) {
405 rootType.Destruct(ptr);
406 }
407 return(status);
408}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
TTypeAdapter RootType
Definition RootType.h:211
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual void setObjPtr(void *&obj, const Token *token) override
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:40
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Token.h:93
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
status
Definition merge.py:16
static const DbType POOL_StorageType
Definition DbType.h:98
merge(input_file_pattern, output_file)
Merge many input LHE files into a single output file.
Definition LHE.py:29
char rootType(char typeidType)
This function is used internally in the code when creating primitive dynamic auxiliary branches.

◆ connectOutput() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec)
overridevirtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec[IN] the name of the output connection specification as string.

Definition at line 93 of file AthenaPoolSharedIOCnvSvc.cxx.

93 {
94// This is called before DataObjects are being converted.
95 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
96 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
97 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
98 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
99 return(StatusCode::FAILURE);
100 }
101 }
102 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
103 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
104 return(StatusCode::SUCCESS);
105 }
106 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
107 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
108 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
109 return(StatusCode::SUCCESS);
110 }
112 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
113 return(StatusCode::SUCCESS);
114 }
115 }
117 outputConnection += m_streamPortString.value();
118 }
119 std::size_t apend = outputConnectionSpec.find('[');
120 if (apend != std::string::npos) {
121 outputConnection += outputConnectionSpec.substr(apend);
122 }
123 return AthenaPoolCnvSvc::connectOutput(outputConnection);
124}
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.

◆ connectOutput() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string & outputConnectionSpec,
const std::string & openMode )
overridevirtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec[IN] the name of the output connection specification as string.
openMode[IN] the open mode of the file as string.

Definition at line 88 of file AthenaPoolSharedIOCnvSvc.cxx.

89 {
90 return(connectOutput(outputConnectionSpec));
91}

◆ createAddress() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string & refAddress,
IOpaqueAddress *& refpAddress )
overridevirtual

Create address from string form.

Parameters
svcType[IN] service type of the address.
clid[IN] class id for the address.
refAddress[IN] string form to be converted.
refpAddress[OUT] converted address.

Definition at line 659 of file AthenaPoolSharedIOCnvSvc.cxx.

662 {
663 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
664}
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.

◆ createAddress() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long svcType,
const CLID & clid,
const std::string * par,
const unsigned long * ip,
IOpaqueAddress *& refpAddress )
override

Create a Generic address using explicit arguments to identify a single object.

Parameters
svcType[IN] service type of the address.
clid[IN] class id for the address.
par[IN] string containing the database name.
ip[IN] object identifier.
refpAddress[OUT] converted address.

Definition at line 616 of file AthenaPoolSharedIOCnvSvc.cxx.

620 {
621 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
623 }
624 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
625 Token addressToken;
626 addressToken.setDb(par[0].substr(4));
627 addressToken.setCont(par[1]);
628 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
629 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
630 void* buffer = nullptr;
631 std::size_t nbytes = 0;
632 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
633 while (sc.isRecoverable()) {
634 // sleep
635 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
636 }
637 if (!sc.isSuccess()) {
638 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
639 return(StatusCode::FAILURE);
640 }
641 auto token = std::make_unique<Token>();
642 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
643 if (token->classID() == Guid::null()) {
644 token.reset();
645 }
646 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
647 if (token) {
648 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
649 return(StatusCode::SUCCESS);
650 }
651 else {
652 return(StatusCode::RECOVERABLE);
653 }
654 } else {
655 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
656 }
657}
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
@ kInputStream
Definition IPoolSvc.h:40
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:71
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134

◆ decodeOutputSpec()

StatusCode AthenaPoolSharedIOCnvSvc::decodeOutputSpec ( std::string & connectionSpec,
int & outputTech ) const
overridevirtual

Extract/deduce the DB technology from the connection string/file specification.

Definition at line 666 of file AthenaPoolSharedIOCnvSvc.cxx.

666 {
667 auto pos = fileSpec.find("?pmerge=");
668 std::string suffix;
669 // Remove trailing TMemFile for decoding
670 if (pos != std::string::npos) {
671 suffix = fileSpec.substr(pos);
672 fileSpec.erase(pos);
673 }
674 StatusCode sc = AthenaPoolCnvSvc::decodeOutputSpec(fileSpec, outputTech);
675 // Append back the suffix
676 if (!suffix.empty()) {
677 fileSpec += suffix;
678 }
679 return sc;
680}
virtual StatusCode decodeOutputSpec(std::string &connectionSpec, int &outputTech) const override
Extract/deduce the DB technology from the connection string/file specification.

◆ disconnectOutput()

StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput ( const std::string & outputConnectionSpec)
overridevirtual

Disconnect to the output connection.

Definition at line 411 of file AthenaPoolSharedIOCnvSvc.cxx.

411 {
412 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
413 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
414 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
415 return(StatusCode::SUCCESS);
416 }
417 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
419 m_streamServerActive = false;
420 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
421 return(StatusCode::SUCCESS);
422 } else {
423 m_streamServerActive = false;
424 }
425 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
426 }
428 outputConnection += m_streamPortString.value();
429 }
430 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
431}
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.

◆ finalize()

StatusCode AthenaPoolSharedIOCnvSvc::finalize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 66 of file AthenaPoolSharedIOCnvSvc.cxx.

66 {
67 // Release AthenaSerializeSvc
68 if (!m_serializeSvc.empty()) {
69 if (!m_serializeSvc.release().isSuccess()) {
70 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
71 }
72 }
73 // Release OutputStreamingTool (if configured)
74 if (!m_outputStreamingTool.empty()) {
75 if (!m_outputStreamingTool.release().isSuccess()) {
76 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
77 }
78 }
79 // Release InputStreamingTool (if configured)
80 if (!m_inputStreamingTool.empty()) {
81 if (!m_inputStreamingTool.release().isSuccess()) {
82 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
83 }
84 }
85 return this->AthenaPoolCnvSvc::finalize();
86}
virtual StatusCode finalize() override
Required of all Gaudi Services.
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc

◆ handle()

void AthenaPoolSharedIOCnvSvc::handle ( const Incident & incident)
overridevirtual

Implementation of IIncidentListener: Handle for EndEvent incidence.

Definition at line 834 of file AthenaPoolSharedIOCnvSvc.cxx.

834 {
835 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
836 m_outputStreamingTool->lockObject("release").ignore();
837 }
838}

◆ initialize()

StatusCode AthenaPoolSharedIOCnvSvc::initialize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 37 of file AthenaPoolSharedIOCnvSvc.cxx.

37 {
38 // Retrieve InputStreamingTool (if configured)
39 if (!m_inputStreamingTool.empty()) {
41 }
42 // Retrieve OutputStreamingTool (if configured)
43 if (!m_outputStreamingTool.empty()) {
45 if (m_makeStreamingToolClient.value() == -1) {
46 // Initialize AthenaRootSharedWriter
47 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
48 ATH_CHECK(arswsvc.retrieve());
49 }
50 // Put PoolSvc into share mode to avoid duplicating catalog.
51 getPoolSvc()->setShareMode(true);
52 }
53 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
54 // Retrieve AthenaSerializeSvc
55 ATH_CHECK(m_serializeSvc.retrieve());
56 }
57 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
58 long int pri = 1000;
59 if (!m_outputStreamingTool.empty()) {
60 incSvc->addListener(this, "StoreCleared", pri);
61 ATH_MSG_DEBUG("Subscribed to StoreCleared");
62 }
63 return this->AthenaPoolCnvSvc::initialize();
64}
virtual StatusCode initialize() override
Required of all Gaudi Services.

◆ makeClient()

StatusCode AthenaPoolSharedIOCnvSvc::makeClient ( int num)
overridevirtual

Make this a client.

Definition at line 714 of file AthenaPoolSharedIOCnvSvc.cxx.

714 {
715 if (!m_outputStreamingTool.empty()) {
716 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
717 std::string streamPortSuffix;
718 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
719 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
720 return(StatusCode::FAILURE);
721 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
722 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
723 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
724 m_streamPortString.setValue(streamPortSuffix);
725 }
726 }
727 if (m_inputStreamingTool.empty()) {
728 return(StatusCode::SUCCESS);
729 }
730 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
731 std::string dummyStr;
732 return(m_inputStreamingTool->makeClient(num, dummyStr));
733}

◆ makeServer()

StatusCode AthenaPoolSharedIOCnvSvc::makeServer ( int num)
overridevirtual

Make this a server.

Definition at line 688 of file AthenaPoolSharedIOCnvSvc.cxx.

688 {
689 if (num < 0) {
690 num = -num;
692 num = num % 1024;
693 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
694 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
695 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
696 const std::string streamPortSuffix = m_streamPortString.value();
697 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
698 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
699 return(StatusCode::FAILURE);
700 }
701 // Disable PersistencySvc per output file mode, for SharedWriter Server
702 m_persSvcPerOutput.setValue(false);
703 return(StatusCode::SUCCESS);
704 }
705 return(StatusCode::RECOVERABLE);
706 }
707 if (m_inputStreamingTool.empty()) {
708 return(StatusCode::RECOVERABLE);
709 }
710 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
711 return(m_inputStreamingTool->makeServer(num, ""));
712}

◆ readData()

StatusCode AthenaPoolSharedIOCnvSvc::readData ( )
overridevirtual

Read the next data object.

Definition at line 735 of file AthenaPoolSharedIOCnvSvc.cxx.

735 {
736 if (m_inputStreamingTool.empty()) {
737 return(StatusCode::FAILURE);
738 }
739 const char* tokenStr = nullptr;
740 int num = -1;
741 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
742 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
743 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
744 } else {
745 return(sc);
746 }
747 // Read object instance via POOL/ROOT
748 void* instance = nullptr;
749 Token token;
750 token.fromString(tokenStr); tokenStr = nullptr;
751 if (token.classID() != Guid::null()) {
752 std::string objName = "ALL";
753 if (useDetailChronoStat()) {
754 objName = token.classID().toString();
755 }
756 // StopWatch listens from here until the end of this current scope
757 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
758 this->setObjPtr(instance, &token);
759 // Serialize object via ROOT
761 void* buffer = nullptr;
762 std::size_t nbytes = 0;
763 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
764 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
765 while (sc.isRecoverable()) {
766 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
767 }
768 delete [] static_cast<char*>(buffer); buffer = nullptr;
769 if (!sc.isSuccess()) {
770 ATH_MSG_ERROR("Could not share object for: " << token.toString());
771 return(StatusCode::FAILURE);
772 }
773 AuxDiscoverySvc auxDiscover;
774 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
775 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
776 return(StatusCode::FAILURE);
777 }
778 cltype.Destruct(instance); instance = nullptr;
779 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
780 ATH_MSG_ERROR("Could not share object for: " << token.toString());
781 return(StatusCode::FAILURE);
782 }
783 } else if (token.dbID() != Guid::null()) {
784 std::string returnToken;
785 const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
786 if (metadataToken != nullptr) {
787 returnToken = metadataToken->toString();
788 } else {
789 returnToken = token.toString();
790 }
791 delete metadataToken; metadataToken = nullptr;
792 // Share token
793 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
794 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
795 ATH_MSG_ERROR("Could not share token for: " << token.toString());
796 return(StatusCode::FAILURE);
797 }
798 } else {
799 return(StatusCode::RECOVERABLE);
800 }
801 return(StatusCode::SUCCESS);
802}
std::map< std::string, double > instance
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access database identifier.
Definition Token.h:73
const OID_t & oid() const
Access object identifier.
Definition Token.h:81
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static const TypeH forGuid(const Guid &info)
Access classes by Guid.

◆ registerForWrite()

Token * AthenaPoolSharedIOCnvSvc::registerForWrite ( Placement * placement,
const void * obj,
const RootType & classDesc )
overridevirtual
Returns
a string token to a Data Object written to Pool
Parameters
placement[IN] pointer to the placement hint
obj[IN] pointer to the Data Object to be written to Pool
classDesc[IN] pointer to the Seal class description for the Data Object.

Definition at line 434 of file AthenaPoolSharedIOCnvSvc.cxx.

434 {
435 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
436 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
437 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
438 return(nullptr);
439 }
440 }
441 Token* token = nullptr;
442 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
443 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
444 // Lock object
445 std::string placementStr = placement->toString();
446 placementStr += "[PNAME=";
447 placementStr += classDesc.Name();
448 placementStr += ']';
449 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
450 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
451 while (sc.isRecoverable()) {
452 //usleep(100);
453 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
454 }
455 if (!sc.isSuccess()) {
456 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
457 return(nullptr);
458 }
459 // Serialize object via ROOT
460 const void* buffer = nullptr;
461 std::size_t nbytes = 0;
462 bool own = true;
463 if (classDesc.Name() == "Token") {
464 nbytes = strlen(static_cast<const char*>(obj)) + 1;
465 buffer = obj;
466 own = false;
467 } else if (classDesc.IsFundamental()) {
468 nbytes = classDesc.SizeOf();
469 buffer = obj;
470 own = false;
471 } else {
472 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
473 }
474 // Share object
475 sc = m_outputStreamingTool->putObject(buffer, nbytes);
476 while (sc.isRecoverable()) {
477 //usleep(100);
478 sc = m_outputStreamingTool->putObject(buffer, nbytes);
479 }
480 if (own) { delete [] static_cast<const char*>(buffer); }
481 buffer = nullptr;
482 if (!sc.isSuccess()) {
483 ATH_MSG_ERROR("Could not share object for: " << placementStr);
484 m_outputStreamingTool->putObject(nullptr, 0).ignore();
485 return(nullptr);
486 }
487 AuxDiscoverySvc auxDiscover;
488 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
489 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
490 m_outputStreamingTool->putObject(nullptr, 0).ignore();
491 return(nullptr);
492 }
493 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
494 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
495 return(nullptr);
496 }
497 // Get Token back from Server
498 const char* tokenStr = nullptr;
499 int num = -1;
500 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
501 while (sc.isRecoverable()) {
502 //usleep(100);
503 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
504 }
505 if (!sc.isSuccess()) {
506 ATH_MSG_ERROR("Failed to get Token");
507 return(nullptr);
508 }
509 if (!strcmp(tokenStr, "ABORT")) {
510 ATH_MSG_ERROR("Writer requested ABORT");
511 // tell the server we are leaving
512 m_outputStreamingTool->stop().ignore();
513 return nullptr;
514 }
515 Token* tempToken = new Token();
516 tempToken->fromString(tokenStr); tokenStr = nullptr;
517 tempToken->setClassID(pool::DbReflex::guid(classDesc));
518 token = tempToken; tempToken = nullptr;
519// Client Write Request
520 } else {
521 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
522 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
523 Token* tempToken = new Token();
524 tempToken->setClassID(pool::DbReflex::guid(classDesc));
525 token = tempToken; tempToken = nullptr;
526 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
527 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
528 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
529 } else {
531 placement->setFileName(placement->fileName() + m_streamPortString.value());
532 }
533 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
534 }
535 }
536 return(token);
537}
#define ATH_MSG_VERBOSE(x)
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & containerName() const
Access container name.
Definition Placement.h:32
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:30
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
const std::string & fileName() const
Access file name.
Definition Placement.h:28
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
size_t SizeOf() const
Definition RootType.cxx:765
Token & setClassID(const Guid &cl_id)
Access database identifier.
Definition Token.h:75
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.

◆ setObjPtr()

void AthenaPoolSharedIOCnvSvc::setObjPtr ( void *& obj,
const Token * token )
overridevirtual
Parameters
obj[OUT] pointer to the Data Object.
token[IN] string token of the Data Object for which a Pool Ref is filled.

Definition at line 539 of file AthenaPoolSharedIOCnvSvc.cxx.

539 {
540 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
541 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
542 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
543 }
544 }
545 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
546 if (token->dbID() == Guid::null()) {
547 int num = token->oid().first;
548 // Get object from SHM
549 void* buffer = nullptr;
550 std::size_t nbytes = 0;
551 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
552 while (sc.isRecoverable()) {
553 //usleep(100);
554 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
555 }
556 if (!sc.isSuccess()) {
557 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
558 obj = nullptr;
559 } else {
560 ATH_MSG_DEBUG("Server deserializing " << token->toString());
561 if (token->classID() != Guid::null()) {
562 // Deserialize object
563 RootType cltype(pool::DbReflex::forGuid(token->classID()));
564 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
565 } else {
566 // Deserialize object
567 std::string className = token->auxString();
568 className = className.substr(className.find("[PNAME="));
569 className = className.substr(7, className.find(']') - 7);
570 RootType cltype(RootType::ByNameNoQuiet(className));
571 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
572 }
573 AuxDiscoverySvc auxDiscover;
574 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
575 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
576 obj = nullptr;
577 }
578 }
579 }
580 }
581 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
582 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
583 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
584 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
585 obj = nullptr;
586 } else {
587 void* buffer = nullptr;
588 std::size_t nbytes = 0;
589 StatusCode sc = StatusCode::FAILURE;
590 // StopWatch listens from here until the end of this current scope
591 {
592 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
593 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
594 while (sc.isRecoverable()) {
595 // sleep
596 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
597 }
598 }
599 if (!sc.isSuccess()) {
600 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
601 obj = nullptr;
602 } else {
603 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
604 AuxDiscoverySvc auxDiscover;
605 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
606 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
607 obj = nullptr;
608 }
609 }
610 }
611 } else if (token->dbID() != Guid::null()) {
612 AthenaPoolCnvSvc::setObjPtr(obj, token);
613 }
614}
virtual void setObjPtr(void *&obj, const Token *token) override
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:91
int technology() const
Access technology type.
Definition Token.h:77

◆ SvcFactory< AthenaPoolSharedIOCnvSvc >

friend class SvcFactory< AthenaPoolSharedIOCnvSvc >
friend

Definition at line 1 of file AthenaPoolSharedIOCnvSvc.h.

Member Data Documentation

◆ m_fileCommitCounter

std::map<std::string, int> AthenaPoolSharedIOCnvSvc::m_fileCommitCounter
private

Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.

Definition at line 141 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_fileFlushSetting

Gaudi::Property<std::map<std::string, int> > AthenaPoolSharedIOCnvSvc::m_fileFlushSetting {this,"FileFlushSetting",{}}
private

Definition at line 142 of file AthenaPoolSharedIOCnvSvc.h.

142{this,"FileFlushSetting",{}};

◆ m_inputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_inputStreamingTool {this,"InputStreamingTool",{}}
private

Definition at line 121 of file AthenaPoolSharedIOCnvSvc.h.

121{this,"InputStreamingTool",{}};

◆ m_makeStreamingToolClient

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
private

Make this instance a Streaming Client during first connect/write automatically.

Definition at line 133 of file AthenaPoolSharedIOCnvSvc.h.

133{this,"MakeStreamingToolClient",0};

◆ m_metadataClient

int AthenaPoolSharedIOCnvSvc::m_metadataClient =0
private

Definition at line 124 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainerProp

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
private

For SharedWriter: To use MetadataSvc to merge data placed in a certain container.

Definition at line 129 of file AthenaPoolSharedIOCnvSvc.h.

129{this,"OutputMetadataContainer","MetaData"};

◆ m_metadataContainersAug

Gaudi::Property<std::vector<std::string> > AthenaPoolSharedIOCnvSvc::m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
private

Definition at line 130 of file AthenaPoolSharedIOCnvSvc.h.

130{this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"};

◆ m_outputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_outputStreamingTool {this,"OutputStreamingTool",{}}
private

Definition at line 122 of file AthenaPoolSharedIOCnvSvc.h.

122{this,"OutputStreamingTool",{}};

◆ m_parallelCompression

Gaudi::Property<bool> AthenaPoolSharedIOCnvSvc::m_parallelCompression {this,"ParallelCompression",true}
private

Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.

Definition at line 137 of file AthenaPoolSharedIOCnvSvc.h.

137{this,"ParallelCompression",true};

◆ m_serializeSvc

ServiceHandle<IAthenaSerializeSvc> AthenaPoolSharedIOCnvSvc::m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
private

Definition at line 120 of file AthenaPoolSharedIOCnvSvc.h.

120{this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"};

◆ m_streamingTechnology

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_streamingTechnology {this,"StreamingTechnology",-1}
private

Use Streaming for selected technologies only.

Definition at line 135 of file AthenaPoolSharedIOCnvSvc.h.

135{this,"StreamingTechnology",-1};

◆ m_streamPortString

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
private

Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".

Definition at line 139 of file AthenaPoolSharedIOCnvSvc.h.

139{this,"StreamPortString","?pmerge=localhost:0"};

◆ m_streamServerActive

bool AthenaPoolSharedIOCnvSvc::m_streamServerActive =false
private

Definition at line 123 of file AthenaPoolSharedIOCnvSvc.h.


The documentation for this class was generated from the following files: