ATLAS Offline Software
Public Member Functions | Private Attributes | Friends | List of all members
AthenaPoolSharedIOCnvSvc Class Reference

This class provides the interface between Athena and PoolSvc. More...

#include <AthenaPoolSharedIOCnvSvc.h>

Inheritance diagram for AthenaPoolSharedIOCnvSvc:
Collaboration diagram for AthenaPoolSharedIOCnvSvc:

Public Member Functions

virtual StatusCode initialize () override
 Required of all Gaudi Services. More...
 
virtual StatusCode finalize () override
 Required of all Gaudi Services. More...
 
virtual StatusCode connectOutput (const std::string &outputConnectionSpec, const std::string &openMode) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode. More...
 
virtual StatusCode connectOutput (const std::string &outputConnectionSpec) override
 Implementation of IConversionSvc: Connect to the output connection specification with open mode. More...
 
virtual StatusCode commitOutput (const std::string &outputConnectionSpec, bool doCommit) override
 Implementation of IConversionSvc: Commit pending output. More...
 
virtual StatusCode disconnectOutput (const std::string &outputConnectionSpec) override
 Disconnect to the output connection. More...
 
virtual TokenregisterForWrite (Placement *placement, const void *obj, const RootType &classDesc) override
 
virtual void setObjPtr (void *&obj, const Token *token) override
 
