ATLAS Offline Software
AthenaPoolSharedIOCnvSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
11 
12 #include "GaudiKernel/ClassID.h"
13 #include "GaudiKernel/FileIncident.h"
14 #include "GaudiKernel/GenericAddress.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IOpaqueAddress.h"
17 
24 
25 
26 #include "StorageSvc/DbReflex.h"
28 
29 #include "AuxDiscoverySvc.h"
30 
31 #include <algorithm>
32 #include <iomanip>
33 #include <sstream>
34 
35 //______________________________________________________________________________
36 // Initialize the service.
// Body of the service initialisation (the signature line is not part of this listing).
// Retrieves the configured streaming tools and the serialisation service, subscribes to
// the "StoreCleared" incident when an output streaming tool is configured, and finally
// delegates to AthenaPoolCnvSvc::initialize().
38  // Retrieve InputStreamingTool (if configured)
39  if (!m_inputStreamingTool.empty()) {
40  ATH_CHECK(m_inputStreamingTool.retrieve());
41  }
42  // Retrieve OutputStreamingTool (if configured)
43  if (!m_outputStreamingTool.empty()) {
44  ATH_CHECK(m_outputStreamingTool.retrieve());
  // A MakeStreamingToolClient value of exactly -1 additionally retrieves the
  // AthenaRootSharedWriterSvc service.
45  if (m_makeStreamingToolClient.value() == -1) {
46  // Initialize AthenaRootSharedWriter
47  ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
48  ATH_CHECK(arswsvc.retrieve());
49  }
50  // Put PoolSvc into share mode to avoid duplicating catalog.
51  getPoolSvc()->setShareMode(true);
52  }
  // The serialisation service is needed as soon as either streaming tool is configured.
53  if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
54  // Retrieve AthenaSerializeSvc
55  ATH_CHECK(m_serializeSvc.retrieve());
56  }
  // NOTE(review): incSvc is used below without an explicit retrieve(); presumably the
  // handle retrieves the service on first dereference - confirm.
57  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
  // Listener priority handed to addListener().
58  long int pri = 1000;
59  if (!m_outputStreamingTool.empty()) {
60  incSvc->addListener(this, "StoreCleared", pri);
61  ATH_MSG_DEBUG("Subscribed to StoreCleared");
62  }
  // Base-class initialisation runs last and provides the return status.
63  return this->AthenaPoolCnvSvc::initialize();
64 }
65 //______________________________________________________________________________
// Body of the service finalisation (the signature line is not part of this listing).
// Releases the serialisation service and both streaming tools; a failed release only
// produces a warning. The return status is that of AthenaPoolCnvSvc::finalize().
67  // Release AthenaSerializeSvc
68  if (!m_serializeSvc.empty()) {
69  if (!m_serializeSvc.release().isSuccess()) {
70  ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
71  }
72  }
73  // Release OutputStreamingTool (if configured)
74  if (!m_outputStreamingTool.empty()) {
75  if (!m_outputStreamingTool.release().isSuccess()) {
76  ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
77  }
78  }
79  // Release InputStreamingTool (if configured)
80  if (!m_inputStreamingTool.empty()) {
81  if (!m_inputStreamingTool.release().isSuccess()) {
82  ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
83  }
84  }
  // Release failures above are not propagated; only the base-class status is returned.
85  return this->AthenaPoolCnvSvc::finalize();
86 }
87 //______________________________________________________________________________
88 StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec,
89  const std::string& /*openMode*/) {
90  return(connectOutput(outputConnectionSpec));
91 }
92 //______________________________________________________________________________
// Connect an output, taking the shared-writer role of this process into account.
//
// The connection name is the part of outputConnectionSpec before the first '['; any
// bracketed suffix is re-attached before the request reaches the base class.
// Returns SUCCESS without connecting when the request is skipped (see the branches
// below), FAILURE when this process could not be turned into a client, otherwise the
// status of AthenaPoolCnvSvc::connectOutput().
93 StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
94 // This is called before DataObjects are being converted.
95  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
  // Become a client on first use: only when requested by the property and the output
  // tool is neither server nor client yet.
96  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
97  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
98  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
99  return(StatusCode::FAILURE);
100  }
101  }
  // A client does not connect locally unless parallel compression is on and the spec
  // does not carry the metadata container prefix.
102  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
103  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
104  return(StatusCode::SUCCESS);
105  }
106  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
  // With parallel compression a non-client only handles specs carrying the metadata
  // container prefix; anything else is skipped.
107  if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
108  ATH_MSG_DEBUG("connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
109  return(StatusCode::SUCCESS);
110  }
  // NOTE(review): original line 111 is absent from this listing, which leaves the braces
  // unbalanced here; presumably it opens the condition guarding the two statements
  // below - confirm against the repository before editing this branch.
112  ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
113  return(StatusCode::SUCCESS);
114  }
115  }
  // Client with parallel compression: the port string is appended to the connection name.
116  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
117  outputConnection += m_streamPortString.value();
118  }
  // Re-attach the bracketed suffix (if any) that was cut off at the top.
