ATLAS Offline Software
Public Member Functions | Private Attributes | Friends | List of all members
AthenaPoolSharedIOCnvSvc Class Reference

This class provides the interface between Athena and PoolSvc. More...

#include <AthenaPoolSharedIOCnvSvc.h>

Inheritance diagram for AthenaPoolSharedIOCnvSvc:
Collaboration diagram for AthenaPoolSharedIOCnvSvc:

Public Member Functions

virtual StatusCode initialize () override
 Required of all Gaudi Services. More...
 
virtual StatusCode finalize () override
 Required of all Gaudi Services. More...
 
virtual StatusCode connectOutput (const std::string &outputConnectionSpec, const std::string &openMode) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode. More...
 
virtual StatusCode connectOutput (const std::string &outputConnectionSpec) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode. More...
 
virtual StatusCode commitOutput (const std::string &outputConnectionSpec, bool doCommit) override
 Implementation of IConversionSvc: Commit pending output. More...
 
virtual StatusCode disconnectOutput (const std::string &outputConnectionSpec) override
 Disconnect to the output connection. More...
 
virtual TokenregisterForWrite (Placement *placement, const void *obj, const RootType &classDesc) override
 
virtual void setObjPtr (void *&obj, const Token *token) override
 
StatusCode createAddress (long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
 Create a Generic address using explicit arguments to identify a single object. More...
 
virtual StatusCode createAddress (long svcType, const CLID &clid, const std::string &refAddress, IOpaqueAddress *&refpAddress) override
 Create address from string form. More...
 
virtual StatusCode makeServer (int num) override
 Make this a server. More...
 
virtual StatusCode makeClient (int num) override
 Make this a client. More...
 
virtual StatusCode readData () override
 Read the next data object. More...
 
virtual StatusCode commitCatalog () override
 Commit Catalog. More...
 
StatusCode abortSharedWrClients (int client_n)
 Send abort to SharedWriter clients if the server quits on error. More...
 
virtual void handle (const Incident &incident) override
 Implementation of IIncidentListener: Handle for EndEvent incidence. More...
 
 AthenaPoolSharedIOCnvSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor. More...
 
virtual ~AthenaPoolSharedIOCnvSvc ()=default
 Destructor. More...
 

Private Attributes

ServiceHandle< IAthenaSerializeSvcm_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
 
ToolHandle< IAthenaIPCToolm_inputStreamingTool {this,"InputStreamingTool",{}}
 
ToolHandle< IAthenaIPCToolm_outputStreamingTool {this,"OutputStreamingTool",{}}
 
bool m_streamServerActive =false
 
int m_metadataClient =0
 
Gaudi::Property< std::string > m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
 For SharedWriter: To use MetadataSvc to merge data placed in a certain container. More...
 
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
 
Gaudi::Property< int > m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first connect/write automatically. More...
 
Gaudi::Property< int > m_streamingTechnology {this,"StreamingTechnology",-1}
 Use Streaming for selected technologies only. More...
 
Gaudi::Property< bool > m_parallelCompression {this,"ParallelCompression",true}
 Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile. More...
 
Gaudi::Property< std::string > m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
 Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>". More...
 

Friends

class SvcFactory< AthenaPoolSharedIOCnvSvc >
 

Detailed Description

This class provides the interface between Athena and PoolSvc.

Definition at line 25 of file AthenaPoolSharedIOCnvSvc.h.

Constructor & Destructor Documentation

◆ AthenaPoolSharedIOCnvSvc()

AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 809 of file AthenaPoolSharedIOCnvSvc.cxx.

809  :
810  base_class(name, pSvcLocator) {
811 }

◆ ~AthenaPoolSharedIOCnvSvc()

virtual AthenaPoolSharedIOCnvSvc::~AthenaPoolSharedIOCnvSvc ( )
virtualdefault

Destructor.

Member Function Documentation

◆ abortSharedWrClients()

StatusCode AthenaPoolSharedIOCnvSvc::abortSharedWrClients ( int  client_n)

Send abort to SharedWriter clients if the server quits on error.

Parameters
client_n[IN] number of the current client, -1 if no current

Definition at line 783 of file AthenaPoolSharedIOCnvSvc.cxx.

784 {
785  ATH_MSG_ERROR("Sending ABORT to clients");
786  // the master process will kill this process once workers abort
787  // but it could be a time-limited loop
788  StatusCode sc = StatusCode::SUCCESS;
789  while (sc.isSuccess()) {
790  if (client_n >= 0) {
791  sc = m_outputStreamingTool->lockObject("ABORT", client_n);
792  }
793  const char* dummy;
794  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
795  while (sc.isRecoverable()) {
796  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
797  }
798  }
799  return StatusCode::FAILURE;
800 }

◆ commitCatalog()

StatusCode AthenaPoolSharedIOCnvSvc::commitCatalog ( )
overridevirtual

Commit Catalog.

Definition at line 774 of file AthenaPoolSharedIOCnvSvc.cxx.

774  {
775  pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
776  const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
777  catalog->commit();
778  catalog->start();
779  return(StatusCode::SUCCESS);
780 }

◆ commitOutput()

StatusCode AthenaPoolSharedIOCnvSvc::commitOutput ( const std::string &  outputConnectionSpec,
bool  doCommit 
)
overridevirtual

Implementation of IConversionSvc: Commit pending output.

Parameters
doCommit[IN] boolean to force full commit

Definition at line 128 of file AthenaPoolSharedIOCnvSvc.cxx.

128  {
129  // This is called after all DataObjects are converted.
130  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
131  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
132  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
133  m_outputStreamingTool->lockObject("wait").ignore();
134  if (!this->cleanUp(outputConnection).isSuccess()) {
135  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
136  return(StatusCode::FAILURE);
137  }
138  return(StatusCode::SUCCESS);
139  }
140  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
141  ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
142  return(StatusCode::SUCCESS);
143  }
144  std::map<void*, RootType> commitCache;
145  std::string fileName;
146  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
147  // Clear object to get Placements for all objects in a Stream
148  const char* placementStr = nullptr;
149  int num = -1;
150  StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
151  if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
152  const char * matchedChars = strstr(placementStr, "[FILE=");
153  if (!matchedChars){
154  ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
155  return abortSharedWrClients(num);
156  }
157  fileName = matchedChars;
158  fileName = fileName.substr(6, fileName.find(']') - 6);
159  if (!this->connectOutput(fileName).isSuccess()) {
160  ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
161  return abortSharedWrClients(num);
162  }
163  IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
164  bool dataHeaderSeen = false;
165  std::string dataHeaderID;
166  while (num > 0) {
167  std::string objName = "ALL";
168  if (useDetailChronoStat()) {
169  objName = placementStr; //FIXME, better descriptor
170  }
171  // StopWatch listens from here until the end of this current scope
172  {
173  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
174  std::string_view pStr = placementStr;
175  std::string::size_type cpos = pStr.find ("[CONT=");
176  if (cpos == std::string::npos) {
177  ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
178  return StatusCode::FAILURE;
179  }
180  std::string tokenStr (pStr.substr(0, cpos));
181  std::string contName (pStr.substr(cpos, std::string::npos));
182  std::string::size_type cl1 = contName.find(']');
183  if (cl1 == std::string::npos) {
184  ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
185  return StatusCode::FAILURE;
186  }
187  tokenStr.append(contName, cl1 + 1);
188  contName = contName.substr(6, cl1 - 6);
189 
190  std::string::size_type ppos = pStr.find ("[PNAME=");
191  if (ppos == std::string::npos) {
192  ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
193  return StatusCode::FAILURE;
194  }
195  std::string className (pStr.substr(ppos, std::string::npos));
196  std::string::size_type cl2 = className.find(']');
197  if (cl2 == std::string::npos) {
198  ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
199  return StatusCode::FAILURE;
200  }
201  className = className.substr(7, cl2 - 7);
203  void* obj = nullptr;
204  const std::string numStr = std::to_string(num);
205  std::string::size_type len = m_metadataContainerProp.value().size();
206  bool foundContainer = false;
207  std::size_t opPos = contName.find('(');
208  if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
209  foundContainer = true;
210  } else {
211  for (const auto& item: m_metadataContainersAug.value()) {
212  if (contName.compare(0, opPos, item) == 0){
213  foundContainer = true;
214  len = item.size();
215  break;
216  }
217  }
218  }
219  if (len > 0 && foundContainer && contName[len] == '(' ) {
220  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
221  // For Metadata, before moving to next client, fire file incidents
222  if (m_metadataClient != num) {
223  if (m_metadataClient != 0) {
224  std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
225  FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
226  incSvc->fireIncident(beginInputIncident);
227  FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
228  incSvc->fireIncident(endInputIncident);
229  }
231  }
232  // Retrieve MetaDataSvc
233  ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
234  ATH_CHECK(metadataSvc.retrieve());
235  sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
236  if (sc.isRecoverable()) {
237  ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
238  } else if (sc.isFailure()) {
239  ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
240  return abortSharedWrClients(num);
241  }
242  } else {
243  Token readToken;
244  readToken.setOid(Token::OID_t(num, 0));
245  readToken.setAuxString("[PNAME=" + className + "]");
246  this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
247  if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
248  // Write object
249  if( m_oneDataHeaderForm.value() ) {
250  auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
251  if( className == "DataHeaderForm_p6" ) {
252  // Pass DHForms to the converter for later writing in the correct order - do not write it now
253  GenericAddress address(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(),
254  "", placementWithSwn());
255  DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
256  tokenStr = "";
257  } else {
258  Placement placement;
259  placement.fromString(placementStr);
260  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
261  if (token == nullptr) {
262  ATH_MSG_ERROR("Failed to write Data for: " << className);
263  return abortSharedWrClients(num);
264  }
265  tokenStr = token->toString();
266  }
267  if( className == "DataHeader_p6" ) {
268  // Found DataHeader - call the converter to update DHForm Ref
269  GenericAddress address(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(),
270  tokenStr, placementWithSwn());
271  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
272  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
273  return abortSharedWrClients(num);
274  }
275  } else
276  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
277  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
278  }
279  placementStr = nullptr;
280  } else {
281  // Multiple shared DataHeaderForms
282  Placement placement;
283  placement.fromString(placementStr); placementStr = nullptr;
284  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
285  if (token == nullptr) {
286  ATH_MSG_ERROR("Failed to write Data for: " << className);
287  return abortSharedWrClients(num);
288  }
289  tokenStr = token->toString();
290  if (className == "DataHeader_p6") {
291  // Found DataHeader
292  GenericAddress address(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(),
293  tokenStr, placement.auxString());
294  // call DH converter to add the ref to DHForm (stored earlier) and to itself
295  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
296  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
297  return abortSharedWrClients(num);
298  }
299  dataHeaderSeen = true;
300  // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
301  // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
302  // This is achieved by building it as "CONTID/WORKERID/DBID".
303  // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
304  // WORKERID allows us to distinguish AthenaMP workers,
305  // and DBID allows us to distinguish streams.
306  dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
307  } else if (dataHeaderSeen) {
308  dataHeaderSeen = false;
309  // next object after DataHeader - may be a DataHeaderForm
310  // in any case we need to call the DH converter to update the DHForm Ref
311  if (className == "DataHeaderForm_p6") {
312  // Tell DataHeaderCnv that it should use a new DHForm
313  GenericAddress address(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(),
314  tokenStr, dataHeaderID);
315  if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
316  ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
317  return abortSharedWrClients(num);
318  }
319  } else {
320  // Tell DataHeaderCnv that it should use the old DHForm
321  GenericAddress address(0, 0, "", dataHeaderID);
322  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
323  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
324  return abortSharedWrClients(num);
325  }
326  }
327  }
328  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
329  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
330  }
331  }
332  }
333  }
334  // Send Token back to Client
335  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
336  while (sc.isRecoverable()) {
337  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
338  }
339  if (!sc.isSuccess()) {
340  ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
341  return abortSharedWrClients(-1);
342  }
343  }
344  sc = m_outputStreamingTool->clearObject(&placementStr, num);
345  while (sc.isRecoverable()) {
346  sc = m_outputStreamingTool->clearObject(&placementStr, num);
347  }
348  if (sc.isFailure()) {
349  // no more clients, break the loop and exit
350  num = -1;
351  }
352  }
353  if (dataHeaderSeen) {
354  // DataHeader was the last object, need to tell the converter there is no DHForm coming
355  GenericAddress address(0, 0, "", std::move(dataHeaderID));
356  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
357  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
358  return abortSharedWrClients(-1);
359  }
360  }
361  placementStr = nullptr;
362  } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
363  return(StatusCode::RECOVERABLE);
364  } else if (sc.isRecoverable() || num == -1) {
365  return(StatusCode::RECOVERABLE);
366  }
367  if (sc.isFailure() || fileName.empty()) {
368  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
369  std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
370  FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
371  incSvc->fireIncident(beginInputIncident);
372  FileIncident endInputIncident(name(), "EndInputMemFile", memName);
373  incSvc->fireIncident(endInputIncident);
374  if (sc.isFailure()) {
375  ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
376  } else {
377  ATH_MSG_INFO("Failed to get Data for client: " << num);
378  }
379  return(StatusCode::FAILURE);
380  }
381  }
382  if (m_parallelCompression && !fileName.empty()) {
383  ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384  return(StatusCode::SUCCESS);
385  }
386  if (outputConnection.empty()) {
387  outputConnection = std::move(fileName);
388  } else {
389  outputConnection = outputConnectionSpec;
390  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
391  outputConnection += m_streamPortString.value();
392  }
393  }
394  StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
395  for (auto& [ptr, rootType] : commitCache) {
396  rootType.Destruct(ptr);
397  }
398  return(status);
399 }

