ATLAS Offline Software
AthenaPoolCnvSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
10 #include "AthenaPoolCnvSvc.h"
11 
12 #include "GaudiKernel/AttribStringParser.h"
13 #include "GaudiKernel/ClassID.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/IIncidentSvc.h"
16 #include "GaudiKernel/IIoComponentMgr.h"
17 #include "GaudiKernel/IOpaqueAddress.h"
18 
26 
27 #include "StorageSvc/DbReflex.h"
29 #include "RootUtils/APRDefaults.h"
30 
31 #include "AuxDiscoverySvc.h"
32 
33 #include <algorithm>
34 #include <iomanip>
35 #include <sstream>
36 
37 //______________________________________________________________________________
38 // Initialize the service.
40  // Initialize DataModelCompatSvc
41  ServiceHandle<IService> dmcsvc("DataModelCompatSvc", this->name());
42  ATH_CHECK(dmcsvc.retrieve());
43  // Retrieve PoolSvc
44  ATH_CHECK(m_poolSvc.retrieve());
45  // Retrieve ClassIDSvc
46  ATH_CHECK(m_clidSvc.retrieve());
47  // Retrieve InputStreamingTool (if configured)
48  if (!m_inputStreamingTool.empty()) {
49  ATH_CHECK(m_inputStreamingTool.retrieve());
50  }
51  // Retrieve OutputStreamingTool (if configured)
52  if (!m_outputStreamingTool.empty()) {
53  ATH_CHECK(m_outputStreamingTool.retrieve());
54  if (m_makeStreamingToolClient.value() == -1) {
55  // Initialize AthenaRootSharedWriter
56  ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
57  ATH_CHECK(arswsvc.retrieve());
58  }
59  // Put PoolSvc into share mode to avoid duplicating catalog.
60  m_poolSvc->setShareMode(true);
61  }
62  if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
63  // Retrieve AthenaSerializeSvc
64  ATH_CHECK(m_serializeSvc.retrieve());
65  }
66  // Register this service for 'I/O' events
67  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
68  ATH_CHECK(iomgr.retrieve());
69  if (!iomgr->io_register(this).isSuccess()) {
70  ATH_MSG_FATAL("Could not register myself with the IoComponentMgr !");
71  return(StatusCode::FAILURE);
72  }
73  // Extracting MaxFileSizes for global default and map by Database name.
74  for (std::vector<std::string>::const_iterator iter = m_maxFileSizes.value().begin(),
75  last = m_maxFileSizes.value().end(); iter != last; ++iter) {
76  if (auto p = iter->find('='); p != std::string::npos) {
77  long long maxFileSize = atoll(iter->data() + (p + 1));
78  std::string databaseName = iter->substr(0, iter->find_first_of(" ="));
79  m_databaseMaxFileSize.insert(std::make_pair(databaseName, maxFileSize));
80  } else {
81  m_domainMaxFileSize = atoll(iter->c_str());
82  }
83  }
84  ATH_MSG_DEBUG("Setting StorageType to " << m_storageTechProp.value());
86  if( m_dbType == TEST_StorageType ) {
87  ATH_MSG_FATAL("Unknown StorageType rquested: " << m_storageTechProp.value());
88  return StatusCode::FAILURE;
89  }
90  if( m_containerPrefixProp.value() == "Default" ) {
91  // select default storage element name according to storage tech
92  if( m_dbType.exactMatch(pool::ROOTRNTUPLE_StorageType) ) m_containerPrefixProp.setValue( APRDefaults::RNTupleNames::EventData );
94  }
95 
96  // Extracting INPUT POOL ItechnologySpecificAttributes for Domain, Database and Container.
98  // Extracting the INPUT POOL ItechnologySpecificAttributes which are to be printed for each event
100  // Setup incident for EndEvent to print out attributes each event
101  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
102  long int pri = 1000;
103  if (!m_inputPoolAttrPerEvent.value().empty()) {
104  // Set to be listener for EndEvent
105  incSvc->addListener(this, "EndEvent", pri);
106  ATH_MSG_DEBUG("Subscribed to EndEvent for printing out input file attributes.");
107  }
108  if (!processPoolAttributes(m_inputAttr, "", IPoolSvc::kInputStream, false, true, true).isSuccess()) {
109  ATH_MSG_DEBUG("setInputAttribute failed setting POOL domain attributes.");
110  }
111 
112  if (!m_outputStreamingTool.empty()) {
113  incSvc->addListener(this, "StoreCleared", pri);
114  ATH_MSG_DEBUG("Subscribed to StoreCleared");
115  }
116  // Load these dictionaries now, so we don't need to try to do so
117  // while multiple threads are running.
118  TClass::GetClass ("TLeafI");
119  TClass::GetClass ("TLeafL");
120  TClass::GetClass ("TLeafD");
121  TClass::GetClass ("TLeafF");
122 
123  return(StatusCode::SUCCESS);
124 }
125 //______________________________________________________________________________
127  ATH_MSG_DEBUG("I/O reinitialization...");
128  m_contextAttr.clear();
129  return(StatusCode::SUCCESS);
130 }
131 //______________________________________________________________________________
133  // Release AthenaSerializeSvc
134  if (!m_serializeSvc.empty()) {
135  if (!m_serializeSvc.release().isSuccess()) {
136  ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
137  }
138  }
139  // Release OutputStreamingTool (if configured)
140  if (!m_outputStreamingTool.empty()) {
141  if (!m_outputStreamingTool.release().isSuccess()) {
142  ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
143  }
144  }
145  // Release InputStreamingTool (if configured)
146  if (!m_inputStreamingTool.empty()) {
147  if (!m_inputStreamingTool.release().isSuccess()) {
148  ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
149  }
150  }
151  // Release ClassIDSvc
152  if (!m_clidSvc.release().isSuccess()) {
153  ATH_MSG_WARNING("Cannot release ClassIDSvc.");
154  }
155  // Release PoolSvc
156  if (!m_poolSvc.release().isSuccess()) {
157  ATH_MSG_WARNING("Cannot release PoolSvc.");
158  }
159  // Print Performance Statistics
160  // The pattern AthenaPoolCnvSvc.*PerfStats is ignored in AtlasTest/TestTools/share/post.sh
161  const std::string msgPrefix{"PerfStats "};
162  ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
163  ATH_MSG_INFO(msgPrefix << "Timing Measurements for AthenaPoolCnvSvc");
164  ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
165  for(const auto& [key, value] : m_chronoMap) {
166  ATH_MSG_INFO(msgPrefix << "| " << std::left << std::setw(15) << key << " | "
167  << std::right << std::setw(15) << std::fixed << std::setprecision(0) << value << " ms |");
168  }
169  ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
170 
171  m_cnvs.clear();
172  m_cnvs.shrink_to_fit();
173  return(StatusCode::SUCCESS);
174 }
175 //______________________________________________________________________________
177  ATH_MSG_DEBUG("I/O finalization...");
178  return(StatusCode::SUCCESS);
179 }
180 //_______________________________________________________________________
181 StatusCode AthenaPoolCnvSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) {
182  if (IAthenaPoolCnvSvc::interfaceID().versionMatch(riid)) {
183  *ppvInterface = dynamic_cast<IAthenaPoolCnvSvc*>(this);
184  } else {
185  // Interface is not directly available: try out a base class
186  return(::AthCnvSvc::queryInterface(riid, ppvInterface));
187  }
188  addRef();
189  return(StatusCode::SUCCESS);
190 }
191 //______________________________________________________________________________
192 StatusCode AthenaPoolCnvSvc::createObj(IOpaqueAddress* pAddress, DataObject*& refpObject) {
193  assert(pAddress);
194  std::string objName = "ALL";
195  if (m_useDetailChronoStat.value()) {
196  if (m_clidSvc->getTypeNameOfID(pAddress->clID(), objName).isFailure()) {
197  std::ostringstream oss;
198  oss << std::dec << pAddress->clID();
199  objName = oss.str();
200  }
201  objName += '#';
202  objName += *(pAddress->par() + 1);
203  }
204  // StopWatch listens from here until the end of this current scope
205  PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, m_chronoMap);
206  if (!m_persSvcPerInputType.empty()) { // Use separate PersistencySvc for each input data type
207  TokenAddress* tokAddr = dynamic_cast<TokenAddress*>(pAddress);
208  if (tokAddr != nullptr && tokAddr->getToken() != nullptr && (tokAddr->getToken()->contID().starts_with(m_persSvcPerInputType.value() + "(") || tokAddr->getToken()->contID().starts_with(m_persSvcPerInputType.value() + "_"))) {
209  const unsigned int maxContext = m_poolSvc->getInputContextMap().size();
210  const unsigned int auxContext = m_poolSvc->getInputContext(tokAddr->getToken()->classID().toString() + tokAddr->getToken()->dbID().toString(), 1);
211  char text[32];
212  ::sprintf(text, "[CTXT=%08X]", auxContext);
213  if (m_poolSvc->getInputContextMap().size() > maxContext) {
214  if (m_poolSvc->setAttribute("TREE_CACHE", "0", pool::DbType(pool::ROOTTREE_StorageType).type(), "FID:" + tokAddr->getToken()->dbID().toString(), m_persSvcPerInputType.value(), auxContext).isSuccess()) {
215  ATH_MSG_DEBUG("setInputAttribute failed to switch off TTreeCache for id = " << auxContext << ".");
216  }
217  }
218  tokAddr->getToken()->setAuxString(text);
219  }
220  }
221  // Forward to base class createObj
222  StatusCode status = ::AthCnvSvc::createObj(pAddress, refpObject);
223  return(status);
224 }
225 //______________________________________________________________________________
226 StatusCode AthenaPoolCnvSvc::createRep(DataObject* pObject, IOpaqueAddress*& refpAddress) {
227  assert(pObject);
228  std::string objName = "ALL";
229  if (m_useDetailChronoStat.value()) {
230  if (m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
231  std::ostringstream oss;
232  oss << std::dec << pObject->clID();
233  objName = oss.str();
234  }
235  objName += '#';
236  objName += pObject->registry()->name();
237  }
238  // StopWatch listens from here until the end of this current scope
239  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, m_chronoMap);
240  StatusCode status = StatusCode::FAILURE;
241  if (pObject->clID() == 1) {
242  // No transient object was found use cnv to write default persistent object
243  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(pObject->registry());
244  if (proxy != nullptr) {
245  IConverter* cnv = converter(proxy->clID());
246  status = cnv->createRep(pObject, refpAddress);
247  }
248  } else {
249  // Forward to base class createRep
250  try {
251  status = ::AthCnvSvc::createRep(pObject, refpAddress);
252  } catch(std::runtime_error& e) {
253  ATH_MSG_FATAL(e.what());
254  }
255  }
256  return(status);
257 }
258 //______________________________________________________________________________
259 StatusCode AthenaPoolCnvSvc::fillRepRefs(IOpaqueAddress* pAddress, DataObject* pObject) {
260  assert(pObject);
261  std::string objName = "ALL";
262  if (m_useDetailChronoStat.value()) {
263  if (m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
264  std::ostringstream oss;
265  oss << std::dec << pObject->clID();
266  objName = oss.str();
267  }
268  objName += '#';
269  objName += pObject->registry()->name();
270  }
271  // StopWatch listens from here until the end of this current scope
272  PMonUtils::BasicStopWatch stopWatch("fRep_" + objName, m_chronoMap);
273  StatusCode status = StatusCode::FAILURE;
274  if (pObject->clID() == 1) {
275  // No transient object was found use cnv to write default persistent object
276  SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(pObject->registry());
277  if (proxy != nullptr) {
278  IConverter* cnv = converter(proxy->clID());
279  status = cnv->fillRepRefs(pAddress, pObject);
280  }
281  } else {
282  // Forward to base class fillRepRefs
283  try {
284  status = ::AthCnvSvc::fillRepRefs(pAddress, pObject);
285  } catch(std::runtime_error& e) {
286  ATH_MSG_FATAL(e.what());
287  }
288  }
289  return(status);
290 }
291 //______________________________________________________________________________
292 StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec,
293  const std::string& /*openMode*/) {
294  return(connectOutput(outputConnectionSpec));
295 }
296 //______________________________________________________________________________
// Prepare an output connection before DataObjects are converted.
//
// The connection name is the part of outputConnectionSpec before the first '['.
// Depending on the streaming-tool role (client / server / none) the call either returns
// SUCCESS without connecting, or opens an UPDATE transaction on the output context and
// applies the domain and database POOL attributes.
// Returns FAILURE when the spec cannot be decoded, when this service cannot be made a
// share client, or when the UPDATE transaction cannot be opened; failures while
// processing POOL attributes are only logged at DEBUG level.
//
// NOTE(review): this listing skips source lines 321 and 352, and the visible braces
// around 320-325 are unbalanced; the comments below describe only what is visible.
// Confirm the missing statements against the repository.
297 StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
298 // This is called before DataObjects are being converted.
299  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
300  // Extract the technology
301  int tech = m_dbType.type();
302  if (!decodeOutputSpec(outputConnection, tech).isSuccess()) {
303  ATH_MSG_ERROR("connectOutput FAILED extract file name and technology.");
304  return(StatusCode::FAILURE);
305  }
     // Turn this service into a share client on demand, if the tool is neither server nor client yet.