119  std::size_t apend = outputConnectionSpec.find('[');
120  if (apend != std::string::npos) {
121  outputConnection += outputConnectionSpec.substr(apend);
122  }
123  return AthenaPoolCnvSvc::connectOutput(outputConnection);
124 }
125 
126 //______________________________________________________________________________
// Commit an output connection, taking the shared-writer role of this process into account.
//
// Paths, in the order they are tested:
//  * client (without parallel compression, or for a spec carrying the metadata container
//    prefix): sends "wait" through the output tool, cleans up converters and returns;
//  * output tool configured but neither client nor server: nothing to do, SUCCESS;
//  * active server: receives placement strings from clients through clearObject(), reads
//    the objects via setObjPtr(), writes them via registerForWrite() (metadata containers
//    are handed to MetaDataSvc::shmProxy() instead) and sends the resulting token string
//    back with lockObject();
//  * everything else: AthenaPoolCnvSvc::commitOutput(), followed by destruction of the
//    objects collected in commitCache.
// Returns RECOVERABLE when the server received "stop", a recoverable status, or no
// client number; FAILURE on errors or when no file name was obtained on the server.
127 StatusCode AthenaPoolSharedIOCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
128  // This is called after all DataObjects are converted.
129  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
130  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
131  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
  // The status of the "wait" request is deliberately ignored.
132  m_outputStreamingTool->lockObject("wait").ignore();
133  if (!this->cleanUp(outputConnection).isSuccess()) {
134  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
135  return(StatusCode::FAILURE);
136  }
137  return(StatusCode::SUCCESS);
138  }
139  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
140  ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
141  return(StatusCode::SUCCESS);
142  }
  // Objects read from shared memory on the server, keyed by pointer, with the type needed
  // to destruct them after the commit at the end of this function.
143  std::map<void*, RootType> commitCache;
144  std::string fileName;
145  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
146  // Clear object to get Placements for all objects in a Stream
147  const char* placementStr = nullptr;
148  int num = -1;
149  StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
  // Only a string longer than 6 characters from a positive client number is processed.
150  if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
151  const char * matchedChars = strstr(placementStr, "[FILE=");
152  if (not matchedChars){
153  ATH_MSG_ERROR("No matching filename in " << placementStr);
154  return abortSharedWrClients(num);
155  }
  // File name is the text between "[FILE=" (6 characters) and the next ']'.
156  fileName = matchedChars;
157  fileName = fileName.substr(6, fileName.find(']') - 6);
158  if (!this->connectOutput(fileName).isSuccess()) {
159  ATH_MSG_ERROR("Failed to connectOutput for " << fileName);
160  return abortSharedWrClients(num);
161  }
162  IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
163  bool dataHeaderSeen = false;
164  std::string dataHeaderID;
  // One iteration per object announced by a client; the next placement string and client
  // number are fetched at the bottom of the loop.