◆ connectOutput() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string &  outputConnectionSpec)
overridevirtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec[IN] the name of the output connection specification as string.

Definition at line 94 of file AthenaPoolSharedIOCnvSvc.cxx.

94  {
95 // This is called before DataObjects are being converted.
96  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
97  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100  return(StatusCode::FAILURE);
101  }
102  }
103  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105  return(StatusCode::SUCCESS);
106  }
107  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
108  if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109  ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110  return(StatusCode::SUCCESS);
111  }
113  ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114  return(StatusCode::SUCCESS);
115  }
116  }
117  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
118  outputConnection += m_streamPortString.value();
119  }
120  std::size_t apend = outputConnectionSpec.find('[');
121  if (apend != std::string::npos) {
122  outputConnection += outputConnectionSpec.substr(apend);
123  }
124  return AthenaPoolCnvSvc::connectOutput(outputConnection);
125 }

◆ connectOutput() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string &  outputConnectionSpec,
const std::string &  openMode 
)
overridevirtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec[IN] the name of the output connection specification as string.
openMode[IN] the open mode of the file as string.

Definition at line 89 of file AthenaPoolSharedIOCnvSvc.cxx.

90  {
91  return(connectOutput(outputConnectionSpec));
92 }

◆ createAddress() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long  svcType,
const CLID clid,
const std::string &  refAddress,
IOpaqueAddress *&  refpAddress 
)
overridevirtual