StatusCode createAddress (long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
 Create a Generic address using explicit arguments to identify a single object. More...
 
virtual StatusCode createAddress (long svcType, const CLID &clid, const std::string &refAddress, IOpaqueAddress *&refpAddress) override
 Create address from string form. More...
 
virtual StatusCode makeServer (int num) override
 Make this a server. More...
 
virtual StatusCode makeClient (int num) override
 Make this a client. More...
 
virtual StatusCode readData () override
 Read the next data object. More...
 
virtual StatusCode commitCatalog () override
 Commit Catalog. More...
 
StatusCode abortSharedWrClients (int client_n)
 Send abort to SharedWriter clients if the server quits on error. More...
 
virtual void handle (const Incident &incident) override
 Implementation of IIncidentListener: Handle for EndEvent incidence. More...
 
 AthenaPoolSharedIOCnvSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor. More...
 
virtual ~AthenaPoolSharedIOCnvSvc ()=default
 Destructor. More...
 

Private Attributes

ServiceHandle< IAthenaSerializeSvcm_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
 
ToolHandle< IAthenaIPCToolm_inputStreamingTool {this,"InputStreamingTool",{}}
 
ToolHandle< IAthenaIPCToolm_outputStreamingTool {this,"OutputStreamingTool",{}}
 
bool m_streamServerActive =false
 
int m_metadataClient =0
 
StringProperty m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
 For SharedWriter: To use MetadataSvc to merge data placed in a certain container. More...
 
StringArrayProperty m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
 
IntegerProperty m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
 Make this instance a Streaming Client during first connect/write automatically. More...
 
IntegerProperty m_streamingTechnology {this,"StreamingTechnology",-1}
 Use Streaming for selected technologies only. More...
 
BooleanProperty m_parallelCompression {this,"ParallelCompression",true}
 Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile. More...
 
StringProperty m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
 Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>". More...
 

Friends

class SvcFactory< AthenaPoolSharedIOCnvSvc >
 

Detailed Description

This class provides the interface between Athena and PoolSvc.

Definition at line 25 of file AthenaPoolSharedIOCnvSvc.h.

Constructor & Destructor Documentation

◆ AthenaPoolSharedIOCnvSvc()

AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 761 of file AthenaPoolSharedIOCnvSvc.cxx.

761  :
762  base_class(name, pSvcLocator) {
763 }

◆ ~AthenaPoolSharedIOCnvSvc()

virtual AthenaPoolSharedIOCnvSvc::~AthenaPoolSharedIOCnvSvc ( )
virtualdefault

Destructor.

Member Function Documentation

◆ abortSharedWrClients()

StatusCode AthenaPoolSharedIOCnvSvc::abortSharedWrClients ( int  client_n)

Send abort to SharedWriter clients if the server quits on error.

Parameters
client_n[IN] number of the current client, -1 if no current

Definition at line 735 of file AthenaPoolSharedIOCnvSvc.cxx.

736 {
737  ATH_MSG_ERROR("Sending ABORT to clients");
738  // the master process will kill this process once workers abort
739  // but it could be a time-limited loop
740  StatusCode sc = StatusCode::SUCCESS;
741  while (sc.isSuccess()) {
742  if (client_n >= 0) {
743  sc = m_outputStreamingTool->lockObject("ABORT", client_n);
744  }
745  const char* dummy;
746  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
747  while (sc.isRecoverable()) {
748  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
749  }
750  }
751  return StatusCode::FAILURE;
752 }

◆ commitCatalog()

StatusCode AthenaPoolSharedIOCnvSvc::commitCatalog ( )
overridevirtual

Commit Catalog.

Definition at line 726 of file AthenaPoolSharedIOCnvSvc.cxx.

726  {
727  pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
728  const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
729  catalog->commit();
730  catalog->start();
731  return(StatusCode::SUCCESS);
732 }

◆ commitOutput()

StatusCode AthenaPoolSharedIOCnvSvc::commitOutput ( const std::string &  outputConnectionSpec,
bool  doCommit 
)
overridevirtual

Implementation of IConversionSvc: Commit pending output.

Parameters
doCommit[IN] boolean to force full commit

Definition at line 125 of file AthenaPoolSharedIOCnvSvc.cxx.

125  {
126  // This is called after all DataObjects are converted.
127  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
128  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
129  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
130  m_outputStreamingTool->lockObject("wait").ignore();
131  if (!this->cleanUp(outputConnection).isSuccess()) {
132  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
133  return(StatusCode::FAILURE);
134  }
135  return(StatusCode::SUCCESS);
136  }
137  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
138  ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
139  return(StatusCode::SUCCESS);
140  }
141  std::map<void*, RootType> commitCache;
142  std::string fileName;
143  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
144  // Clear object to get Placements for all objects in a Stream
145  const char* placementStr = nullptr;
146  int num = -1;
147  StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
148  if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
149  const char * matchedChars = strstr(placementStr, "[FILE=");
150  if (not matchedChars){
151  ATH_MSG_ERROR("No matching filename in " << placementStr);
152  return abortSharedWrClients(num);
153  }
154  fileName = matchedChars;
155  fileName = fileName.substr(6, fileName.find(']') - 6);
156  if (!this->connectOutput(fileName).isSuccess()) {
157  ATH_MSG_ERROR("Failed to connectOutput for " << fileName);
158  return abortSharedWrClients(num);
159  }
160  IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
161  bool dataHeaderSeen = false;
162  std::string dataHeaderID;
163  while (num > 0) {
164  std::string objName = "ALL";
165  if (useDetailChronoStat()) {
166  std::string objName(placementStr); //FIXME, better descriptor
167  }
168  // StopWatch listens from here until the end of this current scope
169  {
170  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
171  std::string tokenStr = placementStr;
172  std::string contName = strstr(placementStr, "[CONT=");
173  tokenStr.erase(tokenStr.find("[CONT=")); //throws if [CONT= not found
174  tokenStr.append(contName, contName.find(']') + 1);
175  contName = contName.substr(6, contName.find(']') - 6);
176  std::string className = strstr(placementStr, "[PNAME=");
177  className = className.substr(7, className.find(']') - 7);
179  void* obj = nullptr;
180  std::ostringstream oss2;
181  oss2 << std::dec << num;
182  std::string::size_type len = m_metadataContainerProp.value().size();
183  bool foundContainer = false;
184  std::size_t pPos = contName.find('(');
185  if (contName.compare(0, pPos, m_metadataContainerProp.value()) == 0) {
186  foundContainer = true;
187  } else {
188  for (const auto& item: m_metadataContainersAug.value()) {
189  if (contName.compare(0, pPos, item) == 0){
190  foundContainer = true;
191  len = item.size();
192  break;
193  }
194  }
195  }
196  if (len > 0 && foundContainer && contName[len] == '(' ) {
197  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
198  // For Metadata, before moving to next client, fire file incidents
199  if (m_metadataClient != num) {
200  if (m_metadataClient != 0) {
201  std::ostringstream oss1;
202  oss1 << std::dec << m_metadataClient;
203  std::string memName = "SHM[NUM=" + oss1.str() + "]";
204  FileIncident beginInputIncident(name(), "BeginInputFile", memName);
205  incSvc->fireIncident(beginInputIncident);
206  FileIncident endInputIncident(name(), "EndInputFile", memName);
207  incSvc->fireIncident(endInputIncident);
208  }
210  }
211  // Retrieve MetaDataSvc
212  ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
213  ATH_CHECK(metadataSvc.retrieve());
214  sc = metadataSvc->shmProxy(std::string(placementStr) + "[NUM=" + oss2.str() + "]");
215  if (sc.isRecoverable()) {
216  ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
217  } else if (sc.isFailure()) {
218  ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
219  return abortSharedWrClients(num);
220  }
221  } else {
222  Token readToken;
223  readToken.setOid(Token::OID_t(num, 0));
224  readToken.setAuxString("[PNAME=" + className + "]");
225  this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
226  if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
227  // Write object
228  if( m_oneDataHeaderForm.value() ) {
229  auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
230  if( className == "DataHeaderForm_p6" ) {
231  // Pass DHForms to the converter for later writing in the correct order - do not write it now
232  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
233  "", placementWithSwn());
234  DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
235  tokenStr = "";
236  } else {
237  Placement placement;
238  placement.fromString(placementStr);
239  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
240  if (token == nullptr) {
241  ATH_MSG_ERROR("Failed to write Data for: " << className);
242  return abortSharedWrClients(num);
243  }
244  tokenStr = token->toString();
245  }
246  if( className == "DataHeader_p6" ) {
247  // Found DataHeader - call the converter to update DHForm Ref
248  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
249  tokenStr, placementWithSwn());
250  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
251  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
252  return abortSharedWrClients(num);
253  }
254  } else
255  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
256  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
257  }
258  placementStr = nullptr;
259  } else {
260  // Multiple shared DataHeaderForms
261  Placement placement;
262  placement.fromString(placementStr); placementStr = nullptr;
263  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
264  if (token == nullptr) {
265  ATH_MSG_ERROR("Failed to write Data for: " << className);
266  return abortSharedWrClients(num);
267  }
268  tokenStr = token->toString();
269  if (className == "DataHeader_p6") {
270  // Found DataHeader
271  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
272  tokenStr, placement.auxString());
273  // call DH converter to add the ref to DHForm (stored earlier) and to itself
274  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
275  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
276  return abortSharedWrClients(num);
277  }
278  dataHeaderSeen = true;
279  // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
280  // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
281  // This is achieved by building it as "CONTID/WORKERID/DBID".
282  // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
283  // WORKERID allows us to distinguish AthenaMP workers,
284  // and DBID allows us to distinguish streams.
285  dataHeaderID = std::format("{}/{}/{}", token->contID(), oss2.str(), token->dbID().toString());
286  } else if (dataHeaderSeen) {
287  dataHeaderSeen = false;
288  // next object after DataHeader - may be a DataHeaderForm
289  // in any case we need to call the DH converter to update the DHForm Ref
290  if (className == "DataHeaderForm_p6") {
291  // Tell DataHeaderCnv that it should use a new DHForm
292  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
293  tokenStr, dataHeaderID);
294  if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
295  ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
296  return abortSharedWrClients(num);
297  }
298  } else {
299  // Tell DataHeaderCnv that it should use the old DHForm
300  GenericAddress address(0, 0, "", dataHeaderID);
301  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
302  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
303  return abortSharedWrClients(num);
304  }
305  }
306  }
307  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
308  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
309  }
310  }
311  }
312  }
313  // Send Token back to Client
314  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
315  while (sc.isRecoverable()) {
316  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
317  }
318  if (!sc.isSuccess()) {
319  ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
320  return abortSharedWrClients(-1);
321  }
322  }
323  sc = m_outputStreamingTool->clearObject(&placementStr, num);
324  while (sc.isRecoverable()) {
325  sc = m_outputStreamingTool->clearObject(&placementStr, num);
326  }
327  if (sc.isFailure()) {
328  // no more clients, break the loop and exit
329  num = -1;
330  }
331  }
332  if (dataHeaderSeen) {
333  // DataHeader was the last object, need to tell the converter there is no DHForm coming
334  GenericAddress address(0, 0, "", dataHeaderID);
335  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
336  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
337  return abortSharedWrClients(-1);
338  }
339  }
340  placementStr = nullptr;
341  } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
342  return(StatusCode::RECOVERABLE);
343  } else if (sc.isRecoverable() || num == -1) {
344  return(StatusCode::RECOVERABLE);
345  }
346  if (sc.isFailure() || fileName.empty()) {
347  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
348  std::ostringstream oss1;
349  oss1 << std::dec << m_metadataClient;
350  std::string memName = "SHM[NUM=" + oss1.str() + "]";
351  FileIncident beginInputIncident(name(), "BeginInputFile", memName);
352  incSvc->fireIncident(beginInputIncident);
353  FileIncident endInputIncident(name(), "EndInputFile", memName);
354  incSvc->fireIncident(endInputIncident);
355  if (sc.isFailure()) {
356  ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
357  } else {
358  ATH_MSG_INFO("Failed to get Data for client: " << num);
359  }
360  return(StatusCode::FAILURE);
361  }
362  }
363  if (m_parallelCompression && !fileName.empty()) {
364  ATH_MSG_DEBUG("commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
365  return(StatusCode::SUCCESS);
366  }
367  if (outputConnection.empty()) {
368  outputConnection = fileName;
369  } else {
370  outputConnection = outputConnectionSpec;
371  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
372  outputConnection += m_streamPortString.value();
373  }
374  }
375  StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
376  for (std::map<void*, RootType>::iterator iter = commitCache.begin(), last = commitCache.end(); iter != last; ++iter) {
377  iter->second.Destruct(iter->first);
378  }
379  return(status);
380 }