165  while (num > 0) {
166  std::string objName = "ALL";
167  if (useDetailChronoStat()) {
168  objName = placementStr; //FIXME, better descriptor
169  }
170  // StopWatch listens from here until the end of this current scope
171  {
172  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
173  std::string_view pStr = placementStr;
  // Split the placement string around its "[CONT=...]" field: tokenStr becomes the
  // string without that field, contName the field's value.
  // NOTE(review): the parse failures in this scope return FAILURE directly, whereas other
  // error paths in the loop go through abortSharedWrClients() - confirm this is intended.
174  std::string::size_type cpos = pStr.find ("[CONT=");
175  if (cpos == std::string::npos) {
176  ATH_MSG_ERROR("No CONT field in placement string: " << pStr);
177  return StatusCode::FAILURE;
178  }
179  std::string tokenStr (pStr.substr(0, cpos));
180  std::string contName (pStr.substr(cpos, std::string::npos));
181  std::string::size_type cl1 = contName.find(']');
182  if (cl1 == std::string::npos) {
183  ATH_MSG_ERROR("Missing close bracket after CONT field in placement string: " << pStr);
184  return StatusCode::FAILURE;
185  }
186  tokenStr.append(contName, cl1 + 1);
187  contName = contName.substr(6, cl1 - 6);
188 
  // Class name is the value of the "[PNAME=...]" field (7-character prefix).
189  std::string::size_type ppos = pStr.find ("[PNAME=");
190  if (ppos == std::string::npos) {
191  ATH_MSG_ERROR("No PNAME field in placement string: " << pStr);
192  return StatusCode::FAILURE;
193  }
194  std::string className (pStr.substr(ppos, std::string::npos));
195  std::string::size_type cl2 = className.find(']');
196  if (cl2 == std::string::npos) {
197  ATH_MSG_ERROR("Missing close bracket after PNAME field in placement string: " << pStr);
198  return StatusCode::FAILURE;
199  }
200  className = className.substr(7, cl2 - 7);
  // NOTE(review): original line 201 is absent from this listing; classDesc, used below,
  // is presumably declared there from className - confirm against the repository.
202  void* obj = nullptr;
  // oss2 holds the client number in decimal.
203  std::ostringstream oss2;
204  oss2 << std::dec << num;
  // Check whether the container name (up to its '(') equals the metadata container or one
  // of the augmented metadata containers; len is the length of the matching name.
205  std::string::size_type len = m_metadataContainerProp.value().size();
206  bool foundContainer = false;
207  std::size_t opPos = contName.find('(');
208  if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
209  foundContainer = true;
210  } else {
211  for (const auto& item: m_metadataContainersAug.value()) {
212  if (contName.compare(0, opPos, item) == 0){
213  foundContainer = true;
214  len = item.size();
215  break;
216  }
217  }
218  }
219  if (len > 0 && foundContainer && contName[len] == '(' ) {
220  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
221  // For Metadata, before moving to next client, fire file incidents
222  if (m_metadataClient != num) {
223  if (m_metadataClient != 0) {
224  std::ostringstream oss1;
225  oss1 << std::dec << m_metadataClient;
226  std::string memName = "SHM[NUM=" + oss1.str() + "]";
227  FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
228  incSvc->fireIncident(beginInputIncident);
229  FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
230  incSvc->fireIncident(endInputIncident);
231  }
  // NOTE(review): original line 232 is absent from this listing; presumably it records
  // num as the current metadata client - confirm against the repository.
233  }
234  // Retrieve MetaDataSvc
235  ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
236  ATH_CHECK(metadataSvc.retrieve());
237  sc = metadataSvc->shmProxy(std::string(pStr) + "[NUM=" + oss2.str() + "]");
238  if (sc.isRecoverable()) {
239  ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
240  } else if (sc.isFailure()) {
241  ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
242  return abortSharedWrClients(num);
243  }
244  } else {
  // Not a metadata container: build a token that carries only the client number and the
  // class name, which setObjPtr() uses to fetch the object from that client.
245  Token readToken;
246  readToken.setOid(Token::OID_t(num, 0));
247  readToken.setAuxString("[PNAME=" + className + "]");
248  this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
249  if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
250  // Write object
251  if( m_oneDataHeaderForm.value() ) {
  // Placement string extended with the client number as "[SWN=<num>]".
252  auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
253  if( className == "DataHeaderForm_p6" ) {
254  // Pass DHForms to the converter for later writing in the correct order - do not write it now
255  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
256  "", placementWithSwn());
257  DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
  // An empty token string is sent back to the client for a DataHeaderForm.
258  tokenStr = "";
259  } else {
260  Placement placement;
261  placement.fromString(placementStr);
262  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
263  if (token == nullptr) {
264  ATH_MSG_ERROR("Failed to write Data for: " << className);
265  return abortSharedWrClients(num);
266  }
267  tokenStr = token->toString();
268  }
269  if( className == "DataHeader_p6" ) {
270  // Found DataHeader - call the converter to update DHForm Ref
271  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
272  tokenStr, placementWithSwn());
273  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
274  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
275  return abortSharedWrClients(num);
276  }
277  } else
  // Everything except DataHeader, Token, DataHeaderForm and fundamental types is kept for
  // destruction after the commit.
278  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
279  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
280  }
281  placementStr = nullptr;
282  } else {
283  // Multiple shared DataHeaderForms
284  Placement placement;
285  placement.fromString(placementStr); placementStr = nullptr;
286  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
287  if (token == nullptr) {
288  ATH_MSG_ERROR("Failed to write Data for: " << className);
289  return abortSharedWrClients(num);
290  }
291  tokenStr = token->toString();
292  if (className == "DataHeader_p6") {
293  // Found DataHeader
294  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
295  tokenStr, placement.auxString());
296  // call DH converter to add the ref to DHForm (stored earlier) and to itself
297  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
298  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
299  return abortSharedWrClients(num);
300  }
301  dataHeaderSeen = true;
302  // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
303  // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
304  // This is achieved by building it as "CONTID/WORKERID/DBID".
305  // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
306  // WORKERID allows us to distinguish AthenaMP workers,
307  // and DBID allows us to distinguish streams.
308  dataHeaderID = std::format("{}/{}/{}", token->contID(), oss2.str(), token->dbID().toString());
309  } else if (dataHeaderSeen) {
310  dataHeaderSeen = false;
311  // next object after DataHeader - may be a DataHeaderForm
312  // in any case we need to call the DH converter to update the DHForm Ref
313  if (className == "DataHeaderForm_p6") {
314  // Tell DataHeaderCnv that it should use a new DHForm
315  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
316  tokenStr, dataHeaderID);
317  if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
318  ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
319  return abortSharedWrClients(num);
320  }
321  } else {
322  // Tell DataHeaderCnv that it should use the old DHForm
323  GenericAddress address(0, 0, "", dataHeaderID);
324  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
325  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
326  return abortSharedWrClients(num);
327  }
328  }
329  }
330  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
331  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
332  }
333  }
334  }
335  }
336  // Send Token back to Client
  // lockObject() is retried for as long as it reports a recoverable status.
337  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
338  while (sc.isRecoverable()) {
339  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
340  }
341  if (!sc.isSuccess()) {
342  ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
343  return abortSharedWrClients(-1);
344  }
345  }
  // Fetch the next placement string / client number, retrying while recoverable.