Create address from string form.

Parameters
svcType[IN] service type of the address.
clid[IN] class id for the address.
refAddress[IN] string form to be converted.
refpAddress[OUT] converted address.

Definition at line 650 of file AthenaPoolSharedIOCnvSvc.cxx.

653  {
654  return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
655 }

◆ createAddress() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long  svcType,
const CLID clid,
const std::string *  par,
const unsigned long *  ip,
IOpaqueAddress *&  refpAddress 
)
override

Create a Generic address using explicit arguments to identify a single object.

Parameters
svcType[IN] service type of the address.
clid[IN] class id for the address.
par[IN] string containing the database name.
ip[IN] object identifier.
refpAddress[OUT] converted address.

Definition at line 607 of file AthenaPoolSharedIOCnvSvc.cxx.

611  {
612  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
614  }
615  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
616  Token addressToken;
617  addressToken.setDb(par[0].substr(4));
618  addressToken.setCont(par[1]);
619  addressToken.setOid(Token::OID_t(ip[0], ip[1]));
620  ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
621  void* buffer = nullptr;
622  std::size_t nbytes = 0;
623  StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
624  while (sc.isRecoverable()) {
625  // sleep
626  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
627  }
628  if (!sc.isSuccess()) {
629  ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
630  return(StatusCode::FAILURE);
631  }
632  auto token = std::make_unique<Token>();
633  token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
634  if (token->classID() == Guid::null()) {
635  token.reset();
636  }
637  m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
638  if (token) {
639  refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
640  return(StatusCode::SUCCESS);
641  }
642  else {
643  return(StatusCode::RECOVERABLE);
644  }
645  } else {
646  return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
647  }
648 }