◆ connectOutput() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string &  outputConnectionSpec)
overridevirtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec[IN] the name of the output connection specification as string.

Definition at line 91 of file AthenaPoolSharedIOCnvSvc.cxx.

91  {
92 // This is called before DataObjects are being converted.
93  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
94  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
95  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
96  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
97  return(StatusCode::FAILURE);
98  }
99  }
100  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
101  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
102  return(StatusCode::SUCCESS);
103  }
104  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
105  if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
106  ATH_MSG_DEBUG("connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
107  return(StatusCode::SUCCESS);
108  }
110  ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
111  return(StatusCode::SUCCESS);
112  }
113  }
114  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
115  outputConnection += m_streamPortString.value();
116  }
117  std::size_t apend = outputConnectionSpec.find('[');
118  if (apend != std::string::npos) {
119  outputConnection += outputConnectionSpec.substr(apend);
120  }
121  return AthenaPoolCnvSvc::connectOutput(outputConnection);
122 }

◆ connectOutput() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::connectOutput ( const std::string &  outputConnectionSpec,
const std::string &  openMode 
)
overridevirtual

Implementation of IConversionSvc: Connect to the output connection specification with open mode.

Parameters
outputConnectionSpec[IN] the name of the output connection specification as string.
openMode[IN] the open mode of the file as string.