346  sc = m_outputStreamingTool->clearObject(&placementStr, num);
347  while (sc.isRecoverable()) {
348  sc = m_outputStreamingTool->clearObject(&placementStr, num);
349  }
350  if (sc.isFailure()) {
351  // no more clients, break the loop and exit
352  num = -1;
353  }
354  }
355  if (dataHeaderSeen) {
356  // DataHeader was the last object, need to tell the converter there is no DHForm coming
357  GenericAddress address(0, 0, "", std::move(dataHeaderID));
358  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
359  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
360  return abortSharedWrClients(-1);
361  }
362  }
363  placementStr = nullptr;
364  } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
365  return(StatusCode::RECOVERABLE);
366  } else if (sc.isRecoverable() || num == -1) {
367  return(StatusCode::RECOVERABLE);
368  }
  // Failure status (also set when the loop above ran out of clients) or no file name:
  // fire the begin/end memory-file incidents for the last metadata client and stop.
369  if (sc.isFailure() || fileName.empty()) {
370  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
371  std::ostringstream oss1;
372  oss1 << std::dec << m_metadataClient;
373  std::string memName = "SHM[NUM=" + oss1.str() + "]";
374  FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
375  incSvc->fireIncident(beginInputIncident);
376  FileIncident endInputIncident(name(), "EndInputMemFile", memName);
377  incSvc->fireIncident(endInputIncident);
378  if (sc.isFailure()) {
379  ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
380  } else {
381  ATH_MSG_INFO("Failed to get Data for client: " << num);
382  }
383  return(StatusCode::FAILURE);
384  }
385  }
  // fileName is only non-empty after the server branch above has processed client data.
386  if (m_parallelCompression && !fileName.empty()) {
387  ATH_MSG_DEBUG("commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
388  return(StatusCode::SUCCESS);
389  }
  // Pick the connection to commit: the file name received from the client when the caller
  // passed no connection name, otherwise the full spec (plus the port string for a client
  // using parallel compression).
390  if (outputConnection.empty()) {
391  outputConnection = std::move(fileName);
392  } else {
393  outputConnection = outputConnectionSpec;
394  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
395  outputConnection += m_streamPortString.value();
396  }
397  }
398  StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
  // Destruct the cached objects only after the base-class commit has run.
399  for (std::map<void*, RootType>::iterator iter = commitCache.begin(), last = commitCache.end(); iter != last; ++iter) {
400  iter->second.Destruct(iter->first);
401  }
402  return(status);
403 }
404 
405 //______________________________________________________________________________
406 StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
407  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
408  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
409  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
410  return(StatusCode::SUCCESS);
411  }
412  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
413  if (m_streamServerActive) {
414  m_streamServerActive = false;
415  ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
416  return(StatusCode::SUCCESS);
417  } else {
418  m_streamServerActive = false;
419  }
420  ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
421  }
422  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
423  outputConnection += m_streamPortString.value();
424  }
425  return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
426 }
427 
428 //______________________________________________________________________________
429 Token* AthenaPoolSharedIOCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
430  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
431  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
432  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
433  return(nullptr);
434  }
435  }
436  Token* token = nullptr;
437  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
438  && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
439  // Lock object
440  std::string placementStr = placement->toString();
441  placementStr += "[PNAME=";
442  placementStr += classDesc.Name();
443  placementStr += ']';
444  ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
445  StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
446  while (sc.isRecoverable()) {
447  //usleep(100);
448  sc = m_outputStreamingTool->lockObject(placementStr.c_str());
449  }
450  if (!sc.isSuccess()) {
451  ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
452  return(nullptr);
453  }
454  // Serialize object via ROOT
455  const void* buffer = nullptr;
456  std::size_t nbytes = 0;
457  bool own = true;
458  if (classDesc.Name() == "Token") {
459  nbytes = strlen(static_cast<const char*>(obj)) + 1;
460  buffer = obj;
461  own = false;
462  } else if (classDesc.IsFundamental()) {
463  nbytes = classDesc.SizeOf();
464  buffer = obj;
465  own = false;
466  } else {
467  buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
468  }
469  // Share object
470  sc = m_outputStreamingTool->putObject(buffer, nbytes);
471  while (sc.isRecoverable()) {
472  //usleep(100);
473  sc = m_outputStreamingTool->putObject(buffer, nbytes);
474  }
475  if (own) { delete [] static_cast<const char*>(buffer); }
476  buffer = nullptr;
477  if (!sc.isSuccess()) {
478  ATH_MSG_ERROR("Could not share object for: " << placementStr);
479  m_outputStreamingTool->putObject(nullptr, 0).ignore();
480  return(nullptr);
481  }
482  AuxDiscoverySvc auxDiscover;
483  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
484  ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
485  m_outputStreamingTool->putObject(nullptr, 0).ignore();
486  return(nullptr);
487  }
488  if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
489  ATH_MSG_ERROR("Failed to put Data for " << placementStr);
490  return(nullptr);
491  }
492  // Get Token back from Server
493  const char* tokenStr = nullptr;
494  int num = -1;
495  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
496  while (sc.isRecoverable()) {
497  //usleep(100);
498  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
499  }
500  if (!sc.isSuccess()) {
501  ATH_MSG_ERROR("Failed to get Token");
502  return(nullptr);
503  }
504  if (!strcmp(tokenStr, "ABORT")) {
505  ATH_MSG_ERROR("Writer requested ABORT");
506  // tell the server we are leaving
507  m_outputStreamingTool->stop().ignore();
508  return nullptr;
509  }
510  Token* tempToken = new Token();
511  tempToken->fromString(tokenStr); tokenStr = nullptr;
512  tempToken->setClassID(pool::DbReflex::guid(classDesc));
513  token = tempToken; tempToken = nullptr;
514 // Client Write Request
515  } else {
516  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
517  ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
518  Token* tempToken = new Token();
519  tempToken->setClassID(pool::DbReflex::guid(classDesc));
520  token = tempToken; tempToken = nullptr;
521  } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
522  ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
523  token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
524  } else {
525  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
526  placement->setFileName(placement->fileName() + m_streamPortString.value());
527  }
528  token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
529  }
530  }
531  return(token);
532 }
533 //______________________________________________________________________________
// Set obj to the object identified by token.
//
// Two independent steps are visible here:
//  1. When the output streaming tool is a server and the token has a null database ID, the
//     object is fetched from the client given by token->oid().first and deserialized.
//  2. When the input streaming tool is a client (and the token's technology is accepted),
//     the object is requested from the remote side and deserialized; otherwise tokens with
//     a non-null database ID take the final branch.
// On any failure obj is set to nullptr; no status is returned.
534 void AthenaPoolSharedIOCnvSvc::setObjPtr(void*& obj, const Token* token) {
535  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
  // The negated property value is passed to makeClient(); a failure is only logged.
536  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
537  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
538  }
539  }
540  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
541  if (token->dbID() == Guid::null()) {
  // The first OID field carries the client number.
542  int num = token->oid().first;
543  // Get object from SHM
544  void* buffer = nullptr;
545  std::size_t nbytes = 0;
546  StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
547  while (sc.isRecoverable()) {
548  //usleep(100);
549  sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
550  }
551  if (!sc.isSuccess()) {
552  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
553  obj = nullptr;
554  } else {
555  ATH_MSG_DEBUG("Server deserializing " << token->toString());
556  if (token->classID() != Guid::null()) {
557  // Deserialize object
558  RootType cltype(pool::DbReflex::forGuid(token->classID()));
559  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
560  } else {
561  // Deserialize object
  // No class ID: the class name is taken from the "[PNAME=...]" field of the aux string.
562  std::string className = token->auxString();
563  className = className.substr(className.find("[PNAME="));
564  className = className.substr(7, className.find(']') - 7);
  // NOTE(review): original line 565 is absent from this listing; cltype, used on the next
  // line, is presumably declared there from className - confirm against the repository.
566  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
567  }
568  AuxDiscoverySvc auxDiscover;
569  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
570  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
571  obj = nullptr;
572  }
573  }
574  }
575  }
  // A negative streaming technology setting accepts every token; otherwise the token's
  // technology must equal it.