◆ disconnectOutput()

StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput ( const std::string &  outputConnectionSpec)
overridevirtual

Disconnect to the output connection.

Definition at line 402 of file AthenaPoolSharedIOCnvSvc.cxx.

402  {
403  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
404  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
405  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
406  return(StatusCode::SUCCESS);
407  }
408  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
409  if (m_streamServerActive) {
410  m_streamServerActive = false;
411  ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
412  return(StatusCode::SUCCESS);
413  } else {
414  m_streamServerActive = false;
415  }
416  ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
417  }
418  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
419  outputConnection += m_streamPortString.value();
420  }
421  return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
422 }

◆ finalize()

StatusCode AthenaPoolSharedIOCnvSvc::finalize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 67 of file AthenaPoolSharedIOCnvSvc.cxx.

67  {
68  // Release AthenaSerializeSvc
69  if (!m_serializeSvc.empty()) {
70  if (!m_serializeSvc.release().isSuccess()) {
71  ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72  }
73  }
74  // Release OutputStreamingTool (if configured)
75  if (!m_outputStreamingTool.empty()) {
76  if (!m_outputStreamingTool.release().isSuccess()) {
77  ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78  }
79  }
80  // Release InputStreamingTool (if configured)
81  if (!m_inputStreamingTool.empty()) {
82  if (!m_inputStreamingTool.release().isSuccess()) {
83  ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84  }
85  }
86  return this->AthenaPoolCnvSvc::finalize();
87 }

◆ handle()

void AthenaPoolSharedIOCnvSvc::handle ( const Incident &  incident)
overridevirtual

Implementation of IIncidentListener: Handle for EndEvent incidence.

Definition at line 803 of file AthenaPoolSharedIOCnvSvc.cxx.

803  {
804  if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
805  m_outputStreamingTool->lockObject("release").ignore();
806  }
807 }

◆ initialize()

StatusCode AthenaPoolSharedIOCnvSvc::initialize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 38 of file AthenaPoolSharedIOCnvSvc.cxx.