Definition at line 86 of file AthenaPoolSharedIOCnvSvc.cxx.

87  {
88  return(connectOutput(outputConnectionSpec));
89 }

◆ createAddress() [1/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long  svcType,
const CLID clid,
const std::string &  refAddress,
IOpaqueAddress *&  refpAddress 
)
overridevirtual

Create address from string form.

Parameters
svcType[IN] service type of the address.
clid[IN] class id for the address.
refAddress[IN] string form to be converted.
refpAddress[OUT] converted address.

Definition at line 602 of file AthenaPoolSharedIOCnvSvc.cxx.

605  {
606  return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
607 }

◆ createAddress() [2/2]

StatusCode AthenaPoolSharedIOCnvSvc::createAddress ( long  svcType,
const CLID clid,
const std::string *  par,
const unsigned long *  ip,
IOpaqueAddress *&  refpAddress 
)
override

Create a Generic address using explicit arguments to identify a single object.

Parameters
svcType[IN] service type of the address.
clid[IN] class id for the address.
par[IN] string containing the database name.
ip[IN] object identifier.
refpAddress[OUT] converted address.

Definition at line 588 of file AthenaPoolSharedIOCnvSvc.cxx.

592  {
593  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
594  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
595  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
596  return(StatusCode::FAILURE);
597  }
598  }
599  return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
600 }

◆ disconnectOutput()

StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput ( const std::string &  outputConnectionSpec)
overridevirtual

Disconnect to the output connection.

Definition at line 383 of file AthenaPoolSharedIOCnvSvc.cxx.

383  {
384  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
385  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
386  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
387  return(StatusCode::SUCCESS);
388  }
389  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
390  if (m_streamServerActive) {
391  m_streamServerActive = false;
392  ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
393  return(StatusCode::SUCCESS);
394  } else {
395  m_streamServerActive = false;
396  }
397  ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
398  }
399  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
400  outputConnection += m_streamPortString.value();
401  }
402  return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
403 }

◆ finalize()

StatusCode AthenaPoolSharedIOCnvSvc::finalize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 64 of file AthenaPoolSharedIOCnvSvc.cxx.

64  {
65  // Release AthenaSerializeSvc
66  if (!m_serializeSvc.empty()) {
67  if (!m_serializeSvc.release().isSuccess()) {
68  ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
69  }
70  }
71  // Release OutputStreamingTool (if configured)
72  if (!m_outputStreamingTool.empty()) {
73  if (!m_outputStreamingTool.release().isSuccess()) {
74  ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
75  }
76  }
77  // Release InputStreamingTool (if configured)
78  if (!m_inputStreamingTool.empty()) {
79  if (!m_inputStreamingTool.release().isSuccess()) {
80  ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
81  }
82  }
83  return this->AthenaPoolCnvSvc::finalize();
84 }

◆ handle()

void AthenaPoolSharedIOCnvSvc::handle ( const Incident &  incident)
overridevirtual

Implementation of IIncidentListener: Handle for EndEvent incidence.

Definition at line 755 of file AthenaPoolSharedIOCnvSvc.cxx.

755  {
756  if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
757  m_outputStreamingTool->lockObject("release").ignore();
758  }
759 }

◆ initialize()

StatusCode AthenaPoolSharedIOCnvSvc::initialize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 35 of file AthenaPoolSharedIOCnvSvc.cxx.