576  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
577  ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
578  if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
579  ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
580  obj = nullptr;
581  } else {
582  void* buffer = nullptr;
583  std::size_t nbytes = 0;
584  StatusCode sc = StatusCode::FAILURE;
585  // StopWatch listens from here until the end of this current scope
586  {
587  PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
588  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
589  while (sc.isRecoverable()) {
590  // sleep
591  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
592  }
593  }
594  if (!sc.isSuccess()) {
595  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
596  obj = nullptr;
597  } else {
598  obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
599  AuxDiscoverySvc auxDiscover;
600  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
601  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
602  obj = nullptr;
603  }
604  }
605  }
606  } else if (token->dbID() != Guid::null()) {
  // NOTE(review): original line 607, the body of this branch, is absent from this
  // listing - consult the repository for what happens to tokens with a database ID.
608  }
609 }
610 //______________________________________________________________________________
612  const CLID& clid,
613  const std::string* par,
614  const unsigned long* ip,
615  IOpaqueAddress*& refpAddress) {
  // Create an address from (par, ip). The first line of the signature is not part of this
  // listing. When the input streaming tool is a client, the token is requested from the
  // remote side; otherwise the base class creates the address.
  // Returns SUCCESS with refpAddress set, RECOVERABLE when the returned token has a null
  // class ID, FAILURE on communication errors.
616  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
617  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
618  ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
619  return(StatusCode::FAILURE);
620  }
621  }
622  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
  // Request token: database from par[0] without its first 4 characters, container from
  // par[1], OID from ip[0] and ip[1].
  // NOTE(review): presumably the skipped 4 characters are a "FID:" prefix - confirm.
623  Token addressToken;
624  addressToken.setDb(par[0].substr(4));
625  addressToken.setCont(par[1]);
626  addressToken.setOid(Token::OID_t(ip[0], ip[1]));
627  if (!m_inputStreamingTool->lockObject(addressToken.toString().c_str()).isSuccess()) {
628  ATH_MSG_WARNING("Failed to lock Address Token: " << addressToken.toString());
629  return(StatusCode::FAILURE);
630  }
631  void* buffer = nullptr;
632  std::size_t nbytes = 0;
633  StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
634  while (sc.isRecoverable()) {
635  // sleep
636  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
637  }
638  if (!sc.isSuccess()) {
639  ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
640  return(StatusCode::FAILURE);
641  }
  // The received buffer is read as a C string holding the token.