38  {
39  // Retrieve InputStreamingTool (if configured)
40  if (!m_inputStreamingTool.empty()) {
41  ATH_CHECK(m_inputStreamingTool.retrieve());
42  }
43  // Retrieve OutputStreamingTool (if configured)
44  if (!m_outputStreamingTool.empty()) {
45  ATH_CHECK(m_outputStreamingTool.retrieve());
46  if (m_makeStreamingToolClient.value() == -1) {
47  // Initialize AthenaRootSharedWriter
48  ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49  ATH_CHECK(arswsvc.retrieve());
50  }
51  // Put PoolSvc into share mode to avoid duplicating catalog.
52  getPoolSvc()->setShareMode(true);
53  }
54  if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55  // Retrieve AthenaSerializeSvc
56  ATH_CHECK(m_serializeSvc.retrieve());
57  }
58  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
59  long int pri = 1000;
60  if (!m_outputStreamingTool.empty()) {
61  incSvc->addListener(this, "StoreCleared", pri);
62  ATH_MSG_DEBUG("Subscribed to StoreCleared");
63  }
64  return this->AthenaPoolCnvSvc::initialize();
65 }

◆ makeClient()

StatusCode AthenaPoolSharedIOCnvSvc::makeClient ( int  num)
overridevirtual

Make this a client.

Definition at line 683 of file AthenaPoolSharedIOCnvSvc.cxx.

683  {
684  if (!m_outputStreamingTool.empty()) {
685  ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
686  std::string streamPortSuffix;
687  if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
688  ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
689  return(StatusCode::FAILURE);
690  } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
691  // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
692  ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
693  m_streamPortString.setValue(streamPortSuffix);
694  }
695  }
696  if (m_inputStreamingTool.empty()) {
697  return(StatusCode::SUCCESS);
698  }
699  ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
700  std::string dummyStr;
701  return(m_inputStreamingTool->makeClient(num, dummyStr));
702 }

◆ makeServer()

StatusCode AthenaPoolSharedIOCnvSvc::makeServer ( int  num)
overridevirtual

Make this a server.

Definition at line 657 of file AthenaPoolSharedIOCnvSvc.cxx.

657  {
658  if (num < 0) {
659  num = -num;
660  m_streamServerActive = true;
661  num = num % 1024;
662  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
663  ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
664  ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
665  const std::string streamPortSuffix = m_streamPortString.value();
666  if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
667  ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
668  return(StatusCode::FAILURE);
669  }
670  // Disable PersistencySvc per output file mode, for SharedWriter Server
671  m_persSvcPerOutput.setValue(false);
672  return(StatusCode::SUCCESS);
673  }
674  return(StatusCode::RECOVERABLE);
675  }
676  if (m_inputStreamingTool.empty()) {
677  return(StatusCode::RECOVERABLE);
678  }
679  ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
680  return(m_inputStreamingTool->makeServer(num, ""));
681 }

◆ readData()

StatusCode AthenaPoolSharedIOCnvSvc::readData ( )
overridevirtual

Read the next data object.

Definition at line 704 of file AthenaPoolSharedIOCnvSvc.cxx.

704  {
705  if (m_inputStreamingTool.empty()) {
706  return(StatusCode::FAILURE);
707  }
708  const char* tokenStr = nullptr;
709  int num = -1;
710  StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
711  if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
712  ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
713  } else {
714  return(sc);
715  }
716  // Read object instance via POOL/ROOT
717  void* instance = nullptr;
718  Token token;
719  token.fromString(tokenStr); tokenStr = nullptr;
720  if (token.classID() != Guid::null()) {
721  std::string objName = "ALL";
722  if (useDetailChronoStat()) {
723  objName = token.classID().toString();
724  }
725  // StopWatch listens from here until the end of this current scope
726  PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
727  this->setObjPtr(instance, &token);
728  // Serialize object via ROOT
729  RootType cltype(pool::DbReflex::forGuid(token.classID()));
730  void* buffer = nullptr;
731  std::size_t nbytes = 0;
732  buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
733  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
734  while (sc.isRecoverable()) {
735  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
736  }
737  delete [] static_cast<char*>(buffer); buffer = nullptr;
738  if (!sc.isSuccess()) {
739  ATH_MSG_ERROR("Could not share object for: " << token.toString());
740  return(StatusCode::FAILURE);
741  }
742  AuxDiscoverySvc auxDiscover;
743  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
744  ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
745  return(StatusCode::FAILURE);
746  }
747  cltype.Destruct(instance); instance = nullptr;
748  if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
749  ATH_MSG_ERROR("Could not share object for: " << token.toString());
750  return(StatusCode::FAILURE);
751  }
752  } else if (token.dbID() != Guid::null()) {
753  std::string returnToken;
754  const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
755  if (metadataToken != nullptr) {
756  returnToken = metadataToken->toString();
757  } else {
758  returnToken = token.toString();
759  }
760  delete metadataToken; metadataToken = nullptr;
761  // Share token
762  sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
763  if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
764  ATH_MSG_ERROR("Could not share token for: " << token.toString());
765  return(StatusCode::FAILURE);
766  }
767  } else {
768  return(StatusCode::RECOVERABLE);
769  }
770  return(StatusCode::SUCCESS);
771 }