306  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
307  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
308  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
309  return(StatusCode::FAILURE);
310  }
311  }
     // A client connects locally only with parallel compression enabled, and then only for
     // specs that do not carry the metadata container prefix tag.
312  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
313  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
314  return(StatusCode::SUCCESS);
315  }
     // Non-client with a streaming tool configured: with parallel compression, skip every
     // spec that lacks the metadata container prefix tag.
316  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
317  if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
318  ATH_MSG_DEBUG("connectOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
319  return(StatusCode::SUCCESS);
320  }
     // NOTE(review): source line 321 is missing here; it presumably opens the condition
     // guarding the "expired server" skip below - verify.
322  ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
323  return(StatusCode::SUCCESS);
324  }
325  }
326
     // Clients using parallel compression append the stream port string to the connection name.
327  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
328  outputConnection += m_streamPortString.value();
329  }
330  unsigned int contextId = outputContextId(outputConnection);
331  try {
332  if (!m_poolSvc->connect(pool::ITransaction::UPDATE, contextId).isSuccess()) {
333  ATH_MSG_ERROR("connectOutput FAILED to open an UPDATE transaction.");
334  return(StatusCode::FAILURE);
335  }
336  } catch (std::exception& e) {
337  ATH_MSG_ERROR("connectOutput - caught exception: " << e.what());
338  return(StatusCode::FAILURE);
339  }
340
     // Everything below runs with m_mutex held until the function returns.
341  std::unique_lock<std::mutex> lock(m_mutex);
     // One-time setup per context: executed only the first time this contextId is seen.
342  if (std::find(m_contextAttr.begin(), m_contextAttr.end(), contextId) == m_contextAttr.end()) {
343  std::size_t merge = outputConnection.find(m_streamPortString.value()); // Used to remove trailing TMemFile
344  int flush = m_numberEventsPerWrite.value();
345  m_contextAttr.push_back(contextId);
346  // Setting default 'TREE_MAX_SIZE' for ROOT to 1024 GB to avoid file chains.
347  std::vector<std::string> maxFileSize;
348  maxFileSize.push_back("TREE_MAX_SIZE");
349  maxFileSize.push_back("1099511627776L");
350  m_domainAttr.emplace_back(std::move(maxFileSize));
351  // Extracting OUTPUT POOL ItechnologySpecificAttributes for Domain, Database and Container.
     // NOTE(review): source line 352 is missing from this listing.
     // Each database attribute entry is read as {option, data, file, container}.
353  for (std::vector<std::vector<std::string> >::iterator iter = m_databaseAttr.begin(), last = m_databaseAttr.end();
354  iter != last; ++iter) {
355  const std::string& opt = (*iter)[0];
356  std::string& data = (*iter)[1];
357  const std::string& file = (*iter)[2];
358  const std::string& cont = (*iter)[3];
359  std::size_t equal = cont.find('='); // Used to remove leading "TTree="
360  if (equal == std::string::npos) equal = 0;
361  else equal++;
362  std::size_t colon = m_containerPrefixProp.value().find(':');
363  if (colon == std::string::npos) colon = 0; // Used to remove leading technology
364  else colon++;
365  const auto& strProp = m_containerPrefixProp.value();
     // Only a TREE_AUTO_FLUSH entry for this file and for the default container prefix is
     // considered, and only when the connection name contains the stream port string;
     // the data values "int", "DbLonglong", "double" and "string" are excluded.
366  if (merge != std::string::npos && opt == "TREE_AUTO_FLUSH" && 0 == outputConnection.compare(0, merge, file) && cont.compare(equal, std::string::npos, strProp, colon) == 0 && data != "int" && data != "DbLonglong" && data != "double" && data != "string") {
367  flush = atoi(data.c_str());
368  if (flush < 0 && m_numberEventsPerWrite.value() > 0) {
     // Negative setting: substitute the events-per-write value and write it back into the attribute.
369  flush = m_numberEventsPerWrite.value();
370  std::ostringstream eventAutoFlush;
371  eventAutoFlush << flush;
372  data = eventAutoFlush.str();
373  } else if (flush > 0 && flush < m_numberEventsPerWrite.value()) {
     // Replace flush by a whole multiple of itself derived from m_numberEventsPerWrite / flush.
374  flush = flush * (int((m_numberEventsPerWrite.value()) / flush - 0.5) + 1);
375  }
376  }
377  }
378  if (merge != std::string::npos) {
     // Remember the flush interval under the connection name without the stream port suffix.
379  ATH_MSG_INFO("connectOutput setting auto write for: " << outputConnection << " to " << flush << " events");
380  m_fileFlushSetting[outputConnection.substr(0, merge)] = flush;
381  }
382  }
     // Attribute processing failures are reported at DEBUG level and do not fail the call.
383  if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) {
384  ATH_MSG_DEBUG("connectOutput failed process POOL domain attributes.");
385  }
386  if (!processPoolAttributes(m_databaseAttr, outputConnection, contextId).isSuccess()) {
387  ATH_MSG_DEBUG("connectOutput failed process POOL database attributes.");
388  }
389  return(StatusCode::SUCCESS);
390 }
391 //______________________________________________________________________________
// Commit an output connection after all DataObjects have been converted.
//
// The connection name is the part of outputConnectionSpec before the first '['.
// Visible flow:
//  - Streaming client (without parallel compression, or for a spec carrying the metadata
//    container prefix tag): send "wait" through the streaming tool, clean up converters, return.
//  - Streaming tool that is neither client nor server: nothing to do, return SUCCESS.
//  - Active streaming server: drain the placements offered by the clients, write each
//    object through registerForWrite (metadata containers go to MetaDataSvc::shmProxy
//    instead) and send the resulting token string back to the client.
//  - Otherwise / afterwards: process POOL attributes, commit (doCommit) or commitAndHold
//    the context, clean up converters, destruct cached objects and check the file size.
// Returns SUCCESS, FAILURE, or RECOVERABLE. RECOVERABLE is returned when the stream
// delivers "stop", when clearObject is recoverable or leaves num at -1, and when the
// file size exceeds the configured maximum.
//
// NOTE(review): this listing skips source lines 445 and 477; `classDesc` is used below
// without a visible declaration. Confirm the missing statements against the repository.
392 StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
393 // This is called after all DataObjects are converted.
394  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
395  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
396  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
     // The status of the "wait" request is deliberately ignored.
397  m_outputStreamingTool->lockObject("wait").ignore();
398  if (!this->cleanUp(outputConnection).isSuccess()) {
399  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
400  return(StatusCode::FAILURE);
401  }
402  return(StatusCode::SUCCESS);
403  }
404  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
405  ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
406  return(StatusCode::SUCCESS);
407  }
     // Objects read from the stream, keyed by address, destructed after the commit below.
408  std::map<void*, RootType> commitCache;
409  std::string fileName;
410  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
411  // Clear object to get Placements for all objects in a Stream
412  const char* placementStr = nullptr;
413  int num = -1;
414  StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
415  if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
416  const char * matchedChars = strstr(placementStr, "[FILE=");
417  if (not matchedChars){
418  ATH_MSG_ERROR("No matching filename in " << placementStr);
419  return abortSharedWrClients(num);
420  }
     // Extract the text between "[FILE=" and the following ']'.
421  fileName = matchedChars;
422  fileName = fileName.substr(6, fileName.find(']') - 6);
423  if (!this->connectOutput(fileName).isSuccess()) {
424  ATH_MSG_ERROR("Failed to connectOutput for " << fileName);
425  return abortSharedWrClients(num);
426  }
427  IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
428  bool dataHeaderSeen = false;
429  std::string dataHeaderID;
     // One iteration per placement; the loop ends when clearObject fails (num is set to -1).