35  {
36  // Retrieve InputStreamingTool (if configured)
37  if (!m_inputStreamingTool.empty()) {
38  ATH_CHECK(m_inputStreamingTool.retrieve());
39  }
40  // Retrieve OutputStreamingTool (if configured)
41  if (!m_outputStreamingTool.empty()) {
42  ATH_CHECK(m_outputStreamingTool.retrieve());
43  if (m_makeStreamingToolClient.value() == -1) {
44  // Initialize AthenaRootSharedWriter
45  ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
46  ATH_CHECK(arswsvc.retrieve());
47  }
48  // Put PoolSvc into share mode to avoid duplicating catalog.
49  getPoolSvc()->setShareMode(true);
50  }
51  if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
52  // Retrieve AthenaSerializeSvc
53  ATH_CHECK(m_serializeSvc.retrieve());
54  }
55  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
56  long int pri = 1000;
57  if (!m_outputStreamingTool.empty()) {
58  incSvc->addListener(this, "StoreCleared", pri);
59  ATH_MSG_DEBUG("Subscribed to StoreCleared");
60  }
61  return this->AthenaPoolCnvSvc::initialize();
62 }

◆ makeClient()

StatusCode AthenaPoolSharedIOCnvSvc::makeClient ( int  num)
overridevirtual

Make this a client.

Definition at line 635 of file AthenaPoolSharedIOCnvSvc.cxx.

635  {
636  if (!m_outputStreamingTool.empty()) {
637  ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
638  std::string streamPortSuffix;
639  if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
640  ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
641  return(StatusCode::FAILURE);
642  } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
643  // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
644  ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
645  m_streamPortString.setValue(streamPortSuffix);
646  }
647  }
648  if (m_inputStreamingTool.empty()) {
649  return(StatusCode::SUCCESS);
650  }
651  ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
652  std::string dummyStr;
653  return(m_inputStreamingTool->makeClient(num, dummyStr));
654 }

◆ makeServer()

StatusCode AthenaPoolSharedIOCnvSvc::makeServer ( int  num)
overridevirtual

Make this a server.

Definition at line 609 of file AthenaPoolSharedIOCnvSvc.cxx.

609  {
610  if (num < 0) {
611  num = -num;
612  m_streamServerActive = true;
613  num = num % 1024;
614  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
615  ATH_MSG_DEBUG("makeServer: " << m_outputStreamingTool << " = " << num);
616  ATH_MSG_DEBUG("makeServer: Calling shared memory tool with port suffix " << m_streamPortString);
617  const std::string streamPortSuffix = m_streamPortString.value();
618  if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
619  ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
620  return(StatusCode::FAILURE);
621  }
622  // Disable PersistencySvc per output file mode, for SharedWriter Server
623  m_persSvcPerOutput.setValue(false);
624  return(StatusCode::SUCCESS);
625  }
626  return(StatusCode::RECOVERABLE);
627  }
628  if (m_inputStreamingTool.empty()) {
629  return(StatusCode::RECOVERABLE);
630  }
631  ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
632  return(m_inputStreamingTool->makeServer(num, ""));
633 }

◆ readData()

StatusCode AthenaPoolSharedIOCnvSvc::readData ( )
overridevirtual

Read the next data object.

Definition at line 656 of file AthenaPoolSharedIOCnvSvc.cxx.

656  {
657  if (m_inputStreamingTool.empty()) {
658  return(StatusCode::FAILURE);
659  }
660  const char* tokenStr = nullptr;
661  int num = -1;
662  StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
663  if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
664  ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
665  } else {
666  return(sc);
667  }
668  // Read object instance via POOL/ROOT
669  void* instance = nullptr;
670  Token token;
671  token.fromString(tokenStr); tokenStr = nullptr;
672  if (token.classID() != Guid::null()) {
673  std::string objName = "ALL";
674  if (useDetailChronoStat()) {
675  objName = token.classID().toString();
676  }
677  // StopWatch listens from here until the end of this current scope
678  PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
679  this->setObjPtr(instance, &token);
680  // Serialize object via ROOT
681  RootType cltype(pool::DbReflex::forGuid(token.classID()));
682  void* buffer = nullptr;
683  std::size_t nbytes = 0;
684  buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
685  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
686  while (sc.isRecoverable()) {
687  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
688  }
689  delete [] static_cast<char*>(buffer); buffer = nullptr;
690  if (!sc.isSuccess()) {
691  ATH_MSG_ERROR("Could not share object for: " << token.toString());
692  return(StatusCode::FAILURE);
693  }
694  AuxDiscoverySvc auxDiscover;
695  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
696  ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
697  return(StatusCode::FAILURE);
698  }
699  cltype.Destruct(instance); instance = nullptr;
700  if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
701  ATH_MSG_ERROR("Could not share object for: " << token.toString());
702  return(StatusCode::FAILURE);
703  }
704  } else if (token.dbID() != Guid::null()) {
705  std::string returnToken;
706  const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
707  if (metadataToken != nullptr) {
708  returnToken = metadataToken->toString();
709  } else {
710  returnToken = token.toString();
711  }
712  delete metadataToken; metadataToken = nullptr;
713  // Share token
714  sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
715  if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
716  ATH_MSG_ERROR("Could not share token for: " << token.toString());
717  return(StatusCode::FAILURE);
718  }
719  } else {
720  return(StatusCode::RECOVERABLE);
721  }
722  return(StatusCode::SUCCESS);
723 }