◆ registerForWrite()

Token * AthenaPoolSharedIOCnvSvc::registerForWrite ( Placement placement,
const void *  obj,
const RootType classDesc 
)
overridevirtual
Returns
a string token to a Data Object written to Pool
Parameters
placement[IN] pointer to the placement hint
obj[IN] pointer to the Data Object to be written to Pool
classDesc[IN] pointer to the Seal class description for the Data Object.

Definition at line 425 of file AthenaPoolSharedIOCnvSvc.cxx.

425  {
426  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
427  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
428  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
429  return(nullptr);
430  }
431  }
432  Token* token = nullptr;
433  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
434  && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
435  // Lock object
436  std::string placementStr = placement->toString();
437  placementStr += "[PNAME=";
438  placementStr += classDesc.Name();
439  placementStr += ']';
440  ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
441  StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
442  while (sc.isRecoverable()) {
443  //usleep(100);
444  sc = m_outputStreamingTool->lockObject(placementStr.c_str());
445  }
446  if (!sc.isSuccess()) {
447  ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
448  return(nullptr);
449  }
450  // Serialize object via ROOT
451  const void* buffer = nullptr;
452  std::size_t nbytes = 0;
453  bool own = true;
454  if (classDesc.Name() == "Token") {
455  nbytes = strlen(static_cast<const char*>(obj)) + 1;
456  buffer = obj;
457  own = false;
458  } else if (classDesc.IsFundamental()) {
459  nbytes = classDesc.SizeOf();
460  buffer = obj;
461  own = false;
462  } else {
463  buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
464  }
465  // Share object
466  sc = m_outputStreamingTool->putObject(buffer, nbytes);
467  while (sc.isRecoverable()) {
468  //usleep(100);
469  sc = m_outputStreamingTool->putObject(buffer, nbytes);
470  }
471  if (own) { delete [] static_cast<const char*>(buffer); }
472  buffer = nullptr;
473  if (!sc.isSuccess()) {
474  ATH_MSG_ERROR("Could not share object for: " << placementStr);
475  m_outputStreamingTool->putObject(nullptr, 0).ignore();
476  return(nullptr);
477  }
478  AuxDiscoverySvc auxDiscover;
479  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
480  ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
481  m_outputStreamingTool->putObject(nullptr, 0).ignore();
482  return(nullptr);
483  }
484  if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
485  ATH_MSG_ERROR("Failed to put Data for " << placementStr);
486  return(nullptr);
487  }
488  // Get Token back from Server
489  const char* tokenStr = nullptr;
490  int num = -1;
491  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
492  while (sc.isRecoverable()) {
493  //usleep(100);
494  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
495  }
496  if (!sc.isSuccess()) {
497  ATH_MSG_ERROR("Failed to get Token");
498  return(nullptr);
499  }
500  if (!strcmp(tokenStr, "ABORT")) {
501  ATH_MSG_ERROR("Writer requested ABORT");
502  // tell the server we are leaving
503  m_outputStreamingTool->stop().ignore();
504  return nullptr;
505  }
506  Token* tempToken = new Token();
507  tempToken->fromString(tokenStr); tokenStr = nullptr;
508  tempToken->setClassID(pool::DbReflex::guid(classDesc));
509  token = tempToken; tempToken = nullptr;
510 // Client Write Request
511  } else {
512  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
513  ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
514  Token* tempToken = new Token();
515  tempToken->setClassID(pool::DbReflex::guid(classDesc));
516  token = tempToken; tempToken = nullptr;
517  } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
518  ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
519  token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
520  } else {
521  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
522  placement->setFileName(placement->fileName() + m_streamPortString.value());
523  }
524  token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
525  }
526  }
527  return(token);
528 }

◆ setObjPtr()

void AthenaPoolSharedIOCnvSvc::setObjPtr ( void *&  obj,
const Token token 
)
overridevirtual
Parameters
obj[OUT] pointer to the Data Object.
token[IN] string token of the Data Object for which a Pool Ref is filled.

Definition at line 530 of file AthenaPoolSharedIOCnvSvc.cxx.