642  auto token = std::make_unique<Token>();
643  token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
  // A null class ID means no usable token was returned.
644  if (token->classID() == Guid::null()) {
645  token.reset();
646  }
  // Second getObject() call whose result is ignored.
  // NOTE(review): presumably this consumes the end-of-transfer marker - confirm.
647  m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
648  if (token) {
649  refpAddress = new TokenAddress(POOL_StorageType, clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
650  return(StatusCode::SUCCESS);
651  }
652  else {
653  return(StatusCode::RECOVERABLE);
654  }
655  } else {
656  return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
657  }
658 }
659 //______________________________________________________________________________
661  const CLID& clid,
662  const std::string& refAddress,
663  IOpaqueAddress*& refpAddress) {
  // String-address overload (the first line of the signature is not part of this listing):
  // forwards all arguments unchanged to AthenaPoolCnvSvc::createAddress().
664  return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
665 }
666 //______________________________________________________________________________
// Body of makeServer (the signature line is not part of this listing).
// A negative num selects the output (writer) server, a non-negative num the input server.
// Returns RECOVERABLE when there is nothing to do: the output tool is missing or already
// a server, or no input tool is configured.
668  if (num < 0) {
669  num = -num;
  // The active flag is set before the output tool is checked, so it stays set even when
  // RECOVERABLE is returned below.
670  m_streamServerActive = true;
  // Only the value modulo 1024 is handed to the tool.
671  num = num % 1024;
672  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
673  ATH_MSG_DEBUG("makeServer: " << m_outputStreamingTool << " = " << num);
674  ATH_MSG_DEBUG("makeServer: Calling shared memory tool with port suffix " << m_streamPortString);
675  const std::string streamPortSuffix = m_streamPortString.value();
676  if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
677  ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
678  return(StatusCode::FAILURE);
679  }
680  // Disable PersistencySvc per output file mode, for SharedWriter Server
681  m_persSvcPerOutput.setValue(false);
682  return(StatusCode::SUCCESS);
683  }
684  return(StatusCode::RECOVERABLE);
685  }
686  if (m_inputStreamingTool.empty()) {
687  return(StatusCode::RECOVERABLE);
688  }
689  ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
  // The input server is created with an empty port suffix.
690  return(m_inputStreamingTool->makeServer(num, ""));
691 }
692 //________________________________________________________________________________
// Body of makeClient (the signature line is not part of this listing).
// Turns the configured streaming tools into clients: first the output tool, then the
// input tool. Returns FAILURE if the output tool fails, SUCCESS when no input tool is
// configured, otherwise the status of the input tool's makeClient().
694  if (!m_outputStreamingTool.empty()) {
695  ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
  // Passed to the output tool's makeClient(); its value is adopted below.
696  std::string streamPortSuffix;
697  if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
698  ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
699  return(StatusCode::FAILURE);
  // The suffix replaces the configured port string only while that still contains
  // "localhost:0".
700  } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
701  // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
702  ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
703  m_streamPortString.setValue(streamPortSuffix);
704  }
705  }
706  if (m_inputStreamingTool.empty()) {
707  return(StatusCode::SUCCESS);
708  }
709  ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
  // The string handed to the input tool is not used afterwards.