◆ registerForWrite()

Token * AthenaPoolSharedIOCnvSvc::registerForWrite ( Placement placement,
const void *  obj,
const RootType classDesc 
)
overridevirtual
Returns
a string token to a Data Object written to Pool
Parameters
placement[IN] pointer to the placement hint
obj[IN] pointer to the Data Object to be written to Pool
classDesc[IN] pointer to the Seal class description for the Data Object.

Definition at line 406 of file AthenaPoolSharedIOCnvSvc.cxx.

406  {
407  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
408  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
409  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
410  return(nullptr);
411  }
412  }
413  Token* token = nullptr;
414  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
415  && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
416  // Lock object
417  std::string placementStr = placement->toString();
418  placementStr += "[PNAME=";
419  placementStr += classDesc.Name();
420  placementStr += ']';
421  ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
422  StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
423  while (sc.isRecoverable()) {
424  //usleep(100);
425  sc = m_outputStreamingTool->lockObject(placementStr.c_str());
426  }
427  if (!sc.isSuccess()) {
428  ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
429  return(nullptr);
430  }
431  // Serialize object via ROOT
432  const void* buffer = nullptr;
433  std::size_t nbytes = 0;
434  bool own = true;
435  if (classDesc.Name() == "Token") {
436  nbytes = strlen(static_cast<const char*>(obj)) + 1;
437  buffer = obj;
438  own = false;
439  } else if (classDesc.IsFundamental()) {
440  nbytes = classDesc.SizeOf();
441  buffer = obj;
442  own = false;
443  } else {
444  buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
445  }
446  // Share object
447  sc = m_outputStreamingTool->putObject(buffer, nbytes);
448  while (sc.isRecoverable()) {
449  //usleep(100);
450  sc = m_outputStreamingTool->putObject(buffer, nbytes);
451  }
452  if (own) { delete [] static_cast<const char*>(buffer); }
453  buffer = nullptr;
454  if (!sc.isSuccess()) {
455  ATH_MSG_ERROR("Could not share object for: " << placementStr);
456  m_outputStreamingTool->putObject(nullptr, 0).ignore();
457  return(nullptr);
458  }
459  AuxDiscoverySvc auxDiscover;
460  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
461  ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
462  m_outputStreamingTool->putObject(nullptr, 0).ignore();
463  return(nullptr);
464  }
465  if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
466  ATH_MSG_ERROR("Failed to put Data for " << placementStr);
467  return(nullptr);
468  }
469  // Get Token back from Server
470  const char* tokenStr = nullptr;
471  int num = -1;
472  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
473  while (sc.isRecoverable()) {
474  //usleep(100);
475  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
476  }
477  if (!sc.isSuccess()) {
478  ATH_MSG_ERROR("Failed to get Token");
479  return(nullptr);
480  }
481  if (!strcmp(tokenStr, "ABORT")) {
482  ATH_MSG_ERROR("Writer requested ABORT");
483  // tell the server we are leaving
484  m_outputStreamingTool->stop().ignore();
485  return nullptr;
486  }
487  Token* tempToken = new Token();
488  tempToken->fromString(tokenStr); tokenStr = nullptr;
489  tempToken->setClassID(pool::DbReflex::guid(classDesc));
490  token = tempToken; tempToken = nullptr;
491 // Client Write Request
492  } else {
493  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
494  ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
495  Token* tempToken = new Token();
496  tempToken->setClassID(pool::DbReflex::guid(classDesc));
497  token = tempToken; tempToken = nullptr;
498  } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
499  ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
500  token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
501  } else {
502  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
503  placement->setFileName(placement->fileName() + m_streamPortString.value());
504  }
505  token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
506  }
507  }
508  return(token);
509 }

◆ setObjPtr()

void AthenaPoolSharedIOCnvSvc::setObjPtr ( void *&  obj,
const Token token 
)
overridevirtual
Parameters
obj[OUT] pointer to the Data Object.
token[IN] string token of the Data Object for which a Pool Ref is filled.

Definition at line 511 of file AthenaPoolSharedIOCnvSvc.cxx.