530  {
531  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
532  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
533  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
534  }
535  }
536  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
537  if (token->dbID() == Guid::null()) {
538  int num = token->oid().first;
539  // Get object from SHM
540  void* buffer = nullptr;
541  std::size_t nbytes = 0;
542  StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
543  while (sc.isRecoverable()) {
544  //usleep(100);
545  sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
546  }
547  if (!sc.isSuccess()) {
548  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
549  obj = nullptr;
550  } else {
551  ATH_MSG_DEBUG("Server deserializing " << token->toString());
552  if (token->classID() != Guid::null()) {
553  // Deserialize object
554  RootType cltype(pool::DbReflex::forGuid(token->classID()));
555  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
556  } else {
557  // Deserialize object
558  std::string className = token->auxString();
559  className = className.substr(className.find("[PNAME="));
560  className = className.substr(7, className.find(']') - 7);
562  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
563  }
564  AuxDiscoverySvc auxDiscover;
565  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
566  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
567  obj = nullptr;
568  }
569  }
570  }
571  }
572  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
573  ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
574  if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
575  ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
576  obj = nullptr;
577  } else {
578  void* buffer = nullptr;
579  std::size_t nbytes = 0;
580  StatusCode sc = StatusCode::FAILURE;
581  // StopWatch listens from here until the end of this current scope
582  {
583  PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
584  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
585  while (sc.isRecoverable()) {
586  // sleep
587  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
588  }
589  }
590  if (!sc.isSuccess()) {
591  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
592  obj = nullptr;
593  } else {
594  obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
595  AuxDiscoverySvc auxDiscover;
596  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
597  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
598  obj = nullptr;
599  }
600  }
601  }
602  } else if (token->dbID() != Guid::null()) {
604  }
605 }

Friends And Related Function Documentation

◆ SvcFactory< AthenaPoolSharedIOCnvSvc >

friend class SvcFactory< AthenaPoolSharedIOCnvSvc >
friend

Definition at line 1 of file AthenaPoolSharedIOCnvSvc.h.

Member Data Documentation

◆ m_inputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_inputStreamingTool {this,"InputStreamingTool",{}}
private

Definition at line 113 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_makeStreamingToolClient

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
private

Make this instance a Streaming Client during first connect/write automatically.

Definition at line 125 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataClient

int AthenaPoolSharedIOCnvSvc::m_metadataClient =0
private

Definition at line 116 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainerProp

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
private

For SharedWriter: To use MetadataSvc to merge data placed in a certain container.

Definition at line 121 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainersAug

Gaudi::Property<std::vector<std::string> > AthenaPoolSharedIOCnvSvc::m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
private

Definition at line 122 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_outputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_outputStreamingTool {this,"OutputStreamingTool",{}}
private

Definition at line 114 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_parallelCompression

Gaudi::Property<bool> AthenaPoolSharedIOCnvSvc::m_parallelCompression {this,"ParallelCompression",true}
private

Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.

Definition at line 129 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_serializeSvc

ServiceHandle<IAthenaSerializeSvc> AthenaPoolSharedIOCnvSvc::m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
private

Definition at line 112 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamingTechnology

Gaudi::Property<int> AthenaPoolSharedIOCnvSvc::m_streamingTechnology {this,"StreamingTechnology",-1}
private

Use Streaming for selected technologies only.

Definition at line 127 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamPortString

Gaudi::Property<std::string> AthenaPoolSharedIOCnvSvc::m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
private

Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".

Definition at line 131 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamServerActive

bool AthenaPoolSharedIOCnvSvc::m_streamServerActive =false
private

Definition at line 115 of file AthenaPoolSharedIOCnvSvc.h.