430  while (num > 0) {
431  std::string objName = "ALL";
432  if (m_useDetailChronoStat.value()) {
     // NOTE(review): this declares a new objName that shadows the outer one and goes out of
     // scope immediately, so the stopwatch key below is always "cRep_ALL".
433  std::string objName(placementStr); //FIXME, better descriptor
434  }
435  // StopWatch listens from here until the end of this current scope
436  {
437  PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, m_chronoMap);
438  std::string tokenStr = placementStr;
     // NOTE(review): the strstr() results here and for "[PNAME=" below are converted to
     // std::string without a null check.
439  std::string contName = strstr(placementStr, "[CONT=");
440  tokenStr.erase(tokenStr.find("[CONT=")); //throws if [CONT= not found
441  tokenStr.append(contName, contName.find(']') + 1);
442  contName = contName.substr(6, contName.find(']') - 6);
443  std::string className = strstr(placementStr, "[PNAME=");
444  className = className.substr(7, className.find(']') - 7);
     // NOTE(review): source line 445 is missing here; presumably it declares `classDesc`,
     // which is used further down - verify.
446  void* obj = nullptr;
447  std::ostringstream oss2;
448  oss2 << std::dec << num;
     // Decide whether the container is the metadata container or one of the augmented
     // metadata containers; `len` ends up as the length of the matching name.
449  std::string::size_type len = m_metadataContainerProp.value().size();
450  bool foundContainer = false;
451  std::size_t pPos = contName.find('(');
452  if (contName.compare(0, pPos, m_metadataContainerProp.value()) == 0) {
453  foundContainer = true;
454  }
455  else {
456  for (const auto& item: m_metadataContainersAug.value()) {
457  if (contName.compare(0, pPos, item) == 0){
458  foundContainer = true;
459  len = item.size();
460  break;
461  }
462  }
463  }
464  if (len > 0 && foundContainer && contName[len] == '(' ) {
465  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
466  // For Metadata, before moving to next client, fire file incidents
467  if (m_metadataClient != num) {
468  if (m_metadataClient != 0) {
469  std::ostringstream oss1;
470  oss1 << std::dec << m_metadataClient;
471  std::string memName = "SHM[NUM=" + oss1.str() + "]";
472  FileIncident beginInputIncident(name(), "BeginInputFile", memName);
473  incSvc->fireIncident(beginInputIncident);
474  FileIncident endInputIncident(name(), "EndInputFile", memName);
475  incSvc->fireIncident(endInputIncident);
476  }
     // NOTE(review): source line 477 is missing here - verify against the repository.
478  }
479  // Retrieve MetaDataSvc
480  ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
481  ATH_CHECK(metadataSvc.retrieve());
482  sc = metadataSvc->shmProxy(std::string(placementStr) + "[NUM=" + oss2.str() + "]");
483  if (sc.isRecoverable()) {
484  ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
485  } else if (sc.isFailure()) {
486  ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
487  return abortSharedWrClients(num);
488  }
489  } else {
490  Token readToken;
491  readToken.setOid(Token::OID_t(num, 0));
492  readToken.setAuxString("[PNAME=" + className + "]");
493  this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
494  if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
495  // Write object
496  Placement placement;
497  placement.fromString(placementStr); placementStr = nullptr;
498  std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
499  if (token == nullptr) {
500  ATH_MSG_ERROR("Failed to write Data for: " << className);
501  return abortSharedWrClients(num);
502  }
503  tokenStr = token->toString();
504  if (className == "DataHeader_p6") {
505  // Found DataHeader
506  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
507  tokenStr, placement.auxString());
508  // call DH converter to add the ref to DHForm (stored earlier) and to itself
509  if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
510  ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
511  return abortSharedWrClients(num);
512  }
513  dataHeaderSeen = true;
514  // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
515  // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
516  // This is achieved by building it as "CONTID/WORKERID/DBID".
517  // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
518  // WORKERID allows us to distinguish AthenaMP workers,
519  // and DBID allows us to distinguish streams.
520  dataHeaderID = token->contID();
521  dataHeaderID += '/';
522  dataHeaderID += oss2.str();
523  dataHeaderID += '/';
524  dataHeaderID += token->dbID().toString();
525  } else if (dataHeaderSeen) {
526  dataHeaderSeen = false;
527  // next object after DataHeader - may be a DataHeaderForm
528  // in any case we need to call the DH converter to update the DHForm Ref
529  if (className == "DataHeaderForm_p6") {
530  // Tell DataHeaderCnv that it should use a new DHForm
531  GenericAddress address(POOL_StorageType, ClassID_traits<DataHeader>::ID(),
532  tokenStr, dataHeaderID);
533  if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
534  ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
535  return abortSharedWrClients(num);
536  }
537  } else {
538  // Tell DataHeaderCnv that it should use the old DHForm
539  GenericAddress address(0, 0, "", dataHeaderID);
540  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
541  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
542  return abortSharedWrClients(num);
543  }
544  }
545  }
     // Keep the object for destruction after the commit, except for "Token",
     // "DataHeaderForm_p6" and fundamental types.
546  if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
547  commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
548  }
549  }
550  }
551  // Send Token back to Client
552  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
     // Retry for as long as the tool reports a recoverable status.
553  while (sc.isRecoverable()) {
554  sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
555  }
556  if (!sc.isSuccess()) {
557  ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
558  return abortSharedWrClients(-1);
559  }
560  }
     // Fetch the next placement, again retrying while the status is recoverable.
561  sc = m_outputStreamingTool->clearObject(&placementStr, num);
562  while (sc.isRecoverable()) {
563  sc = m_outputStreamingTool->clearObject(&placementStr, num);
564  }
565  if (sc.isFailure()) {
566  // no more clients, break the loop and exit
567  num = -1;
568  }
569  }
570  if (dataHeaderSeen) {
571  // DataHeader was the last object, need to tell the converter there is no DHForm coming
572  GenericAddress address(0, 0, "", dataHeaderID);
573  if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
574  ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
575  return abortSharedWrClients(-1);
576  }
577  }
578  placementStr = nullptr;
579  } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
580  return(StatusCode::RECOVERABLE);
581  } else if (sc.isRecoverable() || num == -1) {
582  return(StatusCode::RECOVERABLE);
583  }
     // Stream ended or no file name was obtained: fire the file incidents for the last
     // metadata client and report FAILURE.
584  if (sc.isFailure() || fileName.empty()) {
585  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
586  std::ostringstream oss1;
587  oss1 << std::dec << m_metadataClient;
588  std::string memName = "SHM[NUM=" + oss1.str() + "]";
589  FileIncident beginInputIncident(name(), "BeginInputFile", memName);
590  incSvc->fireIncident(beginInputIncident);
591  FileIncident endInputIncident(name(), "EndInputFile", memName);
592  incSvc->fireIncident(endInputIncident);
593  if (sc.isFailure()) {
594  ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
595  } else {
596  ATH_MSG_INFO("Failed to get Data for client: " << num);
597  }
598  return(StatusCode::FAILURE);
599  }
600  }
601  if (m_parallelCompression && !fileName.empty()) {
602  ATH_MSG_DEBUG("commitOutput SKIPPED for metadata-only server: " << outputConnectionSpec);
603  return(StatusCode::SUCCESS);
604  }
605  // StopWatch listens from here until the end of this current scope
606  PMonUtils::BasicStopWatch stopWatch("commitOutput", m_chronoMap);
     // m_mutex is held from here until the function returns.
607  std::unique_lock<std::mutex> lock(m_mutex);
     // An empty spec falls back to the file name taken from the stream, if any.
608  if (outputConnection.empty()) {
609  outputConnection = fileName;
610  }
611  // Extract the technology
612  int tech = m_dbType.type();
613  if (!decodeOutputSpec(outputConnection, tech).isSuccess()) {
614  ATH_MSG_ERROR("connectOutput FAILED extract file name and technology.");
615  return(StatusCode::FAILURE);
616  }
     // Keep the name without the stream port suffix for the converter cleanup below.
617  const std::string oldOutputConnection = outputConnection;
618  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
     // Count commits per file and force a real commit whenever the count reaches a
     // multiple of the flush interval stored in m_fileFlushSetting.
619  m_fileCommitCounter[outputConnection] = m_fileCommitCounter[outputConnection] + 1;
620  if (m_fileFlushSetting[outputConnection] > 0 && m_fileCommitCounter[outputConnection] % m_fileFlushSetting[outputConnection] == 0) {
621  doCommit = true;
622  ATH_MSG_DEBUG("commitOutput sending data.");
623  }
624  outputConnection += m_streamPortString.value();
625  }
626  unsigned int contextId = outputContextId(outputConnection);
     // Attribute processing failures are reported at DEBUG level and do not fail the call.
627  if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) {
628  ATH_MSG_DEBUG("commitOutput failed process POOL domain attributes.");
629  }
630  if (!processPoolAttributes(m_databaseAttr, outputConnection, contextId).isSuccess()) {
631  ATH_MSG_DEBUG("commitOutput failed process POOL database attributes.");
632  }
633  if (!processPoolAttributes(m_containerAttr, outputConnection, contextId).isSuccess()) {
634  ATH_MSG_DEBUG("commitOutput failed process POOL container attributes.");
635  }
636  // lock.unlock(); //MN: first need to make commitCache slot-specific
637  try {
638  if (doCommit) {
639  if (!m_poolSvc->commit(contextId).isSuccess()) {
640  ATH_MSG_ERROR("commitOutput FAILED to commit OutputStream.");
641  return(StatusCode::FAILURE);
642  }
643  } else {
644  if (!m_poolSvc->commitAndHold(contextId).isSuccess()) {
645  ATH_MSG_ERROR("commitOutput FAILED to commitAndHold OutputStream.");
646  return(StatusCode::FAILURE);
647  }
648  }
649  } catch (std::exception& e) {
650  ATH_MSG_ERROR("commitOutput - caught exception: " << e.what());
651  return(StatusCode::FAILURE);
652  }
653  if (!this->cleanUp(oldOutputConnection).isSuccess()) {
654  ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
655  return(StatusCode::FAILURE);
656  }
     // Destruct the objects cached while serving stream clients.
657  for (std::map<void*, RootType>::iterator iter = commitCache.begin(), last = commitCache.end(); iter != last; ++iter) {
658  iter->second.Destruct(iter->first);
659  }
660  // Check FileSize
     // A per-database limit takes precedence over the domain-wide limit.