511  {
512  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
513  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
514  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
515  }
516  }
517  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
518  if (token->dbID() == Guid::null()) {
519  int num = token->oid().first;
520  // Get object from SHM
521  void* buffer = nullptr;
522  std::size_t nbytes = 0;
523  StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
524  while (sc.isRecoverable()) {
525  //usleep(100);
526  sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
527  }
528  if (!sc.isSuccess()) {
529  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
530  obj = nullptr;
531  } else {
532  ATH_MSG_DEBUG("Server deserializing " << token->toString());
533  if (token->classID() != Guid::null()) {
534  // Deserialize object
535  RootType cltype(pool::DbReflex::forGuid(token->classID()));
536  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
537  } else {
538  // Deserialize object
539  std::string className = token->auxString();
540  className = className.substr(className.find("[PNAME="));
541  className = className.substr(7, className.find(']') - 7);
543  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
544  }
545  AuxDiscoverySvc auxDiscover;
546  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
547  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
548  obj = nullptr;
549  }
550  }
551  }
552  }
553  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
554  ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
555  if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
556  ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
557  obj = nullptr;
558  } else {
559  void* buffer = nullptr;
560  std::size_t nbytes = 0;
561  StatusCode sc = StatusCode::FAILURE;
562  // StopWatch listens from here until the end of this current scope
563  {
564  PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
565  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
566  while (sc.isRecoverable()) {
567  // sleep
568  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
569  }
570  }
571  if (!sc.isSuccess()) {
572  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
573  obj = nullptr;
574  } else {
575  obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
576  AuxDiscoverySvc auxDiscover;
577  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
578  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
579  obj = nullptr;
580  }
581  }
582  }
583  } else if (token->dbID() != Guid::null()) {
585  }
586 }

Friends And Related Function Documentation

◆ SvcFactory< AthenaPoolSharedIOCnvSvc >

friend class SvcFactory< AthenaPoolSharedIOCnvSvc >
friend

Definition at line 1 of file AthenaPoolSharedIOCnvSvc.h.

Member Data Documentation

◆ m_inputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_inputStreamingTool {this,"InputStreamingTool",{}}
private

Definition at line 113 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_makeStreamingToolClient

IntegerProperty AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient {this,"MakeStreamingToolClient",0}
private

Make this instance a Streaming Client during first connect/write automatically.

Definition at line 125 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataClient

int AthenaPoolSharedIOCnvSvc::m_metadataClient =0
private

Definition at line 116 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainerProp

StringProperty AthenaPoolSharedIOCnvSvc::m_metadataContainerProp {this,"OutputMetadataContainer","MetaData"}
private

For SharedWriter: To use MetadataSvc to merge data placed in a certain container.

Definition at line 121 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_metadataContainersAug

StringArrayProperty AthenaPoolSharedIOCnvSvc::m_metadataContainersAug {this, "OutputMetadataContainers", {}, "Metadata containers used for augmentations"}
private

Definition at line 122 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_outputStreamingTool

ToolHandle<IAthenaIPCTool> AthenaPoolSharedIOCnvSvc::m_outputStreamingTool {this,"OutputStreamingTool",{}}
private

Definition at line 114 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_parallelCompression

BooleanProperty AthenaPoolSharedIOCnvSvc::m_parallelCompression {this,"ParallelCompression",true}
private

Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.

Definition at line 129 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_serializeSvc

ServiceHandle<IAthenaSerializeSvc> AthenaPoolSharedIOCnvSvc::m_serializeSvc {this,"AthenaRootSerializeSvc","AthenaRootSerializeSvc"}
private

Definition at line 112 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamingTechnology

IntegerProperty AthenaPoolSharedIOCnvSvc::m_streamingTechnology {this,"StreamingTechnology",-1}
private

Use Streaming for selected technologies only.

Definition at line 127 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamPortString

StringProperty AthenaPoolSharedIOCnvSvc::m_streamPortString {this,"StreamPortString","?pmerge=localhost:0"}
private

Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".

Definition at line 131 of file AthenaPoolSharedIOCnvSvc.h.

◆ m_streamServerActive

bool AthenaPoolSharedIOCnvSvc::m_streamServerActive =false
private

Definition at line 115 of file AthenaPoolSharedIOCnvSvc.h.