The documentation for this class was generated from the following files:
AthenaPoolSharedIOCnvSvc::m_parallelCompression
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Definition: AthenaPoolSharedIOCnvSvc.h:129
AthenaPoolCnvSvc::createAddress
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
Definition: AthenaPoolCnvSvc.cxx:448
AthenaPoolSharedIOCnvSvc::m_serializeSvc
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
Definition: AthenaPoolSharedIOCnvSvc.h:112
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
Placement
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition: Placement.h:19
AthenaPoolSharedIOCnvSvc::m_metadataClient
int m_metadataClient
Definition: AthenaPoolSharedIOCnvSvc.h:116
TScopeAdapter::ByNameNoQuiet
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition: RootType.cxx:586
AthenaPoolCnvSvc::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:35
Placement::containerName
const std::string & containerName() const
Access container name.
Definition: Placement.h:32
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
Token::contID
const std::string & contID() const
Access container identifier.
Definition: Token.h:69
AthenaPoolSharedIOCnvSvc::m_streamingTechnology
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
Definition: AthenaPoolSharedIOCnvSvc.h:127
AthenaPoolSharedIOCnvSvc::m_streamServerActive
bool m_streamServerActive
Definition: AthenaPoolSharedIOCnvSvc.h:115
Token::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Token.h:91
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:64
AuxDiscoverySvc::sendStore
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
Definition: AuxDiscoverySvc.cxx:208
pool::IFileCatalog::commit
void commit()
Save catalog to file.
Definition: IFileCatalog.h:49
AthenaPoolSharedIOCnvSvc::abortSharedWrClients
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Definition: AthenaPoolSharedIOCnvSvc.cxx:783
Token::classID
const Guid & classID() const
Access database identifier.
Definition: Token.h:73
AthenaPoolCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolCnvSvc.cxx:434
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
pool::DbReflex::forGuid
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
dbg::ptr
void * ptr(T *p)
Definition: SGImplSvc.cxx:74
MuonR4::to_string
std::string to_string(const SectorProjector proj)
Definition: MsTrackSeeder.cxx:66
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
PMonUtils::BasicStopWatch
Definition: BasicStopWatch.h:17
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
AthenaPoolCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolCnvSvc.cxx:419
AthenaPoolSharedIOCnvSvc::m_inputStreamingTool
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
Definition: AthenaPoolSharedIOCnvSvc.h:113
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:23
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
PyPoolBrowser.item
item
Definition: PyPoolBrowser.py:129
Token::OID_t
Definition: Token.h:24
Token::setClassID
Token & setClassID(const Guid &cl_id)
Access database identifier.
Definition: Token.h:75
IPoolSvc::kInputStream
@ kInputStream
Definition: IPoolSvc.h:39
Token::technology
int technology() const
Access technology type.
Definition: Token.h:77
Guid::toString
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:11
pool::IFileCatalog
Definition: IFileCatalog.h:23
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
AuxDiscoverySvc::receiveStore
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
Definition: AuxDiscoverySvc.cxx:162
AthenaPoolCnvSvc::finalize
virtual StatusCode finalize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:130
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
CalibDbCompareRT.dummy
dummy
Definition: CalibDbCompareRT.py:59
find_tgc_unfilled_channelids.ip
ip
Definition: find_tgc_unfilled_channelids.py:3
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Token::setDb
Token & setDb(const Guid &db)
Set database name.
Definition: Token.h:66
AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
Definition: AthenaPoolSharedIOCnvSvc.h:125
Placement::fileName
const std::string & fileName() const
Access file name.
Definition: Placement.h:28
Placement::setFileName
Placement & setFileName(const std::string &fileName)
Set file name.
Definition: Placement.h:30
Placement::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Placement.h:40
TScopeAdapter::Name
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition: RootType.cxx:612
AthenaPoolCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
Definition: AthenaPoolCnvSvc.cxx:401
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
AthenaPoolSharedIOCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolSharedIOCnvSvc.cxx:530
AthenaPoolCnvSvc::commitOutput
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Definition: AthenaPoolCnvSvc.cxx:335
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
createCoolChannelIdFile.par
par
Definition: createCoolChannelIdFile.py:28
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:134
AthenaPoolSharedIOCnvSvc::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: AthenaPoolSharedIOCnvSvc.cxx:683
AthenaPoolSharedIOCnvSvc::m_metadataContainersAug
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Definition: AthenaPoolSharedIOCnvSvc.h:122
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:85
RTTAlgmain.address
address
Definition: RTTAlgmain.py:55
Placement::toString
const std::string toString() const
Retrieve the string representation of the placement.
Definition: Placement.cxx:15
Guid::null
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition: Guid.cxx:14
Token::fromString
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition: Token.cxx:147
TScopeAdapter::IsFundamental
Bool_t IsFundamental() const
Definition: RootType.cxx:731
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaPoolSharedIOCnvSvc::m_metadataContainerProp
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Definition: AthenaPoolSharedIOCnvSvc.h:121
Token::oid
const OID_t & oid() const
Access object identifier.
Definition: Token.h:81
Token::setAuxString
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Token.h:93
AthenaPoolCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolCnvSvc.cxx:260
AthenaPoolSharedIOCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolSharedIOCnvSvc.cxx:425
Token::setCont
Token & setCont(const std::string &cnt)
Set container name.
Definition: Token.h:71
AthenaPoolSharedIOCnvSvc::m_outputStreamingTool
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Definition: AthenaPoolSharedIOCnvSvc.h:114
merge.status
status
Definition: merge.py:16
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
AthenaPoolSharedIOCnvSvc::m_streamPortString
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
Definition: AthenaPoolSharedIOCnvSvc.h:131
python.PyAthena.obj
obj
Definition: PyAthena.py:132
TScopeAdapter::SizeOf
size_t SizeOf() const
Definition: RootType.cxx:765
AthenaPoolSharedIOCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolSharedIOCnvSvc.cxx:89
LArL1Calo_ComputeHVCorr.className
className
Definition: LArL1Calo_ComputeHVCorr.py:135
xAOD::Utils::rootType
char rootType(char typeidType)
This function is used internally in the code when creating primitive dynamic auxiliary branches.
Definition: Control/xAODRootAccess/Root/Utils.cxx:295
Placement::fromString
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition: Placement.cxx:28
AuxDiscoverySvc
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
Definition: AuxDiscoverySvc.h:33
ServiceHandle< IIncidentSvc >
TScopeAdapter
Definition: RootType.h:119
pool::DbReflex::guid
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.