661  long long int currentFileSize = m_poolSvc->getFileSize(outputConnection, m_dbType.type(), contextId);
662  if (m_databaseMaxFileSize.find(outputConnection) != m_databaseMaxFileSize.end()) {
663  if (currentFileSize > m_databaseMaxFileSize[outputConnection]) {
664  ATH_MSG_WARNING("FileSize > " << m_databaseMaxFileSize[outputConnection] << " for " << outputConnection);
665  return(StatusCode::RECOVERABLE);
666  }
667  } else if (currentFileSize > m_domainMaxFileSize) {
668  ATH_MSG_WARNING("FileSize > " << m_domainMaxFileSize << " for " << outputConnection);
669  return(StatusCode::RECOVERABLE);
670  }
671  return(StatusCode::SUCCESS);
672 }
673 
674 //______________________________________________________________________________
675 StatusCode AthenaPoolCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
676  std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
677  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
678  && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
679  return(StatusCode::SUCCESS);
680  }
681  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
682  if (m_streamServerActive) {
683  m_streamServerActive = false;
684  ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
685  return(StatusCode::SUCCESS);
686  } else {
687  m_streamServerActive = false;
688  }
689  ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
690  }
691  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
692  outputConnection += m_streamPortString.value();
693  }
694  unsigned int contextId = outputContextId(outputConnection);
695  StatusCode sc = m_poolSvc->disconnect(contextId);
696  return sc;
697 }
698 
699 //______________________________________________________________________________
700 unsigned int AthenaPoolCnvSvc::outputContextId(const std::string& outputConnection) {
701  return m_persSvcPerOutput?
702  m_poolSvc->getOutputContext(outputConnection) : (unsigned int)IPoolSvc::kOutputStream;
703 }
704 
705 //______________________________________________________________________________
707  return(&*m_poolSvc);
708 }
709 //______________________________________________________________________________
710 Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
711  // StopWatch listens from here until the end of this current scope
712  PMonUtils::BasicStopWatch stopWatch("cRepR_ALL", m_chronoMap);
713  if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
714  if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
715  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
716  return(nullptr);
717  }
718  }
719  Token* token = nullptr;
720  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
721  && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
722  // Lock object
723  std::string placementStr = placement->toString();
724  placementStr += "[PNAME=";
725  placementStr += classDesc.Name();
726  placementStr += ']';
727  ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
728  StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
729  while (sc.isRecoverable()) {
730  //usleep(100);
731  sc = m_outputStreamingTool->lockObject(placementStr.c_str());
732  }
733  if (!sc.isSuccess()) {
734  ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
735  return(nullptr);
736  }
737  // Serialize object via ROOT
738  const void* buffer = nullptr;
739  std::size_t nbytes = 0;
740  bool own = true;
741  if (classDesc.Name() == "Token") {
742  nbytes = strlen(static_cast<const char*>(obj)) + 1;
743  buffer = obj;
744  own = false;
745  } else if (classDesc.IsFundamental()) {
746  nbytes = classDesc.SizeOf();
747  buffer = obj;
748  own = false;
749  } else {
750  buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
751  }
752  // Share object
753  sc = m_outputStreamingTool->putObject(buffer, nbytes);
754  while (sc.isRecoverable()) {
755  //usleep(100);
756  sc = m_outputStreamingTool->putObject(buffer, nbytes);
757  }
758  if (own) { delete [] static_cast<const char*>(buffer); }
759  buffer = nullptr;
760  if (!sc.isSuccess()) {
761  ATH_MSG_ERROR("Could not share object for: " << placementStr);
762  m_outputStreamingTool->putObject(nullptr, 0).ignore();
763  return(nullptr);
764  }
765  AuxDiscoverySvc auxDiscover;
766  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
767  ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
768  m_outputStreamingTool->putObject(nullptr, 0).ignore();
769  return(nullptr);
770  }
771  if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
772  ATH_MSG_ERROR("Failed to put Data for " << placementStr);
773  return(nullptr);
774  }
775  // Get Token back from Server
776  const char* tokenStr = nullptr;
777  int num = -1;
778  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
779  while (sc.isRecoverable()) {
780  //usleep(100);
781  sc = m_outputStreamingTool->clearObject(&tokenStr, num);
782  }
783  if (!sc.isSuccess()) {
784  ATH_MSG_ERROR("Failed to get Token");
785  return(nullptr);
786  }
787  if (!strcmp(tokenStr, "ABORT")) {
788  ATH_MSG_ERROR("Writer requested ABORT");
789  // tell the server we are leaving
790  m_outputStreamingTool->stop().ignore();
791  return nullptr;
792  }
793  Token* tempToken = new Token();
794  tempToken->fromString(tokenStr); tokenStr = nullptr;
795  tempToken->setClassID(pool::DbReflex::guid(classDesc));
796  token = tempToken; tempToken = nullptr;
797 // Client Write Request
798  } else {
799  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
800  ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
801  Token* tempToken = new Token();
802  tempToken->setClassID(pool::DbReflex::guid(classDesc));
803  token = tempToken; tempToken = nullptr;
804  } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
805  ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
806  token = m_poolSvc->registerForWrite(placement, obj, classDesc);
807  } else {
808  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient() && m_parallelCompression) {
809  placement->setFileName(placement->fileName() + m_streamPortString.value());
810  }
811  if (m_persSvcPerOutput) { // Use separate PersistencySvc for each output stream/file
812  char text[32];
813  ::sprintf(text, "[CTXT=%08X]", m_poolSvc->getOutputContext(placement->fileName()));
814  placement->setAuxString(text);
815  }
816  token = m_poolSvc->registerForWrite(placement, obj, classDesc);
817  }
818  }
819  return(token);
820 }
821 //______________________________________________________________________________
// Fill 'obj' with the object identified by 'token'.
// Three sources are visible below:
//  - the output streaming tool, when it is a server and the token's dbID is null;
//  - the input streaming tool, when it is a client and the token's technology passes the
//    m_streamingTechnology filter;
//  - otherwise m_poolSvc, when the token's dbID is not null.
// On the failure paths shown, 'obj' is set to nullptr. Nothing is returned.
822 void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) {
823  ATH_MSG_VERBOSE("Requesting object for: " << token->toString());
824  // StopWatch listens from here until the end of this current scope
825  PMonUtils::BasicStopWatch stopWatchOuter("cObjR_ALL", m_chronoMap);
// Become an input streaming client on first use if configured. The value passed to
// makeClient is negated here. A failure is only logged; execution continues.
826  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
827  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
828  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
829  }
830  }
831  if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
832  if (token->dbID() == Guid::null()) {
// The first OID component is handed to getObject()/receiveStore() as 'num'.
833  int num = token->oid().first;
834  // Get object from SHM
835  void* buffer = nullptr;
836  std::size_t nbytes = 0;
837  StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
// A recoverable status is retried immediately (the sleep is commented out).
838  while (sc.isRecoverable()) {
839  //usleep(100);
840  sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
841  }
842  if (!sc.isSuccess()) {
843  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
844  obj = nullptr;
845  } else {
846  if (token->classID() != Guid::null()) {
847  // Deserialize object
848  RootType cltype(pool::DbReflex::forGuid(token->classID()));
849  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
850  } else {
851  // Deserialize object
// No class ID: take the class name from the "[PNAME=...]" part of the aux string.
// NOTE(review): if "[PNAME=" is absent, find() yields npos and substr(npos) throws
// std::out_of_range -- confirm the aux string always carries it on this path.
852  std::string className = token->auxString();
853  className = className.substr(className.find("[PNAME="));
854  className = className.substr(7, className.find(']') - 7);
// NOTE(review): listing line 855 is not present here; 'cltype' used on the next line is
// not declared in the visible part of this branch.
856  obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
857  }
858  AuxDiscoverySvc auxDiscover;
859  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
860  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
861  obj = nullptr;
862  }
863  }
864  }
865  }
// This is a separate 'if', not an 'else if': it is evaluated even after the server branch above ran.
866  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
867  ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
868  if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
869  ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
870  obj = nullptr;
871  } else {
872  void* buffer = nullptr;
873  std::size_t nbytes = 0;
874  StatusCode sc = StatusCode::FAILURE;
875  // StopWatch listens from here until the end of this current scope
876  {
877  PMonUtils::BasicStopWatch stopWatchInner("gObj_ALL", m_chronoMap);
878  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
879  while (sc.isRecoverable()) {
880  // sleep
881  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
882  }
883  }
884  if (!sc.isSuccess()) {
885  ATH_MSG_ERROR("Failed to get Data for " << token->toString());
886  obj = nullptr;
887  } else {
888  obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
889  AuxDiscoverySvc auxDiscover;
890  if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
891  ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
892  obj = nullptr;
893  }
894  }
895  }
896  } else if (token->dbID() != Guid::null()) {
// Regular read through the PoolSvc.
897  ATH_MSG_VERBOSE("Requesting object for: " << token->toString());
898  m_poolSvc->setObjPtr(obj, token);
899  }
900 }
901 //______________________________________________________________________________
// Returns the current value of the m_useDetailChronoStat property.
// NOTE(review): the line carrying this function's signature is not present in this listing.
903  return(m_useDetailChronoStat.value());
904 }
905 //______________________________________________________________________________
// Build a TokenAddress from string parameters 'par' and integer parameters 'ip'.
// NOTE(review): the first line of the signature (which declares 'svcType') is not present
// in this listing; the lines below start in the middle of the parameter list.
// Returns FAILURE for a wrong storage type or a failed streaming exchange, RECOVERABLE when
// no token could be obtained, SUCCESS once 'refpAddress' has been set.
907  const CLID& clid,
908  const std::string* par,
909  const unsigned long* ip,
910  IOpaqueAddress*& refpAddress) {
911  if (svcType != POOL_StorageType) {
912  ATH_MSG_ERROR("createAddress: svcType != POOL_StorageType " << svcType << " " << POOL_StorageType);
913  return(StatusCode::FAILURE);
914  }
// Become an input streaming client on first use if configured (negated value passed on).
915  if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
916  if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
917  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
918  return(StatusCode::FAILURE);
919  }
920  }
921  Token* token = nullptr;
// Case 1: par[0] starts with "SHM" -- the token is assembled locally from ip[0], ip[1]
// and the class name in par[2].
922  if (par[0].compare(0, 3, "SHM") == 0) {
923  token = new Token();
924  token->setOid(Token::OID_t(ip[0], ip[1]));
925  token->setAuxString("[PNAME=" + par[2] + "]");
926  RootType classDesc = RootType::ByNameNoQuiet(par[2]);
927  token->setClassID(pool::DbReflex::guid(classDesc));
// Case 2: input streaming client -- send an address token through the streaming tool and
// read the resulting token string back.
928  } else if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
929  Token addressToken;
// The first four characters of par[0] are dropped (presumably a "FID:"-style prefix -- TODO confirm).
// NOTE(review): substr(4) throws std::out_of_range if par[0] is shorter than four characters.
930  addressToken.setDb(par[0].substr(4));
931  addressToken.setCont(par[1]);
932  addressToken.setOid(Token::OID_t(ip[0], ip[1]));
933  if (!m_inputStreamingTool->lockObject(addressToken.toString().c_str()).isSuccess()) {
934  ATH_MSG_WARNING("Failed to lock Address Token: " << addressToken.toString());
935  return(StatusCode::FAILURE);
936  }
937  void* buffer = nullptr;
938  std::size_t nbytes = 0;
939  StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
// A recoverable status is retried immediately.
940  while (sc.isRecoverable()) {
941  // sleep
942  sc = m_inputStreamingTool->getObject(&buffer, nbytes);
943  }
944  if (!sc.isSuccess()) {
945  ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
946  return(StatusCode::FAILURE);
947  }
// The received buffer is interpreted as a C string holding the token.
948  token = new Token();
949  token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
// A token without a class ID is discarded; this leads to the RECOVERABLE return below.
950  if (token->classID() == Guid::null()) {
951  delete token; token = nullptr;
952  }
// One more getObject() call whose status is ignored (presumably to finish the exchange -- TODO confirm).
953  m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
// Case 3: ask the PoolSvc directly.
954  } else {
955  token = m_poolSvc->getToken(par[0], par[1], ip[0]);
956  }
957  if (token == nullptr) {
958  return(StatusCode::RECOVERABLE);
959  }
// The token pointer is handed to the new TokenAddress (presumably it takes ownership -- TODO confirm).
960  refpAddress = new TokenAddress(POOL_StorageType, clid, "", par[1], IPoolSvc::kInputStream, token);
961  return(StatusCode::SUCCESS);
962 }
963 //______________________________________________________________________________
// Build a GenericAddress from a string address.
// NOTE(review): the first line of the signature (which declares 'svcType') is not present
// in this listing.
// Returns FAILURE when svcType is not POOL_StorageType, otherwise SUCCESS with
// 'refpAddress' pointing to a newly allocated GenericAddress.
965  const CLID& clid,
966  const std::string& refAddress,
967  IOpaqueAddress*& refpAddress) {
968  if (svcType != POOL_StorageType) {
969  ATH_MSG_ERROR("createAddress: svcType != POOL_StorageType " << svcType << " " << POOL_StorageType);
970  return(StatusCode::FAILURE);
971  }
972  refpAddress = new GenericAddress(POOL_StorageType, clid, refAddress);
973  return(StatusCode::SUCCESS);
974 }
975 //______________________________________________________________________________
976 StatusCode AthenaPoolCnvSvc::convertAddress(const IOpaqueAddress* pAddress,
977  std::string& refAddress) {
978  assert(pAddress);
979  const TokenAddress* tokAddr = dynamic_cast<const TokenAddress*>(pAddress);
980  if (tokAddr != nullptr && tokAddr->getToken() != nullptr) {
981  refAddress = tokAddr->getToken()->toString();
982  } else {
983  refAddress = *pAddress->par();
984  }
985  return(StatusCode::SUCCESS);
986 }
987 //__________________________________________________________________________
989 AthenaPoolCnvSvc::decodeOutputSpec(std::string& fileSpec, int& outputTech) const
990 {
991  if (fileSpec.starts_with ( "oracle") || fileSpec.starts_with ( "mysql")) {
992  outputTech = pool::POOL_RDBMS_StorageType.type();
993  } else if (fileSpec.starts_with ( "ROOTKEY:")) {
994  outputTech = pool::ROOTKEY_StorageType.type();
995  fileSpec.erase(0, 8);
996  } else if (fileSpec.starts_with ( "ROOTTREE:")) {
997  outputTech = pool::ROOTTREE_StorageType.type();
998  fileSpec.erase(0, 9);
999  } else if (fileSpec.starts_with ( "ROOTTREEINDEX:")) {
1000  outputTech = pool::ROOTTREEINDEX_StorageType.type();
1001  fileSpec.erase(0, 14);
1002  } else if (fileSpec.starts_with ( "ROOTRNTUPLE:")) {
1003  outputTech = pool::ROOTRNTUPLE_StorageType.type();
1004  fileSpec.erase(0, 12);
1005  } else if (outputTech == 0) {
1006  outputTech = m_dbType.type();
1007  }
1008  return(StatusCode::SUCCESS);
1009 }
1010 //______________________________________________________________________________
// Appends 'cnv' to m_cnvs and reports SUCCESS.
// NOTE(review): the line carrying this function's signature is not present in this listing.
1012  m_cnvs.push_back(cnv);
1013  return(StatusCode::SUCCESS);
1014 }
1015 //______________________________________________________________________________
1016 StatusCode AthenaPoolCnvSvc::cleanUp(const std::string& connection) {
1017  bool retError = false;
1018  std::size_t cpos = connection.find(':');
1019  std::size_t bpos = connection.find('[');
1020  if (cpos == std::string::npos) {
1021  cpos = 0;
1022  } else {
1023  cpos++;
1024  }
1025  if (bpos != std::string::npos) bpos = bpos - cpos;
1026  const std::string conn = connection.substr(cpos, bpos);
1027  ATH_MSG_VERBOSE("Cleanup for Connection='"<< conn <<"'");
1028  for (auto converter : m_cnvs) {
1029  if (!converter->cleanUp(conn).isSuccess()) {
1030  ATH_MSG_WARNING("AthenaPoolConverter cleanUp failed.");
1031  retError = true;
1032  }
1033  }
1034  return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
1035 }
1036 //______________________________________________________________________________
// Applies the configured input attributes to the file named 'fileName' and records that
// name in m_lastInputFileName. Failures are only logged at DEBUG level; the visible code
// always returns SUCCESS.
// NOTE(review): the line carrying this function's signature is not present in this listing.
1038  // Set attributes for input file
1039  m_lastInputFileName = fileName; // Save file name for printing attributes per event
// Set-only pass over m_inputAttr (doGet=false, doSet=true, doClear=false).
1040  if (!processPoolAttributes(m_inputAttr, m_lastInputFileName, IPoolSvc::kInputStream, false, true, false).isSuccess()) {
1041  ATH_MSG_DEBUG("setInputAttribute failed setting POOL database/container attributes.");
1042  }
// NOTE(review): listing line 1043 is not present here; the message and closing brace below
// belong to a condition that is not visible.
1044  ATH_MSG_DEBUG("setInputAttribute failed getting POOL database/container attributes.");
1045  }
1046  if (!m_persSvcPerInputType.empty()) {
1047  // Loop over all extra event input contexts and switch off TTreeCache
1048  const auto& extraInputContextMap = m_poolSvc->getInputContextMap();
1049  for (const auto& [label, id]: extraInputContextMap) {
// NOTE(review): the "failed" message is printed when setAttribute() reports success --
// the condition looks inverted (missing '!'); confirm.
1050  if (m_poolSvc->setAttribute("TREE_CACHE", "0", pool::DbType(pool::ROOTTREE_StorageType).type(), m_lastInputFileName, m_persSvcPerInputType.value(), id).isSuccess()) {
1051  ATH_MSG_DEBUG("setInputAttribute failed to switch off TTreeCache for = " << label << ".");
1052  }
1053  }
1054  }
1055  return(StatusCode::SUCCESS);
1056 }
1057 //______________________________________________________________________________
// Turns one of the streaming tools into a server. The sign of 'num' selects the tool:
// negative -> output streaming tool, otherwise -> input streaming tool.
// NOTE(review): the line carrying this function's signature is not present in this listing.
1059  if (num < 0) {
// Output side: work with the absolute value, reduced modulo 1024.
// m_streamServerActive is set even if the tool below turns out to be empty or already a server.
1060  num = -num;
1061  m_streamServerActive = true;
1062  num = num % 1024;
1063  if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
1064  ATH_MSG_DEBUG("makeServer: " << m_outputStreamingTool << " = " << num);
1065  ATH_MSG_DEBUG("makeServer: Calling shared memory tool with port suffix " << m_streamPortString);
1066  const std::string streamPortSuffix = m_streamPortString.value();
1067  if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
1068  ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
1069  return(StatusCode::FAILURE);
1070  }
1071  // Disable PersistencySvc per output file mode, for SharedWriter Server
1072  m_persSvcPerOutput.setValue(false);
1073  return(StatusCode::SUCCESS);
1074  }
// No output tool configured, or it already is a server.
1075  return(StatusCode::RECOVERABLE);
1076  }
1077  if (m_inputStreamingTool.empty()) {
1078  return(StatusCode::RECOVERABLE);
1079  }
// Input side: the tool's own status is returned; an empty port suffix is passed.
1080  ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
1081  return(m_inputStreamingTool->makeServer(num, ""));
1082 }
1083 //________________________________________________________________________________
// Turns the configured streaming tools into clients, passing 'num' to each of them.
// Returns FAILURE if the output tool fails, SUCCESS if there is no input tool, otherwise
// the status of the input tool's makeClient().
// NOTE(review): the line carrying this function's signature is not present in this listing.
1085  if (!m_outputStreamingTool.empty()) {
1086  ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
// Passed in empty; presumably filled in by the tool's makeClient() -- TODO confirm.
1087  std::string streamPortSuffix;
1088  if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
1089  ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
1090  return(StatusCode::FAILURE);
// The suffix is stored only while m_streamPortString still contains "localhost:0".
1091  } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
1092  // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
1093  ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
1094  m_streamPortString.setValue(streamPortSuffix);
1095  }
1096  }
1097  if (m_inputStreamingTool.empty()) {
1098  return(StatusCode::SUCCESS);
1099  }
1100  ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
// The string written by the input tool is not used afterwards.
1101  std::string dummyStr;
1102  return(m_inputStreamingTool->makeClient(num, dummyStr));
1103 }
1104 //________________________________________________________________________________
// Handles one request received through the input streaming tool: takes a token string and
// a client number from clearObject(), then either reads and serializes the object back to
// that client (token has a class ID) or sends back a token string (token has only a dbID).
// NOTE(review): the line carrying this function's signature is not present in this listing.
1106  if (m_inputStreamingTool.empty()) {
1107  return(StatusCode::FAILURE);
1108  }
1109  const char* tokenStr = nullptr;
1110  int num = -1;
1111  StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
// Continue only with a non-empty token string and a positive client number; otherwise the
// status from clearObject() is returned unchanged.
1112  if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
1113  ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
1114  } else {
1115  return(sc);
1116  }
1117  // Read object instance via POOL/ROOT
1118  void* instance = nullptr;
1119  Token token;
1120  token.fromString(tokenStr); tokenStr = nullptr;
1121  if (token.classID() != Guid::null()) {
// The stopwatch label is "cObj_ALL", or "cObj_<class id>" when detailed statistics are on.
1122  std::string objName = "ALL";
1123  if (m_useDetailChronoStat.value()) {
1124  objName = token.classID().toString();
1125  }
1126  // StopWatch listens from here until the end of this current scope
1127  PMonUtils::BasicStopWatch stopWatchInner("cObj_" + objName, m_chronoMap);
// NOTE(review): 'instance' is not checked after this call; if the read fails it may still
// be nullptr when passed to serialize() -- confirm.
1128  this->setObjPtr(instance, &token);
1129  // Serialize object via ROOT
1130  RootType cltype(pool::DbReflex::forGuid(token.classID()));
1131  void* buffer = nullptr;
1132  std::size_t nbytes = 0;
1133  buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
1134  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
// A recoverable status is retried immediately.
1135  while (sc.isRecoverable()) {
1136  sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
1137  }
1138  delete [] static_cast<char*>(buffer); buffer = nullptr;
// NOTE(review): the two error returns below leave 'instance' undestructed (Destruct() is
// only reached on the success path) -- possible leak; confirm.
1139  if (!sc.isSuccess()) {
1140  ATH_MSG_ERROR("Could not share object for: " << token.toString());
1141  return(StatusCode::FAILURE);
1142  }
1143  AuxDiscoverySvc auxDiscover;
1144  if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
1145  ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
1146  return(StatusCode::FAILURE);
1147  }
1148  cltype.Destruct(instance); instance = nullptr;
// An empty put closes the transfer for this object.
1149  if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
1150  ATH_MSG_ERROR("Could not share object for: " << token.toString());
1151  return(StatusCode::FAILURE);
1152  }
1153  } else if (token.dbID() != Guid::null()) {
// No class ID but a database ID: look the token up in the PoolSvc and send its string
// form back; if the lookup yields nothing, the request token itself is echoed.
1154  std::string returnToken;
1155  const Token* metadataToken = m_poolSvc->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
1156  if (metadataToken != nullptr) {
1157  returnToken = metadataToken->toString();
1158  } else {
1159  returnToken = token.toString();
1160  }
1161  delete metadataToken; metadataToken = nullptr;
1162  // Share token
// The size includes the terminating NUL character.
1163  sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
1164  if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
1165  ATH_MSG_ERROR("Could not share token for: " << token.toString());
1166  return(StatusCode::FAILURE);
1167  }
1168  } else {
// Neither class ID nor database ID in the request.
1169  return(StatusCode::RECOVERABLE);
1170  }
1171  return(StatusCode::SUCCESS);
1172 }
1173 
1174 //________________________________________________________________________________
// Commits the file catalog obtained from m_poolSvc and then calls start() on it again.
// The const qualifier of the catalog returned by m_poolSvc->catalog() is cast away to do so.
// Always returns SUCCESS.
// NOTE(review): the line carrying this function's signature is not present in this listing.
1176  pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
1177  const_cast<pool::IFileCatalog*>(m_poolSvc->catalog());
1178  catalog->commit();
1179  catalog->start();
1180  return(StatusCode::SUCCESS);
1181 }
1182 
1183 //______________________________________________________________________________
// Repeatedly sends "ABORT" through the output streaming tool until a call reports a
// non-success, non-recoverable status. The visible code always returns FAILURE.
// NOTE(review): the line carrying this function's signature (which declares 'client_n')
// is not present in this listing.
1185 {
1186  ATH_MSG_ERROR("Sending ABORT to clients");
1187  // the master process will kill this process once workers abort
1188  // but it could be a time-limited loop
1189  StatusCode sc = StatusCode::SUCCESS;
1190  while (sc.isSuccess()) {
1191  if (client_n >= 0) {
// NOTE(review): this status is overwritten by clearObject() below without being inspected.
1192  sc = m_outputStreamingTool->lockObject("ABORT", client_n);
1193  }
// NOTE(review): 'dummy' is passed uninitialized; its value is never read here.
1194  const char* dummy;
// 'client_n' is passed to clearObject() (presumably updated to the next requesting client -- TODO confirm).
1195  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
1196  while (sc.isRecoverable()) {
1197  sc = m_outputStreamingTool->clearObject(&dummy, client_n);
1198  }
1199  }
1200  return StatusCode::FAILURE;
1201 }
1202 
1203 //______________________________________________________________________________
// Incident callback. Reacts to two incident types by name: "StoreCleared" and "EndEvent".
1204 void AthenaPoolCnvSvc::handle(const Incident& incident) {
// On "StoreCleared", a streaming client without parallel compression sends "release"
// through the output streaming tool; the resulting status is ignored.
// NOTE(review): m_outputStreamingTool is dereferenced here without the empty() check used
// elsewhere in this file -- confirm this incident is only delivered when the tool is configured.
1205  if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
1206  m_outputStreamingTool->lockObject("release").ignore();
1207  }
1208  if (incident.type() == "EndEvent") {
// NOTE(review): listing line 1209 is not present here; the message and closing brace below
// belong to a condition that is not visible.
1210  ATH_MSG_DEBUG("handle EndEvent failed process POOL database attributes.");
1211  }
1212  }
1213 }
1214 //______________________________________________________________________________
1215 AthenaPoolCnvSvc::AthenaPoolCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
1216  ::AthCnvSvc(name, pSvcLocator, POOL_StorageType),
1217  m_outputStreamingTool(this)
1218 {
1219  declareProperty("OutputStreamingTool", m_outputStreamingTool);
1220 }
1221 //__________________________________________________________________________
1222 void AthenaPoolCnvSvc::extractPoolAttributes(const StringArrayProperty& property,
1223  std::vector<std::vector<std::string> >* contAttr,
1224  std::vector<std::vector<std::string> >* dbAttr,
1225  std::vector<std::vector<std::string> >* domAttr) const {
1226  std::vector<std::string> opt;
1227  std::string attributeName, containerName, databaseName, valueString;
1228  for (std::vector<std::string>::const_iterator iter = property.value().begin(),
1229  last = property.value().end(); iter != last; ++iter) {
1230  opt.clear();
1231  attributeName.clear();
1232  containerName.clear();
1233  databaseName.clear();
1234  valueString.clear();
1235  using Gaudi::Utils::AttribStringParser;
1236  for (const AttribStringParser::Attrib& attrib : AttribStringParser (*iter)) {
1237  const std::string tag = attrib.tag;
1238  const std::string val = attrib.value;
1239  if (tag == "DatabaseName") {
1240  databaseName = val;
1241  } else if (tag == "ContainerName") {
1242  if (databaseName.empty()) {
1243  databaseName = "*";
1244  }
1245  containerName = val;
1246  } else {
1247  attributeName = tag;
1248  valueString = val;
1249  }
1250  }
1251  if (!attributeName.empty() && !valueString.empty()) {
1252  opt.push_back(attributeName);
1253  opt.push_back(valueString);
1254  if (!databaseName.empty()) {
1255  opt.push_back(databaseName);
1256  if (!containerName.empty()) {
1257  opt.push_back(containerName);
1258  if (containerName.compare(0, 6, "TTree=") == 0) {
1259  dbAttr->push_back(opt);
1260  } else {
1261  contAttr->push_back(opt);
1262  }
1263  } else {
1264  opt.push_back("");
1265  dbAttr->push_back(opt);
1266  }
1267  } else if (domAttr != 0) {
1268  domAttr->push_back(opt);
1269  } else {
1270  opt.push_back("*");
1271  opt.push_back("");
1272  dbAttr->push_back(opt);
1273  }
1274  }
1275  }
1276 }
1277 //__________________________________________________________________________
1278 StatusCode AthenaPoolCnvSvc::processPoolAttributes(std::vector<std::vector<std::string> >& attr,
1279  const std::string& fileName,
1280  unsigned long contextId,
1281  bool doGet,
1282  bool doSet,
1283  bool doClear) const {
1284  bool retError = false;
1285  if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) doGet = false;
1286  for (std::vector<std::vector<std::string> >::iterator iter = attr.begin(), last = attr.end();
1287  iter != last; ++iter) {
1288  if (iter->size() == 2) {
1289  const std::string& opt = (*iter)[0];
1290  std::string data = (*iter)[1];
1291  if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") {
1292  if (doGet) {
1293  if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), contextId).isSuccess()) {
1294  ATH_MSG_DEBUG("getAttribute failed for domain attr " << opt);
1295  retError = true;
1296  }
1297  }
1298  } else if (doSet) {
1299  if (m_poolSvc->setAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), contextId).isSuccess()) {
1300  ATH_MSG_DEBUG("setAttribute " << opt << " to " << data);
1301  if (doClear) {
1302  iter->clear();
1303  }
1304  } else {
1305  ATH_MSG_DEBUG("setAttribute failed for domain attr " << opt << " to " << data);
1306  retError = true;
1307  }
1308  }
1309  }
1310  if (iter->size() == 4) {
1311  const std::string& opt = (*iter)[0];
1312  std::string data = (*iter)[1];
1313  const std::string& file = (*iter)[2];
1314  const std::string& cont = (*iter)[3];
1315  if (!fileName.empty() && (0 == fileName.compare(0, fileName.find('?'), file)
1316  || (file[0] == '*' && file.find("," + fileName + ",") == std::string::npos))) {
1317  if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") {
1318  if (doGet) {
1319  if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) {
1320  ATH_MSG_DEBUG("getAttribute failed for database/container attr " << opt);
1321  retError = true;
1322  }
1323  }
1324  } else if (doSet) {
1325  if (m_poolSvc->setAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) {
1326  ATH_MSG_DEBUG("setAttribute " << opt << " to " << data << " for db: " << fileName << " and cont: " << cont);
1327  if (doClear) {
1328  if (file[0] == '*' && !m_persSvcPerOutput) {
1329  (*iter)[2] += "," + fileName + ",";
1330  } else {
1331  iter->clear();
1332  }
1333  }
1334  } else {
1335  ATH_MSG_DEBUG("setAttribute failed for " << opt << " to " << data << " for db: " << fileName << " and cont: " << cont);
1336  retError = true;
1337  }
1338  }
1339  }
1340  }
1341  }
1342  for (std::vector<std::vector<std::string> >::iterator iter = attr.begin(); iter != attr.end(); ) {
1343  if (iter->empty()) {
1344  iter = attr.erase(iter);
1345  } else {
1346  ++iter;
1347  }
1348  }
1349  return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
1350 }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
AthenaPoolCnvSvc::m_poolSvc
ServiceHandle< IPoolSvc > m_poolSvc
Definition: AthenaPoolCnvSvc.h:192
AthenaPoolCnvSvc::m_contextAttr
std::vector< unsigned int > m_contextAttr
Definition: AthenaPoolCnvSvc.h:223
AthenaPoolCnvSvc::disconnectOutput
StatusCode disconnectOutput(const std::string &outputConnectionSpec)
Disconnect from the output connection.
Definition: AthenaPoolCnvSvc.cxx:675
AthenaPoolCnvSvc::registerCleanUp
StatusCode registerCleanUp(IAthenaPoolCleanUp *cnv)
Implement registerCleanUp to register a IAthenaPoolCleanUp to be called during cleanUp.
Definition: AthenaPoolCnvSvc.cxx:1011
IPoolSvc::kOutputStream
@ kOutputStream
Definition: IPoolSvc.h:40
AuxDiscoverySvc.h
This file contains the class definition for the AuxDiscoverySvc class.
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
Guid::null
static const Guid & null()
NULL-Guid: static class method.
Definition: Guid.cxx:18
Amg::compare
std::pair< int, int > compare(const AmgSymMatrix(N) &m1, const AmgSymMatrix(N) &m2, double precision=1e-9, bool relative=false)
compare two matrices, returns the indices of the first element that fails the condition,...
Definition: EventPrimitivesHelpers.h:109
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
Placement
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition: Placement.h:19
IPoolSvc
This class provides the interface to the LCG POOL persistency software.
Definition: IPoolSvc.h:36
checkCorrelInHIST.conn
conn
Definition: checkCorrelInHIST.py:25
AthenaPoolCnvSvc::m_metadataContainerProp
StringProperty m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
Definition: AthenaPoolCnvSvc.h:251
TScopeAdapter::ByNameNoQuiet
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition: RootType.cxx:581
StateLessPT_NewConfig.proxy
proxy
Definition: StateLessPT_NewConfig.py:392
python.PerfMonSerializer.p
def p
Definition: PerfMonSerializer.py:743
DbReflex.h
pool::DbType::getType
static DbType getType(const std::string &name)
Access known storage type object by name.
AthenaPoolCnvSvc::outputContextId
unsigned outputContextId(const std::string &outputConnection)
Definition: AthenaPoolCnvSvc.cxx:700
AthenaPoolCnvSvc::commitCatalog
virtual StatusCode commitCatalog()
Commit Catalog.
Definition: AthenaPoolCnvSvc.cxx:1175
Placement::containerName
const std::string & containerName() const
Access container name.
Definition: Placement.h:32
AthCnvSvc::queryInterface
virtual StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface)
Definition: AthCnvSvc.cxx:149
AthenaPoolCnvSvc::registerForWrite
Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc)
Definition: AthenaPoolCnvSvc.cxx:710
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
AthenaPoolCnvSvc::m_clidSvc
ServiceHandle< IClassIDSvc > m_clidSvc
Definition: AthenaPoolCnvSvc.h:193
AthenaPoolCnvSvc::m_inputStreamingTool
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
Definition: AthenaPoolCnvSvc.h:195
FullCPAlgorithmsTest_eljob.flush
flush
Definition: FullCPAlgorithmsTest_eljob.py:168
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
Token::contID
const std::string & contID() const
Access container identifier.
Definition: Token.h:67
AthenaPoolCnvSvc::m_streamingTechnology
IntegerProperty m_streamingTechnology
Use Streaming for selected technologies only.
Definition: AthenaPoolCnvSvc.h:257
IAthMetaDataSvc.h
This file contains the class definition for the IAthMetaDataSvc class.
AthenaPoolCnvSvc::m_useDetailChronoStat
BooleanProperty m_useDetailChronoStat
UseDetailChronoStat, enable detailed output for time and size statistics for AthenaPOOL: default = fa...
Definition: AthenaPoolCnvSvc.h:206
PlotCalibFromCool.label
label
Definition: PlotCalibFromCool.py:78
AthenaPoolCnvSvc::commitOutput
StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit)
Implementation of IConversionSvc: Commit pending output.
Definition: AthenaPoolCnvSvc.cxx:392
AthenaPoolCnvSvc::m_domainMaxFileSize
long long m_domainMaxFileSize
Definition: AthenaPoolCnvSvc.h:236
Token::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Token.h:89
AthenaPoolCnvSvc::m_inputAttrPerEvent
std::vector< std::vector< std::string > > m_inputAttrPerEvent
Definition: AthenaPoolCnvSvc.h:232
IFileCatalog.h
pool::ITransaction::UPDATE
@ UPDATE
Definition: ITransaction.h:30
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:62
IAthenaPoolCleanUp
This class provides the interface for the AthenaPoolCleanUp which is used to clean up AthenaPoolConve...
Definition: IAthenaPoolCleanUp.h:19
AthenaPoolCnvSvc::io_finalize
StatusCode io_finalize()
Definition: AthenaPoolCnvSvc.cxx:176
AthenaPoolCnvSvc::m_databaseAttr
std::vector< std::vector< std::string > > m_databaseAttr
Definition: AthenaPoolCnvSvc.h:221
AthenaPoolCnvSvc::m_chronoMap
PMonUtils::BasicStopWatchResultMap_t m_chronoMap
Map that holds chrono information.
Definition: AthenaPoolCnvSvc.h:201
AthenaPoolCnvSvc::queryInterface
StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface)
Required of all Gaudi services: see Gaudi documentation for details.
Definition: AthenaPoolCnvSvc.cxx:181
AthenaPoolCnvSvc::io_reinit
StatusCode io_reinit()
Definition: AthenaPoolCnvSvc.cxx:126
athena.value
value
Definition: athena.py:122
AthenaPoolCnvSvc::decodeOutputSpec
StatusCode decodeOutputSpec(std::string &connectionSpec, int &outputTech) const
Extract/deduce the DB technology from the connection string/file specification.
Definition: AthenaPoolCnvSvc.cxx:989
AuxDiscoverySvc::sendStore
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
Definition: AuxDiscoverySvc.cxx:208
Guid::toString
const std::string toString() const
Automatic conversion to string representation.
Definition: Guid.cxx:58
AthenaPoolCnvSvc::m_dbType
pool::DbType m_dbType
decoded storage tech requested in "StorageTechnology" property
Definition: AthenaPoolCnvSvc.h:190
pool::IFileCatalog::commit
void commit()
Save catalog to file.
Definition: IFileCatalog.h:49
AthenaPoolCnvSvc::m_mutex
std::mutex m_mutex
Definition: AthenaPoolCnvSvc.h:247
Token::classID
const Guid & classID() const
Access class identifier.
Definition: Token.h:71
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
pool::DbReflex::forGuid
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
Placement::setAuxString
Placement & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Placement.h:42
AthenaPoolCnvSvc::AthenaPoolCnvSvc
AthenaPoolCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: AthenaPoolCnvSvc.cxx:1215
AthenaPoolCnvSvc::convertAddress
StatusCode convertAddress(const IOpaqueAddress *pAddress, std::string &refAddress)
Convert address to string form.
Definition: AthenaPoolCnvSvc.cxx:976
AthenaPoolCnvSvc::processPoolAttributes
StatusCode processPoolAttributes(std::vector< std::vector< std::string > > &attr, const std::string &fileName, unsigned long contextId, bool doGet=true, bool doSet=true, bool doClear=true) const
Set/get technology dependent POOL attributes.
Definition: AthenaPoolCnvSvc.cxx:1278
AthenaPoolCnvSvc::finalize
StatusCode finalize()
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:132
AthenaPoolCnvSvc::m_fileFlushSetting
std::map< std::string, int > m_fileFlushSetting
Definition: AthenaPoolCnvSvc.h:225
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
pool::DbType::type
int type() const
Access to full type.
Definition: DbType.h:66
CxxUtils::fpcompare::equal
bool equal(double a, double b)
Compare two FP numbers, working around x87 precision issues.
Definition: fpcompare.h:114
PMonUtils::BasicStopWatch
Definition: BasicStopWatch.h:17
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
AthenaPoolCnvSvc::m_persSvcPerInputType
StringProperty m_persSvcPerInputType
PersSvcPerInputType, string property, tree name to use multiple persistency services,...
Definition: AthenaPoolCnvSvc.h:246
AthenaPoolCnvSvc::m_storageTechProp
StringProperty m_storageTechProp
Default Storage Tech for containers (ROOTTREE, ROOTTREEINDEX, ROOTRNTUPLE)
Definition: AthenaPoolCnvSvc.h:209
AthenaPoolCnvSvc::extractPoolAttributes
void extractPoolAttributes(const StringArrayProperty &property, std::vector< std::vector< std::string > > *contAttr, std::vector< std::vector< std::string > > *dbAttr, std::vector< std::vector< std::string > > *domAttr=0) const
Extract POOL ItechnologySpecificAttributes for Domain, Database and Container from property.
Definition: AthenaPoolCnvSvc.cxx:1222
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:21
AthenaPoolCnvSvc::fillRepRefs
StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject)
Implementation of IConversionSvc: Resolve the references of the converted object.
Definition: AthenaPoolCnvSvc.cxx:259
IAthenaOutputStreamTool.h
Interface to an output stream tool.
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
Token::OID_t
Definition: Token.h:24
JetTagCalibConfig.className
string className
Definition: JetTagCalibConfig.py:31
Token::fromString
Token & fromString(const std::string &from)
Build from the string representation of a token.
Definition: Token.cxx:133
APRDefaults::RNTupleNames::EventData
static constexpr const char * EventData
Definition: APRDefaults.h:18
Token::setClassID
Token & setClassID(const Guid &cl_id)
Set class identifier.
Definition: Token.h:73
IPoolSvc::kInputStream
@ kInputStream
Definition: IPoolSvc.h:40
AthenaPoolCnvSvc::createAddress
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress)
Create a Generic address using explicit arguments to identify a single object.
Definition: AthenaPoolCnvSvc.cxx:906
Token::technology
int technology() const
Access technology type.
Definition: Token.h:75
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:12
FortranAlgorithmOptions.fileName
fileName
Definition: FortranAlgorithmOptions.py:13
pool::IFileCatalog
Definition: IFileCatalog.h:23
APRDefaults.h
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
AthenaPoolCnvSvc::makeServer
virtual StatusCode makeServer(int num)
Make this a server.
Definition: AthenaPoolCnvSvc.cxx:1058
pool::DbType
Definition: DbType.h:31
AthenaPoolCnvSvc::m_metadataContainersAug
StringArrayProperty m_metadataContainersAug
Definition: AthenaPoolCnvSvc.h:252
AuxDiscoverySvc::receiveStore
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
Definition: AuxDiscoverySvc.cxx:162
AthenaPoolCnvSvc::useDetailChronoStat
bool useDetailChronoStat() const
Definition: AthenaPoolCnvSvc.cxx:902
AthenaPoolCnvSvc::m_maxFileSizes
StringArrayProperty m_maxFileSizes
MaxFileSizes, vector with maximum file sizes for Athena POOL output files.
Definition: AthenaPoolCnvSvc.h:235
AthenaPoolCnvSvc::connectOutput
StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode)
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Definition: AthenaPoolCnvSvc.cxx:292
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthCnvSvc::createRep
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress)
Implementation of IConverter: Convert the transient object to the requested representation.
Definition: AthCnvSvc.cxx:305
AthenaPoolCnvSvc::m_makeStreamingToolClient
IntegerProperty m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
Definition: AthenaPoolCnvSvc.h:255
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:40
calibdata.exception
exception
Definition: calibdata.py:496
AthenaPoolCnvSvc.h
This file contains the class definition for the AthenaPoolCnvSvc class.
file
TFile * file
Definition: tile_monitor.h:29
AthCnvSvc::createObj
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject)
Implementation of IConverter: Create the transient representation of an object.
Definition: AthCnvSvc.cxx:273
AthenaPoolCnvSvc::m_inputAttr
std::vector< std::vector< std::string > > m_inputAttr
Definition: AthenaPoolCnvSvc.h:228
AthenaPoolCnvSvc::m_databaseMaxFileSize
std::map< std::string, long long > m_databaseMaxFileSize
Definition: AthenaPoolCnvSvc.h:237
find_tgc_unfilled_channelids.ip
ip
Definition: find_tgc_unfilled_channelids.py:3
IAthenaSerializeSvc.h
python.xAODType.dummy
dummy
Definition: xAODType.py:4
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Token::setDb
Token & setDb(const Guid &db)
Set database name.
Definition: Token.h:64
Placement::fileName
const std::string & fileName() const
Access file name.
Definition: Placement.h:28
AthenaPoolCnvSvc::m_numberEventsPerWrite
IntegerProperty m_numberEventsPerWrite
When using TMemFile call Write on number of Events, respecting CollectionTree auto_flush.
Definition: AthenaPoolCnvSvc.h:263
Placement::setFileName
Placement & setFileName(const std::string &fileName)
Set file name.
Definition: Placement.h:30
IAthenaPoolCnvSvc::interfaceID
static const InterfaceID & interfaceID()
Retrieve interface ID.
Definition: IAthenaPoolCnvSvc.h:39
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
Placement::auxString
const std::string & auxString() const
Access auxiliary string.
Definition: Placement.h:40
TScopeAdapter::Name
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition: RootType.cxx:607
AthenaPoolCnvSvc::m_containerAttr
std::vector< std::vector< std::string > > m_containerAttr
Definition: AthenaPoolCnvSvc.h:222
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
AthenaPoolCnvSvc::initialize
StatusCode initialize()
Required of all Gaudi Services.
Definition: AthenaPoolCnvSvc.cxx:39
AthenaPoolCnvSvc::m_streamServerActive
bool m_streamServerActive
Definition: AthenaPoolCnvSvc.h:197
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
createCoolChannelIdFile.par
par
Definition: createCoolChannelIdFile.py:29
pmontree.opt
opt
Definition: pmontree.py:16
AthCnvSvc::fillRepRefs
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject)
Implementation of IConverter: Resolve the references of the converted object.
Definition: AthCnvSvc.cxx:313
AthenaPoolCnvSvc::m_persSvcPerOutput
BooleanProperty m_persSvcPerOutput
PersSvcPerOutput, boolean property to use multiple persistency services, one per output stream.
Definition: AthenaPoolCnvSvc.h:241
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:114
AthenaPoolCnvSvc::setInputAttributes
StatusCode setInputAttributes(const std::string &fileName)
Set the input file attributes, if any are requested from jobOpts.
Definition: AthenaPoolCnvSvc.cxx:1037
AthenaPoolCnvSvc::m_inputPoolAttr
StringArrayProperty m_inputPoolAttr
Input PoolAttributes, vector with names and values of technology specific attributes for POOL.
Definition: AthenaPoolCnvSvc.h:227
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:83
item
Definition: ItemListSvc.h:43
RTTAlgmain.address
address
Definition: RTTAlgmain.py:55
AthenaPoolCnvSvc::readData
virtual StatusCode readData()
Read the next data object.
Definition: AthenaPoolCnvSvc.cxx:1105
AthenaPoolCnvSvc::cleanUp
StatusCode cleanUp(const std::string &connection)
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
Definition: AthenaPoolCnvSvc.cxx:1016
AthenaPoolCnvSvc::m_streamPortString
StringProperty m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
Definition: AthenaPoolCnvSvc.h:261
Placement::toString
const std::string toString() const
Retrieve the string representation of the placement.
Definition: Placement.cxx:15
AthenaPoolCnvSvc::m_domainAttr
std::vector< std::vector< std::string > > m_domainAttr
Definition: AthenaPoolCnvSvc.h:220
AthenaPoolCnvSvc::setObjPtr
void setObjPtr(void *&obj, const Token *token)
Definition: AthenaPoolCnvSvc.cxx:822
AthCnvSvc::converter
virtual IConverter * converter(const CLID &wanted)
Retrieve converter from list.
Definition: AthCnvSvc.cxx:394
AthenaPoolCnvSvc::m_fileCommitCounter
std::map< std::string, int > m_fileCommitCounter
Definition: AthenaPoolCnvSvc.h:224
AthenaPoolCnvSvc::createRep
StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress)
Implementation of IConversionSvc: Convert the transient object to the requested representation.
Definition: AthenaPoolCnvSvc.cxx:226
AthenaPoolCnvSvc::m_metadataClient
int m_metadataClient
Definition: AthenaPoolCnvSvc.h:198
TokenAddress.h
This file contains the class definition for the TokenAddress class.
TScopeAdapter::IsFundamental
Bool_t IsFundamental() const
Definition: RootType.cxx:726
DiTauMassTools::MaxHistStrategyV2::e
e
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:26
pool::DbType::exactMatch
bool exactMatch(const DbType &typ) const
Definition: DbType.h:73
AthenaPoolCnvSvc::abortSharedWrClients
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Definition: AthenaPoolCnvSvc.cxx:1184
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaPoolCnvSvc::m_outputStreamingTool
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Definition: AthenaPoolCnvSvc.h:196
Pythia8_RapidityOrderMPI.val
val
Definition: Pythia8_RapidityOrderMPI.py:14
TokenAddress::getToken
Token * getToken()
Definition: TokenAddress.h:48
Token::oid
const OID_t & oid() const
Access object identifier.
Definition: Token.h:79
Token::setAuxString
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition: Token.h:91
TScopeAdapter::Destruct
void Destruct(void *place) const
Definition: RootType.cxx:672
AthenaPoolCnvSvc::m_poolAttr
StringArrayProperty m_poolAttr
Output PoolAttributes, vector with names and values of technology specific attributes for POOL.
Definition: AthenaPoolCnvSvc.h:219
Token::setCont
Token & setCont(const std::string &cnt)
Set container name.
Definition: Token.h:69
AthenaPoolCnvSvc::m_inputPoolAttrPerEvent
StringArrayProperty m_inputPoolAttrPerEvent
Print input PoolAttributes per event, vector with names of technology specific attributes for POOL to...
Definition: AthenaPoolCnvSvc.h:231
declareProperty
#define declareProperty(n, p, h)
Definition: BaseFakeBkgTool.cxx:15
makeTransCanvas.text
text
Definition: makeTransCanvas.py:11
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers encoded in strings into integers and doubles. The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
merge.status
status
Definition: merge.py:17
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
CaloCondBlobAlgs_fillNoiseFromASCII.tag
string tag
Definition: CaloCondBlobAlgs_fillNoiseFromASCII.py:24
AthenaPoolCnvSvc::getPoolSvc
IPoolSvc * getPoolSvc()
Definition: AthenaPoolCnvSvc.cxx:706
Placement.h
This file contains the class definition for the Placement class (migrated from POOL).
AthenaPoolCnvSvc::handle
void handle(const Incident &incident)
Implementation of IIncidentListener: Handler for EndEvent incidents.
Definition: AthenaPoolCnvSvc.cxx:1204
AthenaPoolCnvSvc::m_containerPrefixProp
StringProperty m_containerPrefixProp
POOL Container name prefix - will be part of or whole TTree/RNTuple name 'Default' takes the prefix f...
Definition: AthenaPoolCnvSvc.h:212
APRDefaults::TTreeNames::EventData
static constexpr const char * EventData
Definition: APRDefaults.h:12
python.PyAthena.obj
obj
Definition: PyAthena.py:135
AthenaPoolCnvSvc::createObj
StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject)
Implementation of IConversionSvc: Create the transient representation of an object from persistent st...
Definition: AthenaPoolCnvSvc.cxx:192
SG::DataProxy
Definition: DataProxy.h:44
TScopeAdapter::SizeOf
size_t SizeOf() const
Definition: RootType.cxx:760
Token.h
This file contains the class definition for the Token class (migrated from POOL).
IAthenaPoolCleanUpSvc::m_cnvs
std::vector< IAthenaPoolCleanUp * > m_cnvs
Definition: IAthenaPoolCleanUpSvc.h:33
AthenaPoolCnvSvc::m_parallelCompression
BooleanProperty m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
Definition: AthenaPoolCnvSvc.h:259
AthenaPoolCnvSvc::m_lastInputFileName
std::string m_lastInputFileName
Definition: AthenaPoolCnvSvc.h:191
AthCnvSvc
Definition: AthCnvSvc.h:67
AthenaPoolCnvSvc::m_serializeSvc
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
Definition: AthenaPoolCnvSvc.h:194
Placement::fromString
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition: Placement.cxx:28
AuxDiscoverySvc
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
Definition: AuxDiscoverySvc.h:33
merge
Definition: merge.py:1
AthenaPoolCnvSvc::makeClient
virtual StatusCode makeClient(int num)
Make this a client.
Definition: AthenaPoolCnvSvc.cxx:1084
ServiceHandle
Definition: ClusterMakerTool.h:37
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
TScopeAdapter
Definition: RootType.h:119
pool::DbReflex::guid
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
IAthenaPoolCnvSvc
This class provides the interface between Athena and PoolSvc.
Definition: IAthenaPoolCnvSvc.h:36