The documentation for this class was generated from the following files:
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
AthenaPoolCnvSvc::createAddress
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
Definition: AthenaPoolCnvSvc.cxx:440
AthenaPoolSharedIOCnvSvc::m_serializeSvc
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
Definition: AthenaPoolSharedIOCnvSvc.h:112
Guid::null
static const Guid & null()
NULL-Guid: static class method.
Definition: Guid.cxx:18
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
Placement
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition: Placement.h:19
AthenaPoolSharedIOCnvSvc::m_metadataClient
int m_metadataClient
Definition: AthenaPoolSharedIOCnvSvc.h:116
TScopeAdapter::ByNameNoQuiet
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition: RootType.cxx:586
AthenaPoolCnvSvc::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:33
Placement::containerName
const std::string & containerName() const
Access container name.
Definition: Placement.h:32
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
Token::contID
const std::string & contID() const
Access container identifier.
Definition: Token.h:69
AthenaPoolSharedIOCnvSvc::m_streamServerActive
bool m_streamServerActive
Definition: AthenaPoolSharedIOCnvSvc.h:115
Token::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Token.h:91
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:64
AuxDiscoverySvc::sendStore
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
Definition: AuxDiscoverySvc.cxx:208
Guid::toString
const std::string toString() const
Automatic conversion to string representation.
Definition: Guid.cxx:58
pool::IFileCatalog::commit
void commit()
Save catalog to file.
Definition: IFileCatalog.h:49
AthenaPoolSharedIOCnvSvc::abortSharedWrClients
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Definition: AthenaPoolSharedIOCnvSvc.cxx:735
Token::classID
const Guid & classID() const
Access database identifier.
Definition: Token.h:73
AthenaPoolCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolCnvSvc.cxx:426
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
pool::DbReflex::forGuid
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
AthenaPoolSharedIOCnvSvc::m_metadataContainerProp
StringProperty m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Definition: AthenaPoolSharedIOCnvSvc.h:121
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
PMonUtils::BasicStopWatch
Definition: BasicStopWatch.h:17
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
AthenaPoolCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolCnvSvc.cxx:413
AthenaPoolSharedIOCnvSvc::m_inputStreamingTool
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
Definition: AthenaPoolSharedIOCnvSvc.h:113
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
Token::OID_t
Definition: Token.h:24
Token::fromString
Token & fromString(const std::string &from)
Build from the string representation of a token.
Definition: Token.cxx:148
Token::setClassID
Token & setClassID(const Guid &cl_id)
Access database identifier.
Definition: Token.h:75
Token::technology
int technology() const
Access technology type.
Definition: Token.h:77
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:11
pool::IFileCatalog
Definition: IFileCatalog.h:23
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
AuxDiscoverySvc::receiveStore
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
Definition: AuxDiscoverySvc.cxx:162
AthenaPoolCnvSvc::finalize
virtual StatusCode finalize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:117
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaPoolSharedIOCnvSvc::m_streamingTechnology
IntegerProperty m_streamingTechnology
Use Streaming for selected technologies only.
Definition: AthenaPoolSharedIOCnvSvc.h:127
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
CalibDbCompareRT.dummy
dummy
Definition: CalibDbCompareRT.py:59
find_tgc_unfilled_channelids.ip
ip
Definition: find_tgc_unfilled_channelids.py:3
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Placement::fileName
const std::string & fileName() const
Access file name.
Definition: Placement.h:28
Placement::setFileName
Placement & setFileName(const std::string &fileName)
Set file name.
Definition: Placement.h:30
AthenaPoolSharedIOCnvSvc::m_metadataContainersAug
StringArrayProperty m_metadataContainersAug
Definition: AthenaPoolSharedIOCnvSvc.h:122
Placement::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Placement.h:40
TScopeAdapter::Name
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition: RootType.cxx:612
AthenaPoolCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
Definition: AthenaPoolCnvSvc.cxx:395
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
AthenaPoolSharedIOCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolSharedIOCnvSvc.cxx:511
AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient
IntegerProperty m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
Definition: AthenaPoolSharedIOCnvSvc.h:125
AthenaPoolCnvSvc::commitOutput
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Definition: AthenaPoolCnvSvc.cxx:329
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
createCoolChannelIdFile.par
par
Definition: createCoolChannelIdFile.py:28
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:129
AthenaPoolSharedIOCnvSvc::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: AthenaPoolSharedIOCnvSvc.cxx:635
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:85
item
Definition: ItemListSvc.h:43
RTTAlgmain.address
address
Definition: RTTAlgmain.py:55
Placement::toString
const std::string toString() const
Retrieve the string representation of the placement.
Definition: Placement.cxx:15
TScopeAdapter::IsFundamental
Bool_t IsFundamental() const
Definition: RootType.cxx:731
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
Token::oid
const OID_t & oid() const
Access object identifier.
Definition: Token.h:81
Token::setAuxString
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Token.h:93
AthenaPoolCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolCnvSvc.cxx:251
AthenaPoolSharedIOCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolSharedIOCnvSvc.cxx:406
AthenaPoolSharedIOCnvSvc::m_outputStreamingTool
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Definition: AthenaPoolSharedIOCnvSvc.h:114
merge.status
status
Definition: merge.py:16
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
python.PyAthena.obj
obj
Definition: PyAthena.py:132
TScopeAdapter::SizeOf
size_t SizeOf() const
Definition: RootType.cxx:765
AthenaPoolSharedIOCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolSharedIOCnvSvc.cxx:86
AthenaPoolSharedIOCnvSvc::m_streamPortString
StringProperty m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
Definition: AthenaPoolSharedIOCnvSvc.h:131
LArL1Calo_ComputeHVCorr.className
className
Definition: LArL1Calo_ComputeHVCorr.py:135
Placement::fromString
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition: Placement.cxx:28
AuxDiscoverySvc
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
Definition: AuxDiscoverySvc.h:33
AthenaPoolSharedIOCnvSvc::m_parallelCompression
BooleanProperty m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Definition: AthenaPoolSharedIOCnvSvc.h:129
ServiceHandle< IIncidentSvc >
TScopeAdapter
Definition: RootType.h:119
pool::DbReflex::guid
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.