710  std::string dummyStr;
711  return(m_inputStreamingTool->makeClient(num, dummyStr));
712 }
713 //________________________________________________________________________________
715  if (m_inputStreamingTool.empty()) {
716  return(StatusCode::FAILURE);
717  }
718  const char* tokenStr = nullptr;
719  int num = -1;
720  StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
721  if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
722  ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
723  } else {
724  return(sc);
725  }
726  // Read object instance via POOL/ROOT
727  void* instance = nullptr;
728  Token token;
729  token.fromString(tokenStr); tokenStr = nullptr;
730  if (token.classID() != Guid::null()) {
731  std::string objName = "ALL";
732  if (useDetailChronoStat()) {
733  objName = token.classID().toString();
734  }
735  // StopWatch listens from here until the end of this current scope
736  PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
737  this->setObjPtr(instance, &token);
738  // Serialize object via ROOT
739  RootType cltype(pool::DbReflex::forGuid(token.classID()));
740  void* buffer = nullptr;
741  std::size_t nbytes = 0;
742  buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
743  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
744  while (sc.isRecoverable()) {
745  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
746  }
747  delete [] static_cast<char*>(buffer); buffer = nullptr;
748  if (!sc.isSuccess()) {
749  ATH_MSG_ERROR("Could not share object for: " << token.toString());
750  return(StatusCode::FAILURE);
751  }
752  AuxDiscoverySvc auxDiscover;
753  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
754  ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
755  return(StatusCode::FAILURE);
756  }
757  cltype.Destruct(instance); instance = nullptr;
758  if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
759  ATH_MSG_ERROR("Could not share object for: " << token.toString());
760  return(StatusCode::FAILURE);
761  }
762  } else if (token.dbID() != Guid::null()) {
763  std::string returnToken;
764  const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
765  if (metadataToken != nullptr) {
766  returnToken = metadataToken->toString();
767  } else {
768  returnToken = token.toString();
769  }
770  delete metadataToken; metadataToken = nullptr;
771  // Share token
772  sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
773  if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
774  ATH_MSG_ERROR("Could not share token for: " << token.toString());
775  return(StatusCode::FAILURE);
776  }
777  } else {
778  return(StatusCode::RECOVERABLE);
779  }
780  return(StatusCode::SUCCESS);
781 }
782 
783 //________________________________________________________________________________
785  pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
786  const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
787  catalog->commit();
788  catalog->start();
789  return(StatusCode::SUCCESS);
790 }
791 
792 //______________________________________________________________________________
794 {
795  ATH_MSG_ERROR("Sending ABORT to clients");
796  // the master process will kill this process once workers abort
797  // but it could be a time-limited loop
798  StatusCode sc = StatusCode::SUCCESS;
799  while (sc.isSuccess()) {
800  if (client_n >= 0) {
801  sc = m_outputStreamingTool->lockObject("ABORT", client_n);
802  }
803  const char* dummy;
804  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
805  while (sc.isRecoverable()) {
806  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
807  }
808  }
809  return StatusCode::FAILURE;
810 }
811 
812 //______________________________________________________________________________
813 void AthenaPoolSharedIOCnvSvc::handle(const Incident& incident) {
814  if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
815  m_outputStreamingTool->lockObject("release").ignore();
816  }
817 }
818 //______________________________________________________________________________
819 AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
820  base_class(name, pSvcLocator) {
821 }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
AthenaPoolCnvSvc::createAddress
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
Definition: AthenaPoolCnvSvc.cxx:440
AthenaPoolSharedIOCnvSvc::makeServer
virtual StatusCode makeServer(int num) override
Make this a server.
Definition: AthenaPoolSharedIOCnvSvc.cxx:667
AuxDiscoverySvc.h
This file contains the class definition for the AuxDiscoverySvc class.
AthenaPoolSharedIOCnvSvc::m_serializeSvc
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
Definition: AthenaPoolSharedIOCnvSvc.h:112
Guid::null
static const Guid & null()
NULL-Guid: static class method.
Definition: Guid.cxx:18
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
Placement
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition: Placement.h:19
AthenaPoolSharedIOCnvSvc::m_metadataClient
int m_metadataClient
Definition: AthenaPoolSharedIOCnvSvc.h:116
TScopeAdapter::ByNameNoQuiet
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition: RootType.cxx:586
AthenaPoolCnvSvc::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:33
DbReflex.h
AthenaPoolSharedIOCnvSvc::readData
virtual StatusCode readData() override
Read the next data object.
Definition: AthenaPoolSharedIOCnvSvc.cxx:714
Placement::containerName
const std::string & containerName() const
Access container name.
Definition: Placement.h:32
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
AthenaPoolSharedIOCnvSvc::createAddress
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
Definition: AthenaPoolSharedIOCnvSvc.cxx:611
Token::contID
const std::string & contID() const
Access container identifier.
Definition: Token.h:69
IAthMetaDataSvc.h
This file contains the class definition for the IAthMetaDataSvc class.
AthenaPoolSharedIOCnvSvc::m_streamServerActive
bool m_streamServerActive
Definition: AthenaPoolSharedIOCnvSvc.h:115
AthenaPoolSharedIOCnvSvc::commitOutput
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Definition: AthenaPoolSharedIOCnvSvc.cxx:127
Token::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Token.h:91
IFileCatalog.h
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:64
AuxDiscoverySvc::sendStore
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
Definition: AuxDiscoverySvc.cxx:208
Guid::toString
const std::string toString() const
Automatic conversion to string representation.
Definition: Guid.cxx:58
pool::IFileCatalog::commit
void commit()
Save catalog to file.
Definition: IFileCatalog.h:49
AthenaPoolSharedIOCnvSvc::abortSharedWrClients
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Definition: AthenaPoolSharedIOCnvSvc.cxx:793
Token::classID
const Guid & classID() const
Access class identifier.
Definition: Token.h:73
AthenaPoolCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolCnvSvc.cxx:426
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
pool::DbReflex::forGuid
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
AthenaPoolSharedIOCnvSvc::handle
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for StoreCleared incidents.
Definition: AthenaPoolSharedIOCnvSvc.cxx:813
AthenaPoolSharedIOCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect from the output connection.
Definition: AthenaPoolSharedIOCnvSvc.cxx:406
AthenaPoolSharedIOCnvSvc::m_metadataContainerProp
StringProperty m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Definition: AthenaPoolSharedIOCnvSvc.h:121
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
PMonUtils::BasicStopWatch
Definition: BasicStopWatch.h:17
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
AthenaPoolCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolCnvSvc.cxx:413
AthenaPoolSharedIOCnvSvc::m_inputStreamingTool
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
Definition: AthenaPoolSharedIOCnvSvc.h:113
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:23
IAthenaOutputStreamTool.h
Interface to an output stream tool.
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
PyPoolBrowser.item
item
Definition: PyPoolBrowser.py:129
Token::OID_t
Definition: Token.h:24
Token::fromString
Token & fromString(const std::string &from)
Build from the string representation of a token.
Definition: Token.cxx:148
Token::setClassID
Token & setClassID(const Guid &cl_id)
Set class identifier.
Definition: Token.h:75
IPoolSvc::kInputStream
@ kInputStream
Definition: IPoolSvc.h:39
AthenaPoolSharedIOCnvSvc::commitCatalog
virtual StatusCode commitCatalog() override
Commit Catalog.
Definition: AthenaPoolSharedIOCnvSvc.cxx:784
Token::technology
int technology() const
Access technology type.
Definition: Token.h:77
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:11
pool::IFileCatalog
Definition: IFileCatalog.h:23
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
AuxDiscoverySvc::receiveStore
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
Definition: AuxDiscoverySvc.cxx:162
AthenaPoolCnvSvc::finalize
virtual StatusCode finalize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:117
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaPoolSharedIOCnvSvc::m_streamingTechnology
IntegerProperty m_streamingTechnology
Use Streaming for selected technologies only.
Definition: AthenaPoolSharedIOCnvSvc.h:127
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
CalibDbCompareRT.dummy
dummy
Definition: CalibDbCompareRT.py:59
find_tgc_unfilled_channelids.ip
ip
Definition: find_tgc_unfilled_channelids.py:3
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Token::setDb
Token & setDb(const Guid &db)
Set database name.
Definition: Token.h:66
Placement::fileName
const std::string & fileName() const
Access file name.
Definition: Placement.h:28
Placement::setFileName
Placement & setFileName(const std::string &fileName)
Set file name.
Definition: Placement.h:30
AthenaPoolSharedIOCnvSvc::m_metadataContainersAug
StringArrayProperty m_metadataContainersAug
Definition: AthenaPoolSharedIOCnvSvc.h:122
AthenaPoolSharedIOCnvSvc.h
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
AthenaPoolSharedIOCnvSvc::finalize
virtual StatusCode finalize() override
Required of all Gaudi Services.
Definition: AthenaPoolSharedIOCnvSvc.cxx:66
Placement::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Placement.h:40
TScopeAdapter::Name
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition: RootType.cxx:612
AthenaPoolCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect from the output connection.
Definition: AthenaPoolCnvSvc.cxx:395
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
AthenaPoolSharedIOCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolSharedIOCnvSvc.cxx:534
AthenaPoolSharedIOCnvSvc::m_makeStreamingToolClient
IntegerProperty m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
Definition: AthenaPoolSharedIOCnvSvc.h:125
AthenaPoolCnvSvc::commitOutput
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Definition: AthenaPoolCnvSvc.cxx:329
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
createCoolChannelIdFile.par
par
Definition: createCoolChannelIdFile.py:28
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:129
AthenaPoolSharedIOCnvSvc::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: AthenaPoolSharedIOCnvSvc.cxx:693
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:85
RTTAlgmain.address
address
Definition: RTTAlgmain.py:55
Placement::toString
const std::string toString() const
Retrieve the string representation of the placement.
Definition: Placement.cxx:15
TokenAddress.h
This file contains the class definition for the TokenAddress class.
TScopeAdapter::IsFundamental
Bool_t IsFundamental() const
Definition: RootType.cxx:731
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
Token::oid
const OID_t & oid() const
Access object identifier.
Definition: Token.h:81
Token::setAuxString
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Token.h:93
AthenaPoolCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolCnvSvc.cxx:251
TScopeAdapter::Destruct
void Destruct(void *place) const
Definition: RootType.cxx:677
AthenaPoolSharedIOCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolSharedIOCnvSvc.cxx:429
Token::setCont
Token & setCont(const std::string &cnt)
Set container name.
Definition: Token.h:71
AthenaPoolSharedIOCnvSvc::m_outputStreamingTool
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Definition: AthenaPoolSharedIOCnvSvc.h:114
merge.status
status
Definition: merge.py:16
AthenaPoolSharedIOCnvSvc::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: AthenaPoolSharedIOCnvSvc.cxx:37
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
Placement.h
This file contains the class definition for the Placement class (migrated from POOL).
AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: AthenaPoolSharedIOCnvSvc.cxx:819
python.PyAthena.obj
obj
Definition: PyAthena.py:132
TScopeAdapter::SizeOf
size_t SizeOf() const
Definition: RootType.cxx:765
Token.h
This file contains the class definition for the Token class (migrated from POOL).
AthenaPoolSharedIOCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolSharedIOCnvSvc.cxx:88
AthenaPoolSharedIOCnvSvc::m_streamPortString
StringProperty m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
Definition: AthenaPoolSharedIOCnvSvc.h:131
LArL1Calo_ComputeHVCorr.className
className
Definition: LArL1Calo_ComputeHVCorr.py:135
Placement::fromString
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition: Placement.cxx:28
AuxDiscoverySvc
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
Definition: AuxDiscoverySvc.h:33
AthenaPoolSharedIOCnvSvc::m_parallelCompression
BooleanProperty m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Definition: AthenaPoolSharedIOCnvSvc.h:129
ServiceHandle
Definition: ClusterMakerTool.h:37
TScopeAdapter
Definition: RootType.h:119
pool::DbReflex::guid
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.