ATLAS Offline Software
AthenaPoolCnvSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
10 #include "AthenaPoolCnvSvc.h"
11 
12 #include "GaudiKernel/AttribStringParser.h"
13 #include "GaudiKernel/ClassID.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IIoComponentMgr.h"
17 #include "GaudiKernel/IOpaqueAddress.h"
18 
26 
27 #include "StorageSvc/DbReflex.h"
29 #include "RootUtils/APRDefaults.h"
30 
31 #include "AuxDiscoverySvc.h"
32 
33 #include <algorithm>
34 #include <iomanip>
35 #include <sstream>
36 
37 //______________________________________________________________________________
38 // Initialize the service.
40  // Initialize DataModelCompatSvc
41  ServiceHandle<IService> dmcsvc("DataModelCompatSvc", this->name());
42  ATH_CHECK(dmcsvc.retrieve());
43  // Retrieve PoolSvc
44  ATH_CHECK(m_poolSvc.retrieve());
45  // Retrieve ClassIDSvc
46  ATH_CHECK(m_clidSvc.retrieve());
47  // Retrieve InputStreamingTool (if configured)
48  if (!m_inputStreamingTool.empty()) {
49  ATH_CHECK(m_inputStreamingTool.retrieve());
50  }
51  // Retrieve OutputStreamingTool (if configured)
52  if (!m_outputStreamingTool.empty()) {
53  ATH_CHECK(m_outputStreamingTool.retrieve());
54  if (m_makeStreamingToolClient.value() == -1) {
55  // Initialize AthenaRootSharedWriter
56  ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
57  ATH_CHECK(arswsvc.retrieve());
58  }
59  // Put PoolSvc into share mode to avoid duplicating catalog.
60  m_poolSvc->setShareMode(true);
61  }
62  if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
63  // Retrieve AthenaSerializeSvc
64  ATH_CHECK(m_serializeSvc.retrieve());
65  }
66  // Register this service for 'I/O' events
67  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
68  ATH_CHECK(iomgr.retrieve());
69  if (!iomgr->io_register(this).isSuccess()) {
70  ATH_MSG_FATAL("Could not register myself with the IoComponentMgr !");
71  return(StatusCode::FAILURE);
72  }
73  // Extracting MaxFileSizes for global default and map by Database name.
74  for (std::vector<std::string>::const_iterator iter = m_maxFileSizes.value().begin(),
75  last = m_maxFileSizes.value().end(); iter != last; ++iter) {
76  if (auto p = iter->find('='); p != std::string::npos) {
77  long long maxFileSize = atoll(iter->data() + (p + 1));
78  std::string databaseName = iter->substr(0, iter->find_first_of(" ="));
79  m_databaseMaxFileSize.insert(std::make_pair(databaseName, maxFileSize));
80  } else {
81  m_domainMaxFileSize = atoll(iter->c_str());
82  }
83  }
84  ATH_MSG_DEBUG("Setting StorageType to " << m_storageTechProp.value());
86  if( m_dbType == TEST_StorageType ) {
87  ATH_MSG_FATAL("Unknown StorageType rquested: " << m_storageTechProp.value());
88  return StatusCode::FAILURE;
89  }
90  if( m_containerPrefixProp.value() == "Default" ) {
91  // select default storage element name accoring to storage tech
92  if( m_dbType.exactMatch(pool::ROOTRNTUPLE_StorageType) ) m_containerPrefixProp.setValue( APRDefaults::RNTupleNames::EventData );
94  }
95 
96  // Extracting INPUT POOL ItechnologySpecificAttributes for Domain, Database and Container.
98  // Extracting the INPUT POOL ItechnologySpecificAttributes which are to be printed for each event
100  // Setup incident for EndEvent to print out attributes each event
101  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
102  long int pri = 1000;
103  if (!m_inputPoolAttrPerEvent.value().empty()) {
104  // Set to be listener for EndEvent
105  incSvc->addListener(this, "EndEvent", pri);
106  ATH_MSG_DEBUG("Subscribed to EndEvent for printing out input file attributes.");
107  }
108  if (!processPoolAttributes(m_inputAttr, "", IPoolSvc::kInputStream, false, true, true).isSuccess()) {
109  ATH_MSG_DEBUG("setInputAttribute failed setting POOL domain attributes.");
110  }
111 
112  if (!m_outputStreamingTool.empty()) {
113  incSvc->addListener(this, "StoreCleared", pri);
114  ATH_MSG_DEBUG("Subscribed to StoreCleared");
115  }
116  // Load these dictionaries now, so we don't need to try to do so
117  // while multiple threads are running.
118  TClass::GetClass ("TLeafI");
119  TClass::GetClass ("TLeafL");
120  TClass::GetClass ("TLeafD");
121  TClass::GetClass ("TLeafF");
122 
123  return(StatusCode::SUCCESS);
124 }
125 //______________________________________________________________________________
127  ATH_MSG_DEBUG("I/O reinitialization...");
128  m_contextAttr.clear();
129  return(StatusCode::SUCCESS);
130 }
131 //______________________________________________________________________________
133  // Write remaining DataHeaderForms for a given streamName, "*"" means all
134  auto DHCnvListener = dynamic_cast<IIncidentListener*>( converter( ClassID_traits<DataHeader>::ID() ) );
135  FileIncident incident(name(), "WriteDataHeaderForms", streamName);
136  if( DHCnvListener ) DHCnvListener->handle(incident);
137 }
138 //______________________________________________________________________________
140  ATH_MSG_VERBOSE("stop()");
141  // In case of direct writing without an OutputStream, this should be a good time to flush DHForms
143  return StatusCode::SUCCESS;
144 }
145 //______________________________________________________________________________
147  ATH_MSG_VERBOSE("Finalizing...");
148  // Some algorithms write in finalize(), flush DHForms if any are left
150  // Release AthenaSerializeSvc
151  if (!m_serializeSvc.empty()) {
152  if (!m_serializeSvc.release().isSuccess()) {
153  ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
154  }
155  }
156  // Release OutputStreamingTool (if configured)
157  if (!m_outputStreamingTool.empty()) {
158  if (!m_outputStreamingTool.release().isSuccess()) {
159  ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
160  }
161  }
162  // Release InputStreamingTool (if configured)
163  if (!m_inputStreamingTool.empty()) {
164  if (!m_inputStreamingTool.release().isSuccess()) {
165  ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
166  }
167  }
168  // Release ClassIDSvc
169  if (!m_clidSvc.release().isSuccess()) {
170  ATH_MSG_WARNING("Cannot release ClassIDSvc.");
171  }
172  // Release PoolSvc
173  if (!m_poolSvc.release().isSuccess()) {
174  ATH_MSG_WARNING("Cannot release PoolSvc.");
175  }
176  // Print Performance Statistics
177  // The pattern AthenaPoolCnvSvc.*PerfStats is ignored in AtlasTest/TestTools/share/post.sh
178  const std::string msgPrefix{"PerfStats "};
179  ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
180  ATH_MSG_INFO(msgPrefix << "Timing Measurements for AthenaPoolCnvSvc");
181  ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
182  for(const auto& [key, value] : m_chronoMap) {
183  ATH_MSG_INFO(msgPrefix << "| " << std::left << std::setw(15) << key << " | "
184  << std::right << std::setw(15) << std::fixed << std::setprecision(0) << value << " ms |");
185  }
186  ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
187 
188  m_cnvs.clear();
189  m_cnvs.shrink_to_fit();
190  return(StatusCode::SUCCESS);
191 }
192 //______________________________________________________________________________
194  ATH_MSG_DEBUG("I/O finalization...");
195  return(StatusCode::SUCCESS);
196 }
197 //______________________________________________________________________________
198 StatusCode AthenaPoolCnvSvc::createObj(IOpaqueAddress* pAddress, DataObject*& refpObject) {
199  assert(pAddress);
200  std::string objName = "ALL";
201  if (m_useDetailChronoStat.value()) {
202  if (m_clidSvc->getTypeNameOfID(pAddress->clID(), objName).isFailure()) {
203  std::ostringstream oss;
204  oss << std::dec << pAddress->clID();
205  objName = oss.str();
206  }
207  objName += '#';
208  objName += *(pAddress->par() + 1);
209  }
210  // StopWatch listens from here until the end of this current scope
211  PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, m_chronoMap);
212  if (!m_persSvcPerInputType.empty()) { // Use separate PersistencySvc for each input data type
213  TokenAddress* tokAddr = dynamic_cast<TokenAddress*>(pAddress);
214  if (tokAddr != nullptr && tokAddr->getToken() != nullptr && (tokAddr->getToken()->contID().starts_with(m_persSvcPerInputType.value() + "(") || tokAddr->getToken()->contID().starts_with(m_persSvcPerInputType.value() + "_"))) {
215  const unsigned int maxContext = m_poolSvc->getInputContextMap().size();
216  const unsigned int auxContext = m_poolSvc->getInputContext(tokAddr->getToken()->classID().toString() + tokAddr->getToken()->dbID().toString(), 1);
217  char text[32];
218  ::sprintf(text, "[CTXT=%08X]", auxContext);
219  if (m_poolSvc->getInputContextMap().size() > maxContext) {
220  if (m_poolSvc->setAttribute("TREE_CACHE", "0", pool::DbType(pool::ROOTTREE_StorageType).type(), "FID:" + tokAddr->getToken()->dbID().toString(), m_persSvcPerInputType.value(), auxContext).isSuccess()) {
221  ATH_MSG_DEBUG("setInputAttribute failed to switch off TTreeCache for id = " << auxContext << ".");
222  }
223  }
224  tokAddr->getToken()->setAuxString(text);
225  }
226  }
227  // Forward to base class createObj
228  StatusCode status = ::AthCnvSvc::createObj(pAddress, refpObject);
229  return(status);
230 }
231 //______________________________________________________________________________
232 StatusCode AthenaPoolCnvSvc::createRep(DataObject* pObject, IOpaqueAddress*& refpAddress) {
233  assert(pObject);
234  std::string objName = "ALL";
235  if (m_useDetailChronoStat.value()) {
236  if (m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
237  std::ostringstream oss;
238  oss << std::dec << pObject->clID();
239  objName = oss.str();
240  }
241  objName += '#';
242  objName += pObject->registry()->name();
243  }
244  // StopWatch listens from here until the end of this current scope
245  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, m_chronoMap);
246  StatusCode status = StatusCode::FAILURE;
247  if (pObject->clID() == 1) {
248  // No transient object was found use cnv to write default persistent object
249  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(pObject->registry());
250  if (proxy != nullptr) {
251  IConverter* cnv = converter(proxy->clID());
252  status = cnv->createRep(pObject, refpAddress);
253  }
254  } else {
255  // Forward to base class createRep
256  try {
257  status = ::AthCnvSvc::createRep(pObject, refpAddress);
258  } catch(std::runtime_error& e) {
259  ATH_MSG_FATAL(e.what());
260  }
261  }
262  return(status);
263 }
264 //______________________________________________________________________________
265 StatusCode AthenaPoolCnvSvc::fillRepRefs(IOpaqueAddress* pAddress, DataObject* pObject) {
266  assert(pObject);
267  std::string objName = "ALL";
268  if (m_useDetailChronoStat.value()) {
269  if (m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
270  std::ostringstream oss;
271  oss << std::dec << pObject->clID();
272  objName = oss.str();
273  }
274  objName += '#';
275  objName += pObject->registry()->name();
276  }
277  // StopWatch listens from here until the end of this current scope
278  PMonUtils::BasicStopWatch stopWatch("fRep_" + objName, m_chronoMap);
279  StatusCode status = StatusCode::FAILURE;
280  if (pObject->clID() == 1) {
281  // No transient object was found use cnv to write default persistent object
282  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(pObject->registry());
283  if (proxy != nullptr) {
284  IConverter* cnv = converter(proxy->clID());
285  status = cnv->fillRepRefs(pAddress, pObject);
286  }
287  } else {
288  // Forward to base class fillRepRefs
289  try {
290  status = ::AthCnvSvc::fillRepRefs(pAddress, pObject);
291  } catch(std::runtime_error& e) {
292  ATH_MSG_FATAL(e.what());
293  }
294  }
295  return(status);
296 }
297 //______________________________________________________________________________
298 StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec,
299  const std::string& /*openMode*/) {
300  return(connectOutput(outputConnectionSpec));
301 }
302 //______________________________________________________________________________
303 StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
304 // This is called before DataObjects are being converted.
305  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
306  // Extract the technology
307  int tech = m_dbType.type();
308  if (!decodeOutputSpec(outputConnection, tech).isSuccess()) {
309  ATH_MSG_ERROR("connectOutput FAILED extract file name and technology.");
310  return(StatusCode::FAILURE);
311  }
312  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
313  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
314  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
315  return(StatusCode::FAILURE);
316  }
317  }
318  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
319  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
320  return(StatusCode::SUCCESS);
321  }
322  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
323  if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
324  ATH_MSG_DEBUG("connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
325  return(StatusCode::SUCCESS);
326  }
328  ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
329  return(StatusCode::SUCCESS);
330  }
331  }
332 
333  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
334  outputConnection += m_streamPortString.value();
335  }
336  unsigned int contextId = outputContextId(outputConnection);
337  try {
338  if (!m_poolSvc->connect(pool::ITransaction::UPDATE, contextId).isSuccess()) {
339  ATH_MSG_ERROR("connectOutput FAILED to open an UPDATE transaction.");
340  return(StatusCode::FAILURE);
341  }
342  } catch (std::exception& e) {
343  ATH_MSG_ERROR("connectOutput - caught exception: " << e.what());
344  return(StatusCode::FAILURE);
345  }
346 
347  std::unique_lock<std::mutex> lock(m_mutex);
348  if (std::find(m_contextAttr.begin(), m_contextAttr.end(), contextId) == m_contextAttr.end()) {
349  std::size_t merge = outputConnection.find(m_streamPortString.value()); // Used to remove trailing TMemFile
350  int flush = m_numberEventsPerWrite.value();
351  m_contextAttr.push_back(contextId);
352  // Setting default 'TREE_MAX_SIZE' for ROOT to 1024 GB to avoid file chains.
353  std::vector<std::string> maxFileSize;
354  maxFileSize.push_back("TREE_MAX_SIZE");
355  maxFileSize.push_back("1099511627776L");
356  m_domainAttr.emplace_back(std::move(maxFileSize));
357  // Extracting OUTPUT POOL ItechnologySpecificAttributes for Domain, Database and Container.
359  for (std::vector<std::vector<std::string> >::iterator iter = m_databaseAttr.begin(), last = m_databaseAttr.end();
360  iter != last; ++iter) {
361  const std::string& opt = (*iter)[0];
362  std::string& data = (*iter)[1];
363  const std::string& file = (*iter)[2];
364  const std::string& cont = (*iter)[3];
365  std::size_t equal = cont.find('='); // Used to remove leading "TTree="
366  if (equal == std::string::npos) equal = 0;
367  else equal++;
368  std::size_t colon = m_containerPrefixProp.value().find(':');
369  if (colon == std::string::npos) colon = 0; // Used to remove leading technology
370  else colon++;
371  const auto& strProp = m_containerPrefixProp.value();
372  if (merge != std::string::npos && opt == "TREE_AUTO_FLUSH" && 0 == outputConnection.compare(0, merge, file) && cont.compare(equal, std::string::npos, strProp, colon) == 0 && data != "int" && data != "DbLonglong" && data != "double" && data != "string") {
373  flush = atoi(data.c_str());
374  if (flush < 0 && m_numberEventsPerWrite.value() > 0) {
375  flush = m_numberEventsPerWrite.value();
376  std::ostringstream eventAutoFlush;
377  eventAutoFlush << flush;
378  data = eventAutoFlush.str();
379  } else if (flush > 0 && flush < m_numberEventsPerWrite.value()) {
380  flush = flush * (int((m_numberEventsPerWrite.value()) / flush - 0.5) + 1);
381  }
382  }
383  }
384  if (merge != std::string::npos) {
385  ATH_MSG_INFO("connectOutput setting auto write for: " << outputConnection << " to " << flush << " events");
386  m_fileFlushSetting[outputConnection.substr(0, merge)] = flush;
387  }
388  }
389  if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) {
390  ATH_MSG_DEBUG("connectOutput failed process POOL domain attributes.");
391  }
392  if (!processPoolAttributes(m_databaseAttr, outputConnection, contextId).isSuccess()) {
393  ATH_MSG_DEBUG("connectOutput failed process POOL database attributes.");
394  }
395  return(StatusCode::SUCCESS);
396 }
397 
398 //______________________________________________________________________________
399 StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
400  // This is called after all DataObjects are converted.
401  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
402  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
403  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
404  m_outputStreamingTool->lockObject("wait").ignore();
405  if (!this->cleanUp(outputConnection).isSuccess()) {
406  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
407  return(StatusCode::FAILURE);
408  }
409  return(StatusCode::SUCCESS);
410  }
411  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
412  ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
413  return(StatusCode::SUCCESS);
414  }
415  std::map<void*, RootType> commitCache;
416  std::string fileName;
417  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
418  // Clear object to get Placements for all objects in a Stream
419  const char* placementStr = nullptr;
420  int num = -1;
421  StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
422  if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
423  const char * matchedChars = strstr(placementStr, "[FILE=");
424  if (not matchedChars){
425  ATH_MSG_ERROR("No matching filename in " << placementStr);
426  return abortSharedWrClients(num);
427  }
428  fileName = matchedChars;
429  fileName = fileName.substr(6, fileName.find(']') - 6);
430  if (!this->connectOutput(fileName).isSuccess()) {
431  ATH_MSG_ERROR("Failed to connectOutput for " << fileName);
432  return abortSharedWrClients(num);
433  }
434  IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
435  bool dataHeaderSeen = false;
436  std::string dataHeaderID;
437  while (num > 0) {
438  std::string objName = "ALL";
439  if (m_useDetailChronoStat.value()) {
440  std::string objName(placementStr); //FIXME, better descriptor
441  }
442  // StopWatch listens from here until the end of this current scope
443  {
444  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, m_chronoMap);
445  std::string tokenStr = placementStr;
446  std::string contName = strstr(placementStr, "[CONT=");
447  tokenStr.erase(tokenStr.find("[CONT=")); //throws if [CONT= not found
448  tokenStr.append(contName, contName.find(']') + 1);
449  contName = contName.substr(6, contName.find(']') - 6);
450  std::string className = strstr(placementStr, "[PNAME=");
451  className = className.substr(7, className.find(']') - 7);
453  void* obj = nullptr;
454  std::ostringstream oss2;
455  oss2 << std::dec << num;
456  std::string::size_type len = m_metadataContainerProp.value().size();
457  bool foundContainer = false;
458  std::size_t pPos = contName.find('(');
459  if (contName.compare(0, pPos, m_metadataContainerProp.value()) == 0) {
460  foundContainer = true;
461  }
462  else {
463  for (const auto& item: m_metadataContainersAug.value()) {
464  if (contName.compare(0, pPos, item) == 0){
465  foundContainer = true;
466  len = item.size();
467  break;
468  }
469  }
470  }
471  if (len > 0 && foundContainer && contName[len] == '(' ) {
472  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
473  // For Metadata, before moving to next client, fire file incidents
474  if (m_metadataClient != num) {
475  if (m_metadataClient != 0) {
476  std::ostringstream oss1;
477  oss1 << std::dec << m_metadataClient;
478  std::string memName = "SHM[NUM=" + oss1.str() + "]";
479  FileIncident beginInputIncident(name(), "BeginInputFile", memName);
480  incSvc->fireIncident(beginInputIncident);
481  FileIncident endInputIncident(name(), "EndInputFile", memName);
482  incSvc->fireIncident(endInputIncident);
483  }
485  }
486  // Retrieve MetaDataSvc
487  ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
488  ATH_CHECK(metadataSvc.retrieve());
489  sc = metadataSvc->shmProxy(std::string(placementStr) + "[NUM=" + oss2.str() + "]");
490  if (sc.isRecoverable()) {
491  ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
492  } else if (sc.isFailure()) {
493  ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
494  return abortSharedWrClients(num);
495  }
496  } else {
497  Token readToken;
498  readToken.setOid(Token::OID_t(num, 0));
499  readToken.setAuxString("[PNAME=" + className + "]");
500  this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
501  if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
502  // Write object
503  if( m_oneDataHeaderForm.value() ) {
504  auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
505  if( className == "DataHeaderForm_p6" ) {
506  // Pass DHForms to the converter for later writing in the correct order - do not write it now
507  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
508  "", placementWithSwn());
509  DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
510  tokenStr = "";
511  } else {
512  Placement placement;
513  placement.fromString(placementStr);
514  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
515  if (token == nullptr) {
516  ATH_MSG_ERROR("Failed to write Data for: " << className);
517  return abortSharedWrClients(num);
518  }
519  tokenStr = token->toString();
520  }
521  if( className == "DataHeader_p6" ) {
522  // Found DataHeader - call the converter to update DHForm Ref
523  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
524  tokenStr, placementWithSwn());
525  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
526  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
527  return abortSharedWrClients(num);
528  }
529  } else
530  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
531  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
532  }
533  placementStr = nullptr;
534  } else {
535 
536  // Multiple shared DataHeaderForms
537  Placement placement;
538  placement.fromString(placementStr); placementStr = nullptr;
539  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
540  if (token == nullptr) {
541  ATH_MSG_ERROR("Failed to write Data for: " << className);
542  return abortSharedWrClients(num);
543  }
544  tokenStr = token->toString();
545  if (className == "DataHeader_p6") {
546  // Found DataHeader
547  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
548  tokenStr, placement.auxString());
549  // call DH converter to add the ref to DHForm (stored earlier) and to itself
550  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
551  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
552  return abortSharedWrClients(num);
553  }
554  dataHeaderSeen = true;
555  // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
556  // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
557  // This is achieved by building it as "CONTID/WORKERID/DBID".
558  // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
559  // WORKERID allows us to distinguish AthenaMP workers,
560  // and DBID allows us to distinguish streams.
561  dataHeaderID = std::format("{}/{}/{}", token->contID(), oss2.str(), token->dbID().toString());
562  } else if (dataHeaderSeen) {
563  dataHeaderSeen = false;
564  // next object after DataHeader - may be a DataHeaderForm
565  // in any case we need to call the DH converter to update the DHForm Ref
566  if (className == "DataHeaderForm_p6") {
567  // Tell DataHeaderCnv that it should use a new DHForm
568  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
569  tokenStr, dataHeaderID);
570  if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
571  ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
572  return abortSharedWrClients(num);
573  }
574  } else {
575  // Tell DataHeaderCnv that it should use the old DHForm
576  GenericAddress address(0, 0, "", dataHeaderID);
577  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
578  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
579  return abortSharedWrClients(num);
580  }
581  }
582  }
583  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
584  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
585  }
586  }
587  }
588  }
589  // Send Token back to Client
590  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
591  while (sc.isRecoverable()) {
592  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
593  }
594  if (!sc.isSuccess()) {
595  ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
596  return abortSharedWrClients(-1);
597  }
598  }
599  sc = m_outputStreamingTool->clearObject(&placementStr, num);
600  while (sc.isRecoverable()) {
601  sc = m_outputStreamingTool->clearObject(&placementStr, num);
602  }
603  if (sc.isFailure()) {
604  // no more clients, break the loop and exit
605  num = -1;
606  }
607  }
608  if (dataHeaderSeen) {
609  // DataHeader was the last object, need to tell the converter there is no DHForm coming
610  GenericAddress address(0, 0, "", dataHeaderID);
611  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
612  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
613  return abortSharedWrClients(-1);
614  }
615  }
616  placementStr = nullptr;
617  } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
618  return(StatusCode::RECOVERABLE);
619  } else if (sc.isRecoverable() || num == -1) {
620  return(StatusCode::RECOVERABLE);
621  }
622  if (sc.isFailure() || fileName.empty()) {
623  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
624  std::ostringstream oss1;
625  oss1 << std::dec << m_metadataClient;
626  std::string memName = "SHM[NUM=" + oss1.str() + "]";
627  FileIncident beginInputIncident(name(), "BeginInputFile", memName);
628  incSvc->fireIncident(beginInputIncident);
629  FileIncident endInputIncident(name(), "EndInputFile", memName);
630  incSvc->fireIncident(endInputIncident);
631  if (sc.isFailure()) {
632  ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
633  } else {
634  ATH_MSG_INFO("Failed to get Data for client: " << num);
635  }
636  return(StatusCode::FAILURE);
637  }
638  }
639  if (m_parallelCompression && !fileName.empty()) {
640  ATH_MSG_DEBUG("commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
641  return(StatusCode::SUCCESS);
642  }
643  // StopWatch listens from here until the end of this current scope
644  PMonUtils::BasicStopWatch stopWatch("commitOutput", m_chronoMap);
645  std::unique_lock<std::mutex> lock(m_mutex);
646  if (outputConnection.empty()) {
647  outputConnection = fileName;
648  }
649  // Extract the technology
650  int tech = m_dbType.type();
651  if (!decodeOutputSpec(outputConnection, tech).isSuccess()) {
652  ATH_MSG_ERROR("connectOutput FAILED extract file name and technology.");
653  return(StatusCode::FAILURE);
654  }
655  const std::string oldOutputConnection = outputConnection;
656  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
657  m_fileCommitCounter[outputConnection] = m_fileCommitCounter[outputConnection] + 1;
658  if (m_fileFlushSetting[outputConnection] > 0 && m_fileCommitCounter[outputConnection] % m_fileFlushSetting[outputConnection] == 0) {
659  doCommit = true;
660  ATH_MSG_DEBUG("commitOutput sending data.");
661  }
662  outputConnection += m_streamPortString.value();
663  }
664  unsigned int contextId = outputContextId(outputConnection);
665  if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) {
666  ATH_MSG_DEBUG("commitOutput failed process POOL domain attributes.");
667  }
668  if (!processPoolAttributes(m_databaseAttr, outputConnection, contextId).isSuccess()) {
669  ATH_MSG_DEBUG("commitOutput failed process POOL database attributes.");
670  }
671  if (!processPoolAttributes(m_containerAttr, outputConnection, contextId).isSuccess()) {
672  ATH_MSG_DEBUG("commitOutput failed process POOL container attributes.");
673  }
674  // lock.unlock(); //MN: first need to make commitCache slot-specific
675  try {
676  if (doCommit) {
677  if (!m_poolSvc->commit(contextId).isSuccess()) {
678  ATH_MSG_ERROR("commitOutput FAILED to commit OutputStream.");
679  return(StatusCode::FAILURE);
680  }
681  } else {
682  if (!m_poolSvc->commitAndHold(contextId).isSuccess()) {
683  ATH_MSG_ERROR("commitOutput FAILED to commitAndHold OutputStream.");
684  return(StatusCode::FAILURE);
685  }
686  }
687  } catch (std::exception& e) {
688  ATH_MSG_ERROR("commitOutput - caught exception: " << e.what());
689  return(StatusCode::FAILURE);
690  }
691  if (!this->cleanUp(oldOutputConnection).isSuccess()) {
692  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
693  return(StatusCode::FAILURE);
694  }
695  for (std::map<void*, RootType>::iterator iter = commitCache.begin(), last = commitCache.end(); iter != last; ++iter) {
696  iter->second.Destruct(iter->first);
697  }
698  // Check FileSize
699  long long int currentFileSize = m_poolSvc->getFileSize(outputConnection, m_dbType.type(), contextId);
700  if (m_databaseMaxFileSize.find(outputConnection) != m_databaseMaxFileSize.end()) {
701  if (currentFileSize > m_databaseMaxFileSize[outputConnection]) {
702  ATH_MSG_WARNING("FileSize > " << m_databaseMaxFileSize[outputConnection] << " for " << outputConnection);
703  return(StatusCode::RECOVERABLE);
704  }
705  } else if (currentFileSize > m_domainMaxFileSize) {
706  ATH_MSG_WARNING("FileSize > " << m_domainMaxFileSize << " for " << outputConnection);
707  return(StatusCode::RECOVERABLE);
708  }
709  return(StatusCode::SUCCESS);
710 }
711 
712 //______________________________________________________________________________
713 StatusCode AthenaPoolCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
714  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
715  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
716  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
717  return(StatusCode::SUCCESS);
718  }
719  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
720  if (m_streamServerActive) {
721  m_streamServerActive = false;
722  ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
723  return(StatusCode::SUCCESS);
724  } else {
725  m_streamServerActive = false;
726  }
727  ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
728  }
729  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
730  outputConnection += m_streamPortString.value();
731  }
732  unsigned int contextId = outputContextId(outputConnection);
733  StatusCode sc = m_poolSvc->disconnect(contextId);
734  return sc;
735 }
736 
737 //______________________________________________________________________________
738 unsigned int AthenaPoolCnvSvc::outputContextId(const std::string& outputConnection) {
739  return m_persSvcPerOutput?
740  m_poolSvc->getOutputContext(outputConnection) : (unsigned int)IPoolSvc::kOutputStream;
741 }
742 
743 //______________________________________________________________________________
745  return(&*m_poolSvc);
746 }
747 //______________________________________________________________________________
748 Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
749  // StopWatch listens from here until the end of this current scope
750  PMonUtils::BasicStopWatch stopWatch("cRepR_ALL", m_chronoMap);
751  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
752  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
753  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
754  return(nullptr);
755  }
756  }
757  Token* token = nullptr;
758  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
759  && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
760  // Lock object
761  std::string placementStr = placement->toString();
762  placementStr += "[PNAME=";
763  placementStr += classDesc.Name();
764  placementStr += ']';
765  ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
766  StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
767  while (sc.isRecoverable()) {
768  //usleep(100);
769  sc = m_outputStreamingTool->lockObject(placementStr.c_str());
770  }
771  if (!sc.isSuccess()) {
772  ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
773  return(nullptr);
774  }
775  // Serialize object via ROOT
776  const void* buffer = nullptr;
777  std::size_t nbytes = 0;
778  bool own = true;
779  if (classDesc.Name() == "Token") {
780  nbytes = strlen(static_cast<const char*>(obj)) + 1;
781  buffer = obj;
782  own = false;
783  } else if (classDesc.IsFundamental()) {
784  nbytes = classDesc.SizeOf();
785  buffer = obj;
786  own = false;
787  } else {
788  buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
789  }
790  // Share object
791  sc = m_outputStreamingTool->putObject(buffer, nbytes);
792  while (sc.isRecoverable()) {
793  //usleep(100);
794  sc = m_outputStreamingTool->putObject(buffer, nbytes);
795  }
796  if (own) { delete [] static_cast<const char*>(buffer); }
797  buffer = nullptr;
798  if (!sc.isSuccess()) {
799  ATH_MSG_ERROR("Could not share object for: " << placementStr);
800  m_outputStreamingTool->putObject(nullptr, 0).ignore();
801  return(nullptr);
802  }
803  AuxDiscoverySvc auxDiscover;
804  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
805  ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
806  m_outputStreamingTool->putObject(nullptr, 0).ignore();
807  return(nullptr);
808  }
809  if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
810  ATH_MSG_ERROR("Failed to put Data for " << placementStr);
811  return(nullptr);
812  }
813  // Get Token back from Server
814  const char* tokenStr = nullptr;
815  int num = -1;
816  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
817  while (sc.isRecoverable()) {
818  //usleep(100);
819  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
820  }
821  if (!sc.isSuccess()) {
822  ATH_MSG_ERROR("Failed to get Token");
823  return(nullptr);
824  }
825  if (!strcmp(tokenStr, "ABORT")) {
826  ATH_MSG_ERROR("Writer requested ABORT");
827  // tell the server we are leaving
828  m_outputStreamingTool->stop().ignore();
829  return nullptr;
830  }
831  Token* tempToken = new Token();
832  tempToken->fromString(tokenStr); tokenStr = nullptr;
833  tempToken->setClassID(pool::DbReflex::guid(classDesc));
834  token = tempToken; tempToken = nullptr;
835 // Client Write Request
836  } else {
837  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
838  ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
839  Token* tempToken = new Token();
840  tempToken->setClassID(pool::DbReflex::guid(classDesc));
841  token = tempToken; tempToken = nullptr;
842  } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
843  ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
844  token = m_poolSvc->registerForWrite(placement, obj, classDesc);
845  } else {
846  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
847  placement->setFileName(placement->fileName() + m_streamPortString.value());
848  }
849  if (m_persSvcPerOutput) { // Use separate PersistencySvc for each output stream/file
850  char text[32];
851  ::sprintf(text, "[CTXT=%08X]", m_poolSvc->getOutputContext(placement->fileName()));
852  placement->setAuxString(text);
853  }
854  token = m_poolSvc->registerForWrite(placement, obj, classDesc);
855  }
856  }
857  return(token);
858 }
859 //______________________________________________________________________________
860 void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) {
861  ATH_MSG_VERBOSE("Requesting object for: " << token->toString());
862  // StopWatch listens from here until the end of this current scope
863  PMonUtils::BasicStopWatch stopWatchOuter("cObjR_ALL", m_chronoMap);
864  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
865  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
866  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
867  }
868  }
869  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
870  if (token->dbID() == Guid::null()) {
871  int num = token->oid().first;
872  // Get object from SHM
873  void* buffer = nullptr;
874  std::size_t nbytes = 0;
875  StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
876  while (sc.isRecoverable()) {
877  //usleep(100);
878  sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
879  }
880  if (!sc.isSuccess()) {
881  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
882  obj = nullptr;
883  } else {
884  ATH_MSG_DEBUG("Server deserializing " << token->toString());
885  if (token->classID() != Guid::null()) {
886  // Deserialize object
887  RootType cltype(pool::DbReflex::forGuid(token->classID()));
888  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
889  } else {
890  // Deserialize object
891  std::string className = token->auxString();
892  className = className.substr(className.find("[PNAME="));
893  className = className.substr(7, className.find(']') - 7);
895  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
896  }
897  AuxDiscoverySvc auxDiscover;
898  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
899  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
900  obj = nullptr;
901  }
902  }
903  }
904  }
905  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
906  ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
907  if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
908  ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
909  obj = nullptr;
910  } else {
911  void* buffer = nullptr;
912  std::size_t nbytes = 0;
913  StatusCode sc = StatusCode::FAILURE;
914  // StopWatch listens from here until the end of this current scope
915  {
916  PMonUtils::BasicStopWatch stopWatchInner("gObj_ALL", m_chronoMap);
917  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
918  while (sc.isRecoverable()) {
919  // sleep
920  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
921  }
922  }
923  if (!sc.isSuccess()) {
924  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
925  obj = nullptr;
926  } else {
927  obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
928  AuxDiscoverySvc auxDiscover;
929  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
930  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
931  obj = nullptr;
932  }
933  }
934  }
935  } else if (token->dbID() != Guid::null()) {
936  ATH_MSG_VERBOSE("Requesting object for: " << token->toString());
937  m_poolSvc->setObjPtr(obj, token);
938  }
939 }
940 //______________________________________________________________________________
942  return(m_useDetailChronoStat.value());
943 }
944 //______________________________________________________________________________
946  const CLID& clid,
947  const std::string* par,
948  const unsigned long* ip,
949  IOpaqueAddress*& refpAddress) {
950  if (svcType != POOL_StorageType) {
951  ATH_MSG_ERROR("createAddress: svcType != POOL_StorageType " << svcType << " " << POOL_StorageType);
952  return(StatusCode::FAILURE);
953  }
954  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
955  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
956  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
957  return(StatusCode::FAILURE);
958  }
959  }
960  Token* token = nullptr;
961  if (par[0].compare(0, 3, "SHM") == 0) {
962  token = new Token();
963  token->setOid(Token::OID_t(ip[0], ip[1]));
964  token->setAuxString("[PNAME=" + par[2] + "]");
965  RootType classDesc = RootType::ByNameNoQuiet(par[2]);
966  token->setClassID(pool::DbReflex::guid(classDesc));
967  } else if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
968  Token addressToken;
969  addressToken.setDb(par[0].substr(4));
970  addressToken.setCont(par[1]);
971  addressToken.setOid(Token::OID_t(ip[0], ip[1]));
972  if (!m_inputStreamingTool->lockObject(addressToken.toString().c_str()).isSuccess()) {
973  ATH_MSG_WARNING("Failed to lock Address Token: " << addressToken.toString());
974  return(StatusCode::FAILURE);
975  }
976  void* buffer = nullptr;
977  std::size_t nbytes = 0;
978  StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
979  while (sc.isRecoverable()) {
980  // sleep
981  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
982  }
983  if (!sc.isSuccess()) {
984  ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
985  return(StatusCode::FAILURE);
986  }
987  token = new Token();
988  token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
989  if (token->classID() == Guid::null()) {
990  delete token; token = nullptr;
991  }
992  m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
993  } else {
994  token = m_poolSvc->getToken(par[0], par[1], ip[0]);
995  }
996  if (token == nullptr) {
997  return(StatusCode::RECOVERABLE);
998  }
999  refpAddress = new TokenAddress(POOL_StorageType, clid, "", par[1], IPoolSvc::kInputStream, token);
1000  return(StatusCode::SUCCESS);
1001 }
1002 //______________________________________________________________________________
1004  const CLID& clid,
1005  const std::string& refAddress,
1006  IOpaqueAddress*& refpAddress) {
1007  if (svcType != POOL_StorageType) {
1008  ATH_MSG_ERROR("createAddress: svcType != POOL_StorageType " << svcType << " " << POOL_StorageType);
1009  return(StatusCode::FAILURE);
1010  }
1011  refpAddress = new GenericAddress(POOL_StorageType, clid, refAddress);
1012  return(StatusCode::SUCCESS);
1013 }
1014 //______________________________________________________________________________
1015 StatusCode AthenaPoolCnvSvc::convertAddress(const IOpaqueAddress* pAddress,
1016  std::string& refAddress) {
1017  assert(pAddress);
1018  const TokenAddress* tokAddr = dynamic_cast<const TokenAddress*>(pAddress);
1019  if (tokAddr != nullptr && tokAddr->getToken() != nullptr) {
1020  refAddress = tokAddr->getToken()->toString();
1021  } else {
1022  refAddress = *pAddress->par();
1023  }
1024  return(StatusCode::SUCCESS);
1025 }
1026 //__________________________________________________________________________
1027 StatusCode
1028 AthenaPoolCnvSvc::decodeOutputSpec(std::string& fileSpec, int& outputTech) const
1029 {
1030  if (fileSpec.starts_with ( "oracle") || fileSpec.starts_with ( "mysql")) {
1031  outputTech = pool::POOL_RDBMS_StorageType.type();
1032  } else if (fileSpec.starts_with ( "ROOTKEY:")) {
1033  outputTech = pool::ROOTKEY_StorageType.type();
1034  fileSpec.erase(0, 8);
1035  } else if (fileSpec.starts_with ( "ROOTTREE:")) {
1036  outputTech = pool::ROOTTREE_StorageType.type();
1037  fileSpec.erase(0, 9);
1038  } else if (fileSpec.starts_with ( "ROOTTREEINDEX:")) {
1039  outputTech = pool::ROOTTREEINDEX_StorageType.type();
1040  fileSpec.erase(0, 14);
1041  } else if (fileSpec.starts_with ( "ROOTRNTUPLE:")) {
1042  outputTech = pool::ROOTRNTUPLE_StorageType.type();
1043  fileSpec.erase(0, 12);
1044  } else if (outputTech == 0) {
1045  outputTech = m_dbType.type();
1046  }
1047  return(StatusCode::SUCCESS);
1048 }
1049 //______________________________________________________________________________
1051  m_cnvs.push_back(cnv);
1052  return(StatusCode::SUCCESS);
1053 }
1054 //______________________________________________________________________________
1055 StatusCode AthenaPoolCnvSvc::cleanUp(const std::string& connection) {
1056  bool retError = false;
1057  std::size_t cpos = connection.find(':');
1058  std::size_t bpos = connection.find('[');
1059  if (cpos == std::string::npos) {
1060  cpos = 0;
1061  } else {
1062  cpos++;
1063  }
1064  if (bpos != std::string::npos) bpos = bpos - cpos;
1065  const std::string conn = connection.substr(cpos, bpos);
1066  ATH_MSG_VERBOSE("Cleanup for Connection='"<< conn <<"'");
1067  for (auto converter : m_cnvs) {
1068  if (!converter->cleanUp(conn).isSuccess()) {
1069  ATH_MSG_WARNING("AthenaPoolConverter cleanUp failed.");
1070  retError = true;
1071  }
1072  }
1073  return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
1074 }
1075 //______________________________________________________________________________
1077  // Set attributes for input file
1078  m_lastInputFileName = fileName; // Save file name for printing attributes per event
1079  if (!processPoolAttributes(m_inputAttr, m_lastInputFileName, IPoolSvc::kInputStream, false, true, false).isSuccess()) {
1080  ATH_MSG_DEBUG("setInputAttribute failed setting POOL database/container attributes.");
1081  }
1083  ATH_MSG_DEBUG("setInputAttribute failed getting POOL database/container attributes.");
1084  }
1085  if (!m_persSvcPerInputType.empty()) {
1086  // Loop over all extra event input contexts and switch off TTreeCache
1087  const auto& extraInputContextMap = m_poolSvc->getInputContextMap();
1088  for (const auto& [label, id]: extraInputContextMap) {
1089  if (m_poolSvc->setAttribute("TREE_CACHE", "0", pool::DbType(pool::ROOTTREE_StorageType).type(), m_lastInputFileName, m_persSvcPerInputType.value(), id).isSuccess()) {
1090  ATH_MSG_DEBUG("setInputAttribute failed to switch off TTreeCache for = " << label << ".");
1091  }
1092  }
1093  }
1094  return(StatusCode::SUCCESS);
1095 }
1096 //______________________________________________________________________________
1098  if (num < 0) {
1099  num = -num;
1100  m_streamServerActive = true;
1101  num = num % 1024;
1102  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
1103  ATH_MSG_DEBUG("makeServer: " << m_outputStreamingTool << " = " << num);
1104  ATH_MSG_DEBUG("makeServer: Calling shared memory tool with port suffix " << m_streamPortString);
1105  const std::string streamPortSuffix = m_streamPortString.value();
1106  if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
1107  ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
1108  return(StatusCode::FAILURE);
1109  }
1110  // Disable PersistencySvc per output file mode, for SharedWriter Server
1111  m_persSvcPerOutput.setValue(false);
1112  return(StatusCode::SUCCESS);
1113  }
1114  return(StatusCode::RECOVERABLE);
1115  }
1116  if (m_inputStreamingTool.empty()) {
1117  return(StatusCode::RECOVERABLE);
1118  }
1119  ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
1120  return(m_inputStreamingTool->makeServer(num, ""));
1121 }
1122 //________________________________________________________________________________
1124  if (!m_outputStreamingTool.empty()) {
1125  ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
1126  std::string streamPortSuffix;
1127  if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
1128  ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
1129  return(StatusCode::FAILURE);
1130  } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
1131  // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
1132  ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
1133  m_streamPortString.setValue(streamPortSuffix);
1134  }
1135  }
1136  if (m_inputStreamingTool.empty()) {
1137  return(StatusCode::SUCCESS);
1138  }
1139  ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
1140  std::string dummyStr;
1141  return(m_inputStreamingTool->makeClient(num, dummyStr));
1142 }
1143 //________________________________________________________________________________
1145  if (m_inputStreamingTool.empty()) {
1146  return(StatusCode::FAILURE);
1147  }
1148  const char* tokenStr = nullptr;
1149  int num = -1;
1150  StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
1151  if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
1152  ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
1153  } else {
1154  return(sc);
1155  }
1156  // Read object instance via POOL/ROOT
1157  void* instance = nullptr;
1158  Token token;
1159  token.fromString(tokenStr); tokenStr = nullptr;
1160  if (token.classID() != Guid::null()) {
1161  std::string objName = "ALL";
1162  if (m_useDetailChronoStat.value()) {
1163  objName = token.classID().toString();
1164  }
1165  // StopWatch listens from here until the end of this current scope
1166  PMonUtils::BasicStopWatch stopWatchInner("cObj_" + objName, m_chronoMap);
1167  this->setObjPtr(instance, &token);
1168  // Serialize object via ROOT
1169  RootType cltype(pool::DbReflex::forGuid(token.classID()));
1170  void* buffer = nullptr;
1171  std::size_t nbytes = 0;
1172  buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
1173  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
1174  while (sc.isRecoverable()) {
1175  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
1176  }
1177  delete [] static_cast<char*>(buffer); buffer = nullptr;
1178  if (!sc.isSuccess()) {
1179  ATH_MSG_ERROR("Could not share object for: " << token.toString());
1180  return(StatusCode::FAILURE);
1181  }
1182  AuxDiscoverySvc auxDiscover;
1183  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
1184  ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
1185  return(StatusCode::FAILURE);
1186  }
1187  cltype.Destruct(instance); instance = nullptr;
1188  if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
1189  ATH_MSG_ERROR("Could not share object for: " << token.toString());
1190  return(StatusCode::FAILURE);
1191  }
1192  } else if (token.dbID() != Guid::null()) {
1193  std::string returnToken;
1194  const Token* metadataToken = m_poolSvc->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
1195  if (metadataToken != nullptr) {
1196  returnToken = metadataToken->toString();
1197  } else {
1198  returnToken = token.toString();
1199  }
1200  delete metadataToken; metadataToken = nullptr;
1201  // Share token
1202  sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
1203  if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
1204  ATH_MSG_ERROR("Could not share token for: " << token.toString());
1205  return(StatusCode::FAILURE);
1206  }
1207  } else {
1208  return(StatusCode::RECOVERABLE);
1209  }
1210  return(StatusCode::SUCCESS);
1211 }
1212 
1213 //________________________________________________________________________________
1215  pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
1216  const_cast<pool::IFileCatalog*>(m_poolSvc->catalog());
1217  catalog->commit();
1218  catalog->start();
1219  return(StatusCode::SUCCESS);
1220 }
1221 
1222 //______________________________________________________________________________
1224 {
1225  ATH_MSG_ERROR("Sending ABORT to clients");
1226  // the master process will kill this process once workers abort
1227  // but it could be a time-limited loop
1228  StatusCode sc = StatusCode::SUCCESS;
1229  while (sc.isSuccess()) {
1230  if (client_n >= 0) {
1231  sc = m_outputStreamingTool->lockObject("ABORT", client_n);
1232  }
1233  const char* dummy;
1234  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
1235  while (sc.isRecoverable()) {
1236  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
1237  }
1238  }
1239  return StatusCode::FAILURE;
1240 }
1241 
1242 //______________________________________________________________________________
1243 void AthenaPoolCnvSvc::handle(const Incident& incident) {
1244  if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
1245  m_outputStreamingTool->lockObject("release").ignore();
1246  }
1247  if (incident.type() == "EndEvent") {
1249  ATH_MSG_DEBUG("handle EndEvent failed process POOL database attributes.");
1250  }
1251  }
1252 }
1253 //______________________________________________________________________________
1254 AthenaPoolCnvSvc::AthenaPoolCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
1255  base_class(name, pSvcLocator, POOL_StorageType)
1256 {
1257 }
1258 //__________________________________________________________________________
1259 void AthenaPoolCnvSvc::extractPoolAttributes(const StringArrayProperty& property,
1260  std::vector<std::vector<std::string> >* contAttr,
1261  std::vector<std::vector<std::string> >* dbAttr,
1262  std::vector<std::vector<std::string> >* domAttr) const {
1263  std::vector<std::string> opt;
1264  std::string attributeName, containerName, databaseName, valueString;
1265  for (std::vector<std::string>::const_iterator iter = property.value().begin(),
1266  last = property.value().end(); iter != last; ++iter) {
1267  opt.clear();
1268  attributeName.clear();
1269  containerName.clear();
1270  databaseName.clear();
1271  valueString.clear();
1272  using Gaudi::Utils::AttribStringParser;
1273  for (const AttribStringParser::Attrib& attrib : AttribStringParser (*iter)) {
1274  const std::string tag = attrib.tag;
1275  const std::string val = attrib.value;
1276  if (tag == "DatabaseName") {
1277  databaseName = val;
1278  } else if (tag == "ContainerName") {
1279  if (databaseName.empty()) {
1280  databaseName = "*";
1281  }
1282  containerName = val;
1283  } else {
1284  attributeName = tag;
1285  valueString = val;
1286  }
1287  }
1288  if (!attributeName.empty() && !valueString.empty()) {
1289  opt.push_back(attributeName);
1290  opt.push_back(valueString);
1291  if (!databaseName.empty()) {
1292  opt.push_back(databaseName);
1293  if (!containerName.empty()) {
1294  opt.push_back(containerName);
1295  if (containerName.compare(0, 6, "TTree=") == 0) {
1296  dbAttr->push_back(opt);
1297  } else {
1298  contAttr->push_back(opt);
1299  }
1300  } else {
1301  opt.push_back("");
1302  dbAttr->push_back(opt);
1303  }
1304  } else if (domAttr != 0) {
1305  domAttr->push_back(opt);
1306  } else {
1307  opt.push_back("*");
1308  opt.push_back("");
1309  dbAttr->push_back(opt);
1310  }
1311  }
1312  }
1313 }
1314 //__________________________________________________________________________
1315 StatusCode AthenaPoolCnvSvc::processPoolAttributes(std::vector<std::vector<std::string> >& attr,
1316  const std::string& fileName,
1317  unsigned long contextId,
1318  bool doGet,
1319  bool doSet,
1320  bool doClear) const {
1321  bool retError = false;
1322  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) doGet = false;
1323  for (std::vector<std::vector<std::string> >::iterator iter = attr.begin(), last = attr.end();
1324  iter != last; ++iter) {
1325  if (iter->size() == 2) {
1326  const std::string& opt = (*iter)[0];
1327  std::string data = (*iter)[1];
1328  if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") {
1329  if (doGet) {
1330  if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), contextId).isSuccess()) {
1331  ATH_MSG_DEBUG("getAttribute failed for domain attr " << opt);
1332  retError = true;
1333  }
1334  }
1335  } else if (doSet) {
1336  if (m_poolSvc->setAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), contextId).isSuccess()) {
1337  ATH_MSG_DEBUG("setAttribute " << opt << " to " << data);
1338  if (doClear) {
1339  iter->clear();
1340  }
1341  } else {
1342  ATH_MSG_DEBUG("setAttribute failed for domain attr " << opt << " to " << data);
1343  retError = true;
1344  }
1345  }
1346  }
1347  if (iter->size() == 4) {
1348  const std::string& opt = (*iter)[0];
1349  std::string data = (*iter)[1];
1350  const std::string& file = (*iter)[2];
1351  const std::string& cont = (*iter)[3];
1352  if (!fileName.empty() && (0 == fileName.compare(0, fileName.find('?'), file)
1353  || (file[0] == '*' && file.find("," + fileName + ",") == std::string::npos))) {
1354  if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") {
1355  if (doGet) {
1356  if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) {
1357  ATH_MSG_DEBUG("getAttribute failed for database/container attr " << opt);
1358  retError = true;
1359  }
1360  }
1361  } else if (doSet) {
1362  if (m_poolSvc->setAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) {
1363  ATH_MSG_DEBUG("setAttribute " << opt << " to " << data << " for db: " << fileName << " and cont: " << cont);
1364  if (doClear) {
1365  if (file[0] == '*' && !m_persSvcPerOutput) {
1366  (*iter)[2] += "," + fileName + ",";
1367  } else {
1368  iter->clear();
1369  }
1370  }
1371  } else {
1372  ATH_MSG_DEBUG("setAttribute failed for " << opt << " to " << data << " for db: " << fileName << " and cont: " << cont);
1373  retError = true;
1374  }
1375  }
1376  }
1377  }
1378  }
1379  for (std::vector<std::vector<std::string> >::iterator iter = attr.begin(); iter != attr.end(); ) {
1380  if (iter->empty()) {
1381  iter = attr.erase(iter);
1382  } else {
1383  ++iter;
1384  }
1385  }
1386  return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
1387 }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
AthenaPoolCnvSvc::m_poolSvc
ServiceHandle< IPoolSvc > m_poolSvc
Definition: AthenaPoolCnvSvc.h:194
AthenaPoolCnvSvc::m_contextAttr
std::vector< unsigned int > m_contextAttr
Definition: AthenaPoolCnvSvc.h:225
AthenaPoolCnvSvc::registerCleanUp
virtual StatusCode registerCleanUp(IAthenaPoolCleanUp *cnv) override
Implement registerCleanUp to register a IAthenaPoolCleanUp to be called during cleanUp.
Definition: AthenaPoolCnvSvc.cxx:1050
AthenaPoolCnvSvc::createAddress
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
Definition: AthenaPoolCnvSvc.cxx:945
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
IPoolSvc::kOutputStream
@ kOutputStream
Definition: IPoolSvc.h:39
AthenaPoolCnvSvc::cleanUp
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
Definition: AthenaPoolCnvSvc.cxx:1055
AuxDiscoverySvc.h
This file contains the class definition for the AuxDiscoverySvc class.
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
Guid::null
static const Guid & null()
NULL-Guid: static class method.
Definition: Guid.cxx:18
Amg::compare
std::pair< int, int > compare(const AmgSymMatrix(N) &m1, const AmgSymMatrix(N) &m2, double precision=1e-9, bool relative=false)
compare two matrices, returns the indices of the first element that fails the condition,...
Definition: EventPrimitivesHelpers.h:109
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
Placement
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition: Placement.h:19
IPoolSvc
This class provides the interface to the LCG POOL persistency software.
Definition: IPoolSvc.h:35
checkCorrelInHIST.conn
conn
Definition: checkCorrelInHIST.py:25
AthenaPoolCnvSvc::m_metadataContainerProp
StringProperty m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Definition: AthenaPoolCnvSvc.h:253
TScopeAdapter::ByNameNoQuiet
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition: RootType.cxx:581
StateLessPT_NewConfig.proxy
proxy
Definition: StateLessPT_NewConfig.py:392
AthenaPoolCnvSvc::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:39
DbReflex.h
pool::DbType::getType
static DbType getType(const std::string &name)
Access known storage type object by name.
AthenaPoolCnvSvc::outputContextId
unsigned outputContextId(const std::string &outputConnection)
Definition: AthenaPoolCnvSvc.cxx:738
Placement::containerName
const std::string & containerName() const
Access container name.
Definition: Placement.h:32
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
AthenaPoolCnvSvc::m_clidSvc
ServiceHandle< IClassIDSvc > m_clidSvc
Definition: AthenaPoolCnvSvc.h:195
AthenaPoolCnvSvc::m_inputStreamingTool
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
Definition: AthenaPoolCnvSvc.h:197
FullCPAlgorithmsTest_eljob.flush
flush
Definition: FullCPAlgorithmsTest_eljob.py:186
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
make_coralServer_rep.opt
opt
Definition: make_coralServer_rep.py:19
Token::contID
const std::string & contID() const
Access container identifier.
Definition: Token.h:67
AthenaPoolCnvSvc::m_streamingTechnology
IntegerProperty m_streamingTechnology
Use Streaming for selected technologies only.
Definition: AthenaPoolCnvSvc.h:259
IAthMetaDataSvc.h
This file contains the class definition for the IAthMetaDataSvc class.
AthenaPoolCnvSvc::m_useDetailChronoStat
BooleanProperty m_useDetailChronoStat
UseDetailChronoStat, enable detailed output for time and size statistics for AthenaPOOL: default = fa...
Definition: AthenaPoolCnvSvc.h:208
PlotCalibFromCool.label
label
Definition: PlotCalibFromCool.py:78
AthenaPoolCnvSvc::m_domainMaxFileSize
long long m_domainMaxFileSize
Definition: AthenaPoolCnvSvc.h:238
Token::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Token.h:89
AthenaPoolCnvSvc::m_inputAttrPerEvent
std::vector< std::vector< std::string > > m_inputAttrPerEvent
Definition: AthenaPoolCnvSvc.h:234
IFileCatalog.h
pool::ITransaction::UPDATE
@ UPDATE
Definition: ITransaction.h:30
AthenaPoolCnvSvc::decodeOutputSpec
virtual StatusCode decodeOutputSpec(std::string &connectionSpec, int &outputTech) const override
Extract/deduce the DB technology from the connection string/file specification.
Definition: AthenaPoolCnvSvc.cxx:1028
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:62
IAthenaPoolCleanUp
This class provides the interface for the AthenaPoolCleanUp which is used to clean up AthenaPoolConve...
Definition: IAthenaPoolCleanUp.h:19
AthenaPoolCnvSvc::m_databaseAttr
std::vector< std::vector< std::string > > m_databaseAttr
Definition: AthenaPoolCnvSvc.h:223
AthenaPoolCnvSvc::m_chronoMap
PMonUtils::BasicStopWatchResultMap_t m_chronoMap
Map that holds chrono information.
Definition: AthenaPoolCnvSvc.h:203
athena.value
value
Definition: athena.py:124
AuxDiscoverySvc::sendStore
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
Definition: AuxDiscoverySvc.cxx:208
AthenaPoolCnvSvc::fillRepRefs
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject) override
Implementation of IConversionSvc: Resolve the references of the converted object.
Definition: AthenaPoolCnvSvc.cxx:265
Guid::toString
const std::string toString() const
Automatic conversion to string representation.
Definition: Guid.cxx:58
AthenaPoolCnvSvc::m_dbType
pool::DbType m_dbType
decoded storage tech requested in "StorageTechnology" property
Definition: AthenaPoolCnvSvc.h:192
pool::IFileCatalog::commit
void commit()
Save catalog to file.
Definition: IFileCatalog.h:49
AthenaPoolCnvSvc::useDetailChronoStat
virtual bool useDetailChronoStat() const override
Definition: AthenaPoolCnvSvc.cxx:941
AthenaPoolCnvSvc::m_mutex
std::mutex m_mutex
Definition: AthenaPoolCnvSvc.h:249
Token::classID
const Guid & classID() const
Access database identifier.
Definition: Token.h:71
AthenaPoolCnvSvc::setObjPtr
virtual void setObjPtr(void *&obj, const Token *token) override
Definition: AthenaPoolCnvSvc.cxx:860
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
pool::DbReflex::forGuid
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
Placement::setAuxString
Placement & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Placement.h:42
AthenaPoolCnvSvc::AthenaPoolCnvSvc
AthenaPoolCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: AthenaPoolCnvSvc.cxx:1254
AthenaPoolCnvSvc::getPoolSvc
virtual IPoolSvc * getPoolSvc() override
Definition: AthenaPoolCnvSvc.cxx:744
AthenaPoolCnvSvc::processPoolAttributes
StatusCode processPoolAttributes(std::vector< std::vector< std::string > > &attr, const std::string &fileName, unsigned long contextId, bool doGet=true, bool doSet=true, bool doClear=true) const
Set/get technology dependent POOL attributes.
Definition: AthenaPoolCnvSvc.cxx:1315
AthenaPoolCnvSvc::readData
virtual StatusCode readData() override
Read the next data object.
Definition: AthenaPoolCnvSvc.cxx:1144
AthenaPoolCnvSvc::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: AthenaPoolCnvSvc.cxx:1123
AthenaPoolCnvSvc::flushDataHeaderForms
void flushDataHeaderForms(const std::string &streamName="*")
Tell DataHeaderCnv to write out all DataHeaderForms for a given streamName (default is all)
Definition: AthenaPoolCnvSvc.cxx:132
AthenaPoolCnvSvc::m_fileFlushSetting
std::map< std::string, int > m_fileFlushSetting
Definition: AthenaPoolCnvSvc.h:227
AthenaPoolCnvSvc::m_oneDataHeaderForm
BooleanProperty m_oneDataHeaderForm
If true, use only one DataHeaderForm per Stream.
Definition: AthenaPoolCnvSvc.h:268
AthenaPoolCnvSvc::io_reinit
virtual StatusCode io_reinit() override
Definition: AthenaPoolCnvSvc.cxx:126
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
pool::DbType::type
int type() const
Access to full type.
Definition: DbType.h:66
CxxUtils::fpcompare::equal
bool equal(double a, double b)
Compare two FP numbers, working around x87 precision issues.
Definition: fpcompare.h:114
PMonUtils::BasicStopWatch
Definition: BasicStopWatch.h:17
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
AthenaPoolCnvSvc::registerForWrite
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Definition: AthenaPoolCnvSvc.cxx:748
AthenaPoolCnvSvc::m_persSvcPerInputType
StringProperty m_persSvcPerInputType
PersSvcPerInputType, string property, tree name to use multiple persistency services,...
Definition: AthenaPoolCnvSvc.h:248
AthenaPoolCnvSvc::createObj
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject) override
Implementation of IConversionSvc: Create the transient representation of an object from persistent st...
Definition: AthenaPoolCnvSvc.cxx:198
AthenaPoolCnvSvc::m_storageTechProp
StringProperty m_storageTechProp
Default Storage Tech for containers (ROOTTREE, ROOTTREEINDEX, ROOTRNTUPLE)
Definition: AthenaPoolCnvSvc.h:211
AthenaPoolCnvSvc::extractPoolAttributes
void extractPoolAttributes(const StringArrayProperty &property, std::vector< std::vector< std::string > > *contAttr, std::vector< std::vector< std::string > > *dbAttr, std::vector< std::vector< std::string > > *domAttr=0) const
Extract POOL ItechnologySpecificAttributes for Domain, Database and Container from property.
Definition: AthenaPoolCnvSvc.cxx:1259
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:21
IAthenaOutputStreamTool.h
Interface to an output stream tool.
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
Token::OID_t
Definition: Token.h:24
JetTagCalibConfig.className
string className
Definition: JetTagCalibConfig.py:36
Token::fromString
Token & fromString(const std::string &from)
Build from the string representation of a token.
Definition: Token.cxx:133
APRDefaults::RNTupleNames::EventData
static constexpr const char * EventData
Definition: APRDefaults.h:18
AthenaPoolCnvSvc::handle
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
Definition: AthenaPoolCnvSvc.cxx:1243
Token::setClassID
Token & setClassID(const Guid &cl_id)
Access database identifier.
Definition: Token.h:73
IPoolSvc::kInputStream
@ kInputStream
Definition: IPoolSvc.h:39
Token::technology
int technology() const
Access technology type.
Definition: Token.h:75
python.utils.AtlRunQueryDQUtils.p
p
Definition: AtlRunQueryDQUtils.py:210
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:12
FortranAlgorithmOptions.fileName
fileName
Definition: FortranAlgorithmOptions.py:13
pool::IFileCatalog
Definition: IFileCatalog.h:23
APRDefaults.h
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
pool::DbType
Definition: DbType.h:31
AthenaPoolCnvSvc::m_metadataContainersAug
StringArrayProperty m_metadataContainersAug
Definition: AthenaPoolCnvSvc.h:254
AuxDiscoverySvc::receiveStore
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
Definition: AuxDiscoverySvc.cxx:162
AthenaPoolCnvSvc::setInputAttributes
virtual StatusCode setInputAttributes(const std::string &fileName) override
Set the input file attributes, if any are requested from jobOpts.
Definition: AthenaPoolCnvSvc.cxx:1076
AthenaPoolCnvSvc::finalize
virtual StatusCode finalize() override
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:146
AthenaPoolCnvSvc::m_maxFileSizes
StringArrayProperty m_maxFileSizes
MaxFileSizes, vector with maximum file sizes for Athena POOL output files.
Definition: AthenaPoolCnvSvc.h:237
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthenaPoolCnvSvc::m_makeStreamingToolClient
IntegerProperty m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
Definition: AthenaPoolCnvSvc.h:257
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:40
calibdata.exception
exception
Definition: calibdata.py:496
AthenaPoolCnvSvc.h
This file contains the class definition for the AthenaPoolCnvSvc class.
file
TFile * file
Definition: tile_monitor.h:29
AthenaPoolCnvSvc::m_inputAttr
std::vector< std::vector< std::string > > m_inputAttr
Definition: AthenaPoolCnvSvc.h:230
AthenaPoolCnvSvc::m_databaseMaxFileSize
std::map< std::string, long long > m_databaseMaxFileSize
Definition: AthenaPoolCnvSvc.h:239
find_tgc_unfilled_channelids.ip
ip
Definition: find_tgc_unfilled_channelids.py:3
IAthenaSerializeSvc.h
python.xAODType.dummy
dummy
Definition: xAODType.py:4
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Token::setDb
Token & setDb(const Guid &db)
Set database name.
Definition: Token.h:64
Placement::fileName
const std::string & fileName() const
Access file name.
Definition: Placement.h:28
AthenaPoolCnvSvc::io_finalize
virtual StatusCode io_finalize() override
Definition: AthenaPoolCnvSvc.cxx:193
AthenaPoolCnvSvc::m_numberEventsPerWrite
IntegerProperty m_numberEventsPerWrite
When using TMemFile call Write on number of Events, respecting CollectionTree auto_flush.
Definition: AthenaPoolCnvSvc.h:265
Placement::setFileName
Placement & setFileName(const std::string &fileName)
Set file name.
Definition: Placement.h:30
AthenaPoolCnvSvc::convertAddress
virtual StatusCode convertAddress(const IOpaqueAddress *pAddress, std::string &refAddress) override
Convert address to string form.
Definition: AthenaPoolCnvSvc.cxx:1015
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
Placement::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Placement.h:40
TScopeAdapter::Name
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition: RootType.cxx:607
AthenaPoolCnvSvc::disconnectOutput
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
Definition: AthenaPoolCnvSvc.cxx:713
AthenaPoolCnvSvc::m_containerAttr
std::vector< std::vector< std::string > > m_containerAttr
Definition: AthenaPoolCnvSvc.h:224
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
AthenaPoolCnvSvc::makeServer
virtual StatusCode makeServer(int num) override
Make this a server.
Definition: AthenaPoolCnvSvc.cxx:1097
AthenaPoolCnvSvc::commitOutput
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Definition: AthenaPoolCnvSvc.cxx:399
AthenaPoolCnvSvc::m_streamServerActive
bool m_streamServerActive
Definition: AthenaPoolCnvSvc.h:199
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
createCoolChannelIdFile.par
par
Definition: createCoolChannelIdFile.py:29
AthCnvSvc::createObj
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject) override
Implementation of IConverter: Create the transient representation of an object.
Definition: AthCnvSvc.cxx:244
AthenaPoolCnvSvc::m_persSvcPerOutput
BooleanProperty m_persSvcPerOutput
PersSvcPerOutput, boolean property to use multiple persistency services, one per output stream.
Definition: AthenaPoolCnvSvc.h:243
AthenaPoolCnvSvc::createRep
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress) override
Implementation of IConversionSvc: Convert the transient object to the requested representation.
Definition: AthenaPoolCnvSvc.cxx:232
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:114
AthenaPoolCnvSvc::commitCatalog
virtual StatusCode commitCatalog() override
Commit Catalog.
Definition: AthenaPoolCnvSvc.cxx:1214
AthenaPoolCnvSvc::m_inputPoolAttr
StringArrayProperty m_inputPoolAttr
Input PoolAttributes, vector with names and values of technology specific attributes for POOL.
Definition: AthenaPoolCnvSvc.h:229
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:83
item
Definition: ItemListSvc.h:43
RTTAlgmain.address
address
Definition: RTTAlgmain.py:55
AthenaPoolCnvSvc::m_streamPortString
StringProperty m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
Definition: AthenaPoolCnvSvc.h:263
Placement::toString
const std::string toString() const
Retrieve the string representation of the placement.
Definition: Placement.cxx:15
AthenaPoolCnvSvc::m_domainAttr
std::vector< std::vector< std::string > > m_domainAttr
Definition: AthenaPoolCnvSvc.h:222
AthenaPoolCnvSvc::m_fileCommitCounter
std::map< std::string, int > m_fileCommitCounter
Definition: AthenaPoolCnvSvc.h:226
AthenaPoolCnvSvc::m_metadataClient
int m_metadataClient
Definition: AthenaPoolCnvSvc.h:200
TokenAddress.h
This file contains the class definition for the TokenAddress class.
TScopeAdapter::IsFundamental
Bool_t IsFundamental() const
Definition: RootType.cxx:726
AthenaPoolExample_Copy.streamName
string streamName
Definition: AthenaPoolExample_Copy.py:39
pool::DbType::exactMatch
bool exactMatch(const DbType &typ) const
Definition: DbType.h:73
AthenaPoolCnvSvc::abortSharedWrClients
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Definition: AthenaPoolCnvSvc.cxx:1223
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaPoolCnvSvc::m_outputStreamingTool
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Definition: AthenaPoolCnvSvc.h:198
Pythia8_RapidityOrderMPI.val
val
Definition: Pythia8_RapidityOrderMPI.py:14
TokenAddress::getToken
Token * getToken()
Definition: TokenAddress.h:48
Token::oid
const OID_t & oid() const
Access object identifier.
Definition: Token.h:79
Token::setAuxString
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Token.h:91
AthenaPoolCnvSvc::connectOutput
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolCnvSvc.cxx:298
TScopeAdapter::Destruct
void Destruct(void *place) const
Definition: RootType.cxx:672
AthenaPoolCnvSvc::m_poolAttr
StringArrayProperty m_poolAttr
Output PoolAttributes, vector with names and values of technology specific attributes for POOL.
Definition: AthenaPoolCnvSvc.h:221
AthCnvSvc::fillRepRefs
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject) override
Implementation of IConverter: Resolve the references of the converted object.
Definition: AthCnvSvc.cxx:284
Token::setCont
Token & setCont(const std::string &cnt)
Set container name.
Definition: Token.h:69
AthenaPoolCnvSvc::m_inputPoolAttrPerEvent
StringArrayProperty m_inputPoolAttrPerEvent
Print input PoolAttributes per event, vector with names of technology specific attributes for POOL to...
Definition: AthenaPoolCnvSvc.h:233
makeTransCanvas.text
text
Definition: makeTransCanvas.py:11
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
merge.status
status
Definition: merge.py:17
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
CaloCondBlobAlgs_fillNoiseFromASCII.tag
string tag
Definition: CaloCondBlobAlgs_fillNoiseFromASCII.py:24
Placement.h
This file contains the class definition for the Placement class (migrated from POOL).
AthenaPoolCnvSvc::m_containerPrefixProp
StringProperty m_containerPrefixProp
POOL Container name prefix - will be part of or whole TTree/RNTuple name 'Default' takes the prefix f...
Definition: AthenaPoolCnvSvc.h:214
APRDefaults::TTreeNames::EventData
static constexpr const char * EventData
Definition: APRDefaults.h:12
python.PyAthena.obj
obj
Definition: PyAthena.py:132
SG::DataProxy
Definition: DataProxy.h:44
TScopeAdapter::SizeOf
size_t SizeOf() const
Definition: RootType.cxx:760
Token.h
This file contains the class definition for the Token class (migrated from POOL).
AthenaPoolCnvSvc::m_parallelCompression
BooleanProperty m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Definition: AthenaPoolCnvSvc.h:261
AthenaPoolCnvSvc::m_lastInputFileName
std::string m_lastInputFileName
Definition: AthenaPoolCnvSvc.h:193
AthenaPoolCnvSvc::m_serializeSvc
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
Definition: AthenaPoolCnvSvc.h:196
AthCnvSvc::createRep
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress) override
Implementation of IConverter: Convert the transient object to the requested representation.
Definition: AthCnvSvc.cxx:276
AthenaPoolCnvSvc::stop
virtual StatusCode stop() override final
Definition: AthenaPoolCnvSvc.cxx:139
Placement::fromString
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition: Placement.cxx:28
AuxDiscoverySvc
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
Definition: AuxDiscoverySvc.h:33
merge
Definition: merge.py:1
ServiceHandle
Definition: ClusterMakerTool.h:37
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
TScopeAdapter
Definition: RootType.h:119
pool::DbReflex::guid
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.