ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolCnvSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
10#include "AthenaPoolCnvSvc.h"
11
12#include "GaudiKernel/AttribStringParser.h"
13#include "GaudiKernel/ClassID.h"
14#include "GaudiKernel/FileIncident.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IIoComponentMgr.h"
17#include "GaudiKernel/IOpaqueAddress.h"
18
23
24#include "StorageSvc/DbReflex.h"
26
27#include <algorithm>
28#include <charconv>
29#include <format>
30#include <iomanip>
31#include <sstream>
32
33//______________________________________________________________________________
34// Initialize the service.
36 // Initialize DataModelCompatSvc
37 ServiceHandle<IService> dmcsvc("DataModelCompatSvc", this->name());
38 ATH_CHECK(dmcsvc.retrieve());
39 // Retrieve PoolSvc
40 ATH_CHECK(m_poolSvc.retrieve());
41 StringProperty defContainerType("DefaultContainerType", "ROOTTREEINDEX");
42 if(IProperty* propertyServer = dynamic_cast<IProperty*>(m_poolSvc.get())) {
43 propertyServer->getProperty(&defContainerType).ignore();
44 }
45 m_defContainerType = defContainerType.value();
46 // Retrieve ClassIDSvc
47 ATH_CHECK(m_clidSvc.retrieve());
48 // Register this service for 'I/O' events
49 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
50 ATH_CHECK(iomgr.retrieve());
51 if (!iomgr->io_register(this).isSuccess()) {
52 ATH_MSG_FATAL("Could not register myself with the IoComponentMgr !");
53 return(StatusCode::FAILURE);
54 }
55 // Extracting MaxFileSizes for global default and map by Database name.
56 for (const auto& maxFileSizeSpec : m_maxFileSizes.value()) {
57 if (auto p = maxFileSizeSpec.find('='); p != std::string::npos) {
58 long long maxFileSize = 0;
59 const char* start = maxFileSizeSpec.data() + (p + 1);
60 const char* end = maxFileSizeSpec.data() + maxFileSizeSpec.size();
61 if (auto [ptr, ec] = std::from_chars(start, end, maxFileSize); ec != std::errc{}) {
62 ATH_MSG_WARNING(std::format("Invalid MaxFileSize value: {}", std::string(start, end)));
63 }
64 std::string databaseName = maxFileSizeSpec.substr(0, maxFileSizeSpec.find_first_of(" ="));
65 std::unique_lock<std::mutex> lock(m_mutex);
66 m_databaseMaxFileSize.emplace(std::move(databaseName), maxFileSize);
67 } else {
68 if (auto [ptr, ec] = std::from_chars(maxFileSizeSpec.data(), maxFileSizeSpec.data() + maxFileSizeSpec.size(), m_domainMaxFileSize); ec != std::errc{}) {
69 ATH_MSG_WARNING(std::format("Invalid MaxFileSize value: {}", maxFileSizeSpec));
70 }
71 }
72 }
73 // Validate provided event data technologies and fill the internal cache
74 for (const auto& [key, value] : m_storageTechProp.value()) {
75 try {
76 const auto dbType = pool::DbType::getType(value);
77 if (dbType == pool::TEST_StorageType) {
78 ATH_MSG_FATAL(std::format("Unknown storage type requested for file {}: {}", key, value));
79 return StatusCode::FAILURE;
80 }
81 m_storageTechMap.emplace(key, dbType.type());
82 } catch (const std::exception& e) {
83 ATH_MSG_FATAL(std::format("Exception while getting storage type for file {}: {}", key, e.what()));
84 return StatusCode::FAILURE;
85 } catch (...) {
86 ATH_MSG_FATAL(std::format("Unknown exception while getting storage type for file {}", key));
87 return StatusCode::FAILURE;
88 }
89 }
90 // Extracting INPUT POOL ItechnologySpecificAttributes for Domain, Database and Container.
92 // Extracting the INPUT POOL ItechnologySpecificAttributes which are to be printed for each event
94 // Setup incident for EndEvent to print out attributes each event
95 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
96 long int pri = 1000;
97 if (!m_inputPoolAttrPerEvent.value().empty()) {
98 // Set to be listener for EndEvent
99 incSvc->addListener(this, "EndEvent", pri);
100 ATH_MSG_DEBUG("Subscribed to EndEvent for printing out input file attributes.");
101 }
102 if (!processPoolAttributes(m_inputAttr, "", IPoolSvc::kInputStream, false, true, true).isSuccess()) {
103 ATH_MSG_DEBUG("setInputAttribute failed setting POOL domain attributes.");
104 }
105
106 // Load these dictionaries now, so we don't need to try to do so
107 // while multiple threads are running.
108 TClass::GetClass ("TLeafI");
109 TClass::GetClass ("TLeafL");
110 TClass::GetClass ("TLeafD");
111 TClass::GetClass ("TLeafF");
112
113 return(StatusCode::SUCCESS);
114}
115//______________________________________________________________________________
117 ATH_MSG_DEBUG("I/O reinitialization...");
118 m_contextAttr.clear();
119 return(StatusCode::SUCCESS);
120}
121//______________________________________________________________________________
122void AthenaPoolCnvSvc::flushDataHeaderForms(const std::string& streamName) {
123 // Write remaining DataHeaderForms for a given streamName, "*"" means all
124 auto DHCnvListener = dynamic_cast<IIncidentListener*>( converter( ClassID_traits<DataHeader>::ID() ) );
125 FileIncident incident(name(), "WriteDataHeaderForms", streamName);
126 if( DHCnvListener ) DHCnvListener->handle(incident);
127}
128//______________________________________________________________________________
130 ATH_MSG_VERBOSE("stop()");
131 // In case of direct writing without an OutputStream, this should be a good time to flush DHForms
133 return StatusCode::SUCCESS;
134}
135//______________________________________________________________________________
137 ATH_MSG_VERBOSE("Finalizing...");
138 // Some algorithms write in finalize(), flush DHForms if any are left
140 // Release ClassIDSvc
141 if (!m_clidSvc.release().isSuccess()) {
142 ATH_MSG_WARNING("Cannot release ClassIDSvc.");
143 }
144 // Release PoolSvc
145 if (!m_poolSvc.release().isSuccess()) {
146 ATH_MSG_WARNING("Cannot release PoolSvc.");
147 }
148 // Print Performance Statistics
149 // The pattern AthenaPoolCnvSvc.*PerfStats is ignored in AtlasTest/TestTools/share/post.sh
150 const std::string msgPrefix{"PerfStats "};
151 ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
152 ATH_MSG_INFO(msgPrefix << "Timing Measurements for AthenaPoolCnvSvc");
153 ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
154 for(const auto& [key, value] : m_chronoMap) {
155 ATH_MSG_INFO(msgPrefix << "| " << std::left << std::setw(15) << key << " | "
156 << std::right << std::setw(15) << std::fixed << std::setprecision(0) << value << " ms |");
157 }
158 ATH_MSG_INFO(msgPrefix << std::string(40, '-'));
159
160 m_cnvs.clear();
161 m_cnvs.shrink_to_fit();
162 return(StatusCode::SUCCESS);
163}
164//______________________________________________________________________________
166 ATH_MSG_DEBUG("I/O finalization...");
167 return(StatusCode::SUCCESS);
168}
169//______________________________________________________________________________
170StatusCode AthenaPoolCnvSvc::createObj(IOpaqueAddress* pAddress, DataObject*& refpObject) {
171 assert(pAddress);
172 std::string objName = "ALL";
173 if (m_useDetailChronoStat.value()) {
174 if (m_clidSvc->getTypeNameOfID(pAddress->clID(), objName).isFailure()) {
175 objName = std::to_string(pAddress->clID());
176 }
177 objName += '#';
178 objName += *(pAddress->par() + 1);
179 }
180 // StopWatch listens from here until the end of this current scope
181 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, m_chronoMap);
182 if (!m_persSvcPerInputType.empty()) { // Use separate PersistencySvc for each input data type
183 TokenAddress* tokAddr = dynamic_cast<TokenAddress*>(pAddress);
184 if (tokAddr != nullptr && tokAddr->getToken() != nullptr && (tokAddr->getToken()->contID().starts_with(m_persSvcPerInputType.value() + "(") || tokAddr->getToken()->contID().starts_with(m_persSvcPerInputType.value() + "_"))) {
185 const unsigned int maxContext = m_poolSvc->getInputContextMap().size();
186 const unsigned int auxContext = m_poolSvc->getInputContext(tokAddr->getToken()->classID().toString() + tokAddr->getToken()->dbID().toString(), 1);
187 char text[32];
188 const std::string contextStr = std::format("[CTXT={:08X}]", auxContext);
189 std::strncpy(text, contextStr.c_str(), sizeof(text) - 1);
190 text[sizeof(text) - 1] = '\0';
191 if (m_poolSvc->getInputContextMap().size() > maxContext) {
192 if (m_poolSvc->setAttribute("TREE_CACHE", "0", pool::DbType(pool::ROOTTREE_StorageType).type(), "FID:" + tokAddr->getToken()->dbID().toString(), m_persSvcPerInputType.value(), auxContext).isSuccess()) {
193 ATH_MSG_DEBUG("setInputAttribute failed to switch off TTreeCache for id = " << auxContext << ".");
194 }
195 }
196 tokAddr->getToken()->setAuxString(text);
197 }
198 }
199 // Forward to base class createObj
200 StatusCode status = ::AthCnvSvc::createObj(pAddress, refpObject);
201 return(status);
202}
203//______________________________________________________________________________
204StatusCode AthenaPoolCnvSvc::createRep(DataObject* pObject, IOpaqueAddress*& refpAddress) {
205 assert(pObject);
206 std::string objName = "ALL";
207 if (m_useDetailChronoStat.value()) {
208 if (m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
209 objName = std::to_string(pObject->clID());
210 }
211 objName += '#';
212 objName += pObject->registry()->name();
213 }
214 // StopWatch listens from here until the end of this current scope
215 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, m_chronoMap);
216 StatusCode status = StatusCode::FAILURE;
217 if (pObject->clID() == 1) {
218 // No transient object was found use cnv to write default persistent object
219 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(pObject->registry());
220 if (proxy != nullptr) {
221 IConverter* cnv = converter(proxy->clID());
222 status = cnv->createRep(pObject, refpAddress);
223 }
224 } else {
225 // Forward to base class createRep
226 try {
227 status = ::AthCnvSvc::createRep(pObject, refpAddress);
228 } catch(std::runtime_error& e) {
229 ATH_MSG_FATAL(e.what());
230 }
231 }
232 return(status);
233}
234//______________________________________________________________________________
235StatusCode AthenaPoolCnvSvc::fillRepRefs(IOpaqueAddress* pAddress, DataObject* pObject) {
236 assert(pObject);
237 std::string objName = "ALL";
238 if (m_useDetailChronoStat.value()) {
239 if (m_clidSvc->getTypeNameOfID(pObject->clID(), objName).isFailure()) {
240 objName = std::to_string(pObject->clID());
241 }
242 objName += '#';
243 objName += pObject->registry()->name();
244 }
245 // StopWatch listens from here until the end of this current scope
246 PMonUtils::BasicStopWatch stopWatch("fRep_" + objName, m_chronoMap);
247 StatusCode status = StatusCode::FAILURE;
248 if (pObject->clID() == 1) {
249 // No transient object was found use cnv to write default persistent object
250 SG::DataProxy* proxy = dynamic_cast<SG::DataProxy*>(pObject->registry());
251 if (proxy != nullptr) {
252 IConverter* cnv = converter(proxy->clID());
253 status = cnv->fillRepRefs(pAddress, pObject);
254 }
255 } else {
256 // Forward to base class fillRepRefs
257 try {
258 status = ::AthCnvSvc::fillRepRefs(pAddress, pObject);
259 } catch(std::runtime_error& e) {
260 ATH_MSG_FATAL(e.what());
261 }
262 }
263 return(status);
264}
265//______________________________________________________________________________
266StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec,
267 const std::string& /*openMode*/) {
268 return(connectOutput(outputConnectionSpec));
269}
270//______________________________________________________________________________
271StatusCode AthenaPoolCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
272// This is called before DataObjects are being converted.
273 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
274 // Extract the technology
275 int tech{0};
276 if (!decodeOutputSpec(outputConnection, tech).isSuccess()) {
277 ATH_MSG_ERROR("connectOutput FAILED extract file name and technology.");
278 return(StatusCode::FAILURE);
279 }
280 unsigned int contextId = outputContextId(outputConnection);
281 try {
282 if (!m_poolSvc->connect(pool::ITransaction::UPDATE, contextId).isSuccess()) {
283 ATH_MSG_ERROR("connectOutput FAILED to open an UPDATE transaction.");
284 return(StatusCode::FAILURE);
285 }
286 } catch (std::exception& e) {
287 ATH_MSG_ERROR("connectOutput - caught exception: " << e.what());
288 return(StatusCode::FAILURE);
289 }
290
291 std::unique_lock<std::mutex> lock(m_mutex);
292 if (std::find(m_contextAttr.begin(), m_contextAttr.end(), contextId) == m_contextAttr.end()) {
293 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
294 int flush = m_numberEventsPerWrite.value();
295 m_contextAttr.push_back(contextId);
296 // Setting default 'TREE_MAX_SIZE' for ROOT to 1024 GB to avoid file chains.
297 std::vector<std::string> maxFileSize;
298 maxFileSize.push_back("TREE_MAX_SIZE");
299 maxFileSize.push_back("1099511627776L");
300 m_domainAttr.emplace_back(std::move(maxFileSize));
301 // Extracting OUTPUT POOL ItechnologySpecificAttributes for Domain, Database and Container.
303 //FIXME
304 for (auto& dbAttrEntry : m_databaseAttr) {
305 const std::string& opt = dbAttrEntry[0];
306 std::string& data = dbAttrEntry[1];
307 const std::string& file = dbAttrEntry[2];
308 const std::string& cont = dbAttrEntry[3];
309 std::size_t equal = cont.find('='); // Used to remove leading "TTree="
310 if (equal == std::string::npos) equal = 0;
311 else equal++;
312 const auto& prefix = m_containerPrefixProp.value();
313 std::size_t colon = prefix.find(':');
314 if (colon == std::string::npos) colon = 0; // Used to remove leading technology
315 else colon++;
317 const auto& strProp = (prefix == "Default") ? defaultContName : prefix;
318 if (merge != std::string::npos && opt == "TREE_AUTO_FLUSH" && 0 == outputConnection.compare(0, merge, file) &&cont.compare(equal, std::string::npos, strProp, colon) == 0 && data != "int" && data != "DbLonglong" && data != "double" && data != "string") {
319 flush = atoi(data.c_str());
320 if (flush < 0 && m_numberEventsPerWrite.value() > 0) {
321 flush = m_numberEventsPerWrite.value();
322 data = std::to_string(flush);
323 } else if (flush > 0 && flush < m_numberEventsPerWrite.value()) {
324 flush = flush * (int(static_cast<float>(m_numberEventsPerWrite.value()) / flush - 0.5) + 1);
325 }
326 }
327 }
328 if (merge != std::string::npos) {
329 ATH_MSG_INFO("connectOutput setting auto write for: " << outputConnection << " to " << flush << " events");
330 m_fileFlushSetting[outputConnection.substr(0, merge)] = flush;
331 }
332 }
333 if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) {
334 ATH_MSG_DEBUG("connectOutput failed process POOL domain attributes.");
335 }
336 if (!processPoolAttributes(m_databaseAttr, outputConnection, contextId).isSuccess()) {
337 ATH_MSG_DEBUG("connectOutput failed process POOL database attributes.");
338 }
339 return(StatusCode::SUCCESS);
340}
341
342//______________________________________________________________________________
343StatusCode AthenaPoolCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
344 // This is called after all DataObjects are converted.
345 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
346 // StopWatch listens from here until the end of this current scope
347 PMonUtils::BasicStopWatch stopWatch("commitOutput", m_chronoMap);
348 std::unique_lock<std::mutex> lock(m_mutex);
349 // Extract the technology
350 int tech{0};
351 if (!decodeOutputSpec(outputConnection, tech).isSuccess()) {
352 ATH_MSG_ERROR("connectOutput FAILED extract file name and technology.");
353 return(StatusCode::FAILURE);
354 }
355 unsigned int contextId = outputContextId(outputConnection);
356 if (!processPoolAttributes(m_domainAttr, outputConnection, contextId).isSuccess()) {
357 ATH_MSG_DEBUG("commitOutput failed process POOL domain attributes.");
358 }
359 if (!processPoolAttributes(m_databaseAttr, outputConnection, contextId).isSuccess()) {
360 ATH_MSG_DEBUG("commitOutput failed process POOL database attributes.");
361 }
362 if (!processPoolAttributes(m_containerAttr, outputConnection, contextId).isSuccess()) {
363 ATH_MSG_DEBUG("commitOutput failed process POOL container attributes.");
364 }
365 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
366 const std::string baseOutputConnection = outputConnection.substr(0, merge);
367 m_fileCommitCounter[baseOutputConnection]++;
368 if (merge != std::string::npos && m_fileFlushSetting[baseOutputConnection] > 0 && m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
369 doCommit = true;
370 ATH_MSG_DEBUG("commitOutput sending data.");
371 }
372
373 // lock.unlock(); //MN: first need to make commitCache slot-specific
374 try {
375 if (doCommit) {
376 if (!m_poolSvc->commit(contextId).isSuccess()) {
377 ATH_MSG_ERROR("commitOutput FAILED to commit OutputStream.");
378 return(StatusCode::FAILURE);
379 }
380 } else {
381 if (!m_poolSvc->commitAndHold(contextId).isSuccess()) {
382 ATH_MSG_ERROR("commitOutput FAILED to commitAndHold OutputStream.");
383 return(StatusCode::FAILURE);
384 }
385 }
386 } catch (std::exception& e) {
387 ATH_MSG_ERROR("commitOutput - caught exception: " << e.what());
388 return(StatusCode::FAILURE);
389 }
390 if (!this->cleanUp(baseOutputConnection).isSuccess()) {
391 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
392 return(StatusCode::FAILURE);
393 }
394 // Check FileSize
395 long long int currentFileSize = m_poolSvc->getFileSize(outputConnection, tech, contextId);
396 if (m_databaseMaxFileSize.find(outputConnection) != m_databaseMaxFileSize.end()) {
397 if (currentFileSize > m_databaseMaxFileSize[outputConnection]) {
398 ATH_MSG_WARNING(std::format("FileSize {} > {} for {}", currentFileSize, m_databaseMaxFileSize[outputConnection], outputConnection));
399 return(StatusCode::RECOVERABLE);
400 }
401 } else if (currentFileSize > m_domainMaxFileSize) {
402 ATH_MSG_WARNING(std::format("FileSize {} > {} for {}", currentFileSize, m_domainMaxFileSize, outputConnection));
403 return(StatusCode::RECOVERABLE);
404 }
405 return(StatusCode::SUCCESS);
406}
407
408//______________________________________________________________________________
409StatusCode AthenaPoolCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
410 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
411 unsigned int contextId = outputContextId(outputConnection);
412 StatusCode sc = m_poolSvc->disconnect(contextId);
413 return sc;
414}
415
416//______________________________________________________________________________
417unsigned int AthenaPoolCnvSvc::outputContextId(const std::string& outputConnection) {
418 return m_persSvcPerOutput?
419 m_poolSvc->getOutputContext(outputConnection) : (unsigned int)IPoolSvc::kOutputStream;
420}
421
422//______________________________________________________________________________
426//______________________________________________________________________________
427Token* AthenaPoolCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
428 // StopWatch listens from here until the end of this current scope
429 PMonUtils::BasicStopWatch stopWatch("cRepR_ALL", m_chronoMap);
430 Token* token = nullptr;
431 if (m_persSvcPerOutput) { // Use separate PersistencySvc for each output stream/file
432 char text[32];
433 const std::string contextStr = std::format("[CTXT={:08X}]", m_poolSvc->getOutputContext(placement->fileName()));
434 std::strncpy(text, contextStr.c_str(), sizeof(text) - 1);
435 text[sizeof(text) - 1] = '\0';
436 placement->setAuxString(text);
437 }
438 token = m_poolSvc->registerForWrite(placement, obj, classDesc);
439 return(token);
440}
441//______________________________________________________________________________
442void AthenaPoolCnvSvc::setObjPtr(void*& obj, const Token* token) {
443 ATH_MSG_VERBOSE("Requesting object for: " << token->toString());
444 // StopWatch listens from here until the end of this current scope
445 PMonUtils::BasicStopWatch stopWatch("cObjR_ALL", m_chronoMap);
446 if (token->dbID() != Guid::null()) {
447 ATH_MSG_VERBOSE("Requesting object for: " << token->toString());
448 m_poolSvc->setObjPtr(obj, token);
449 }
450}
451//______________________________________________________________________________
453 return(m_useDetailChronoStat.value());
454}
455//______________________________________________________________________________
456StatusCode AthenaPoolCnvSvc::createAddress(long svcType,
457 const CLID& clid,
458 const std::string* par,
459 const unsigned long* ip,
460 IOpaqueAddress*& refpAddress) {
461 if( svcType != repSvcType() ) {
462 ATH_MSG_ERROR("createAddress: svcType != POOL_StorageType " << svcType << " " << repSvcType());
463 return(StatusCode::FAILURE);
464 }
465 std::unique_ptr<Token> token;
466 if (par[0].compare(0, 3, "SHM") == 0) {
467 token = std::make_unique<Token>();
468 token->setOid(Token::OID_t(ip[0], ip[1]));
469 token->setAuxString("[PNAME=" + par[2] + "]");
470 RootType classDesc = RootType::ByNameNoQuiet(par[2]);
471 token->setClassID(pool::DbReflex::guid(classDesc));
472 } else {
473 token.reset(m_poolSvc->getToken(par[0], par[1], ip[0]));
474 }
475 if (token == nullptr) {
476 return(StatusCode::RECOVERABLE);
477 }
478 refpAddress = new TokenAddress(repSvcType(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
479 return(StatusCode::SUCCESS);
480}
481//______________________________________________________________________________
482StatusCode AthenaPoolCnvSvc::createAddress(long svcType,
483 const CLID& clid,
484 const std::string& refAddress,
485 IOpaqueAddress*& refpAddress) {
486 if (svcType != repSvcType()) {
487 ATH_MSG_ERROR("createAddress: svcType != POOL_StorageType " << svcType << " " << repSvcType());
488 return(StatusCode::FAILURE);
489 }
490 refpAddress = new GenericAddress(repSvcType(), clid, refAddress);
491 return(StatusCode::SUCCESS);
492}
493//______________________________________________________________________________
494StatusCode AthenaPoolCnvSvc::convertAddress(const IOpaqueAddress* pAddress,
495 std::string& refAddress) {
496 assert(pAddress);
497 const TokenAddress* tokAddr = dynamic_cast<const TokenAddress*>(pAddress);
498 if (tokAddr != nullptr && tokAddr->getToken() != nullptr) {
499 refAddress = tokAddr->getToken()->toString();
500 } else {
501 refAddress = *pAddress->par();
502 }
503 return(StatusCode::SUCCESS);
504}
505//__________________________________________________________________________
506StatusCode AthenaPoolCnvSvc::decodeOutputSpec(std::string& fileSpec, int& outputTech) const {
507 if (fileSpec.starts_with ( "ROOTKEY:")) {
508 outputTech = pool::ROOTKEY_StorageType.type();
509 fileSpec.erase(0, 8);
510 } else if (fileSpec.starts_with ( "ROOTTREE:")) {
511 outputTech = pool::ROOTTREE_StorageType.type();
512 fileSpec.erase(0, 9);
513 } else if (fileSpec.starts_with ( "ROOTTREEINDEX:")) {
514 outputTech = pool::ROOTTREEINDEX_StorageType.type();
515 fileSpec.erase(0, 14);
516 } else if (fileSpec.starts_with ( "ROOTRNTUPLE:")) {
517 outputTech = pool::ROOTRNTUPLE_StorageType.type();
518 fileSpec.erase(0, 12);
519 } else if (outputTech == 0) {
520 // Extract the file name
521 std::string fileName{fileSpec};
522 if (auto pos = fileSpec.find("?pmerge="); pos != std::string::npos) {
523 fileName = fileSpec.substr(0, pos);
524 }
525 // Find the appropriate event data technology for this file
526 // This will be used for event data and its data header
527 // First we look for an exact file name match
528 // If that fails, we look for a wildcard ("*") match
529 // If that also fails, we use the default value from PoolSvc
530 if (auto it = m_storageTechMap.find(fileName); it != m_storageTechMap.end()) {
531 outputTech = it->second;
532 } else if (it = m_storageTechMap.find("*"); it != m_storageTechMap.end()) {
533 outputTech = it->second;
534 } else {
536 }
537 }
538 return StatusCode::SUCCESS;
539}
540//______________________________________________________________________________
542 m_cnvs.push_back(cnv);
543 return(StatusCode::SUCCESS);
544}
545//______________________________________________________________________________
546StatusCode AthenaPoolCnvSvc::cleanUp(const std::string& connection) {
547 bool retError = false;
548 std::size_t cpos = connection.find(':');
549 std::size_t bpos = connection.find('[');
550 if (cpos == std::string::npos) {
551 cpos = 0;
552 } else {
553 cpos++;
554 }
555 if (bpos != std::string::npos) bpos = bpos - cpos;
556 const std::string conn = connection.substr(cpos, bpos);
557 ATH_MSG_VERBOSE("Cleanup for Connection='"<< conn <<"'");
558 for (auto converter : m_cnvs) {
559 if (!converter->cleanUp(conn).isSuccess()) {
560 ATH_MSG_WARNING("AthenaPoolConverter cleanUp failed.");
561 retError = true;
562 }
563 }
564 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
565}
566//______________________________________________________________________________
567StatusCode AthenaPoolCnvSvc::setInputAttributes(const std::string& fileName) {
568 // Set attributes for input file
569 m_lastInputFileName = fileName; // Save file name for printing attributes per event
570 if (!processPoolAttributes(m_inputAttr, m_lastInputFileName, IPoolSvc::kInputStream, false, true, false).isSuccess()) {
571 ATH_MSG_DEBUG("setInputAttribute failed setting POOL database/container attributes.");
572 }
574 ATH_MSG_DEBUG("setInputAttribute failed getting POOL database/container attributes.");
575 }
576 if (!m_persSvcPerInputType.empty()) {
577 // Loop over all extra event input contexts and switch off TTreeCache
578 const auto& extraInputContextMap = m_poolSvc->getInputContextMap();
579 for (const auto& [label, id]: extraInputContextMap) {
580 if (m_poolSvc->setAttribute("TREE_CACHE", "0", pool::DbType(pool::ROOTTREE_StorageType).type(), m_lastInputFileName, m_persSvcPerInputType.value(), id).isSuccess()) {
581 ATH_MSG_DEBUG("setInputAttribute failed to switch off TTreeCache for = " << label << ".");
582 }
583 }
584 }
585 return(StatusCode::SUCCESS);
586}
587
588//______________________________________________________________________________
589void AthenaPoolCnvSvc::handle(const Incident& incident) {
590 if (incident.type() == "EndEvent") {
592 ATH_MSG_DEBUG("handle EndEvent failed process POOL database attributes.");
593 }
594 }
595}
596//______________________________________________________________________________
597AthenaPoolCnvSvc::AthenaPoolCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
598 base_class(name, pSvcLocator, pool::POOL_StorageType.type()) {
599}
600//__________________________________________________________________________
601void AthenaPoolCnvSvc::extractPoolAttributes(const StringArrayProperty& property,
602 std::vector<std::vector<std::string> >* contAttr,
603 std::vector<std::vector<std::string> >* dbAttr,
604 std::vector<std::vector<std::string> >* domAttr) const {
605 std::vector<std::string> opt;
606 std::string attributeName, containerName, databaseName, valueString;
607 for (const auto& propertyValue : property.value()) {
608 opt.clear();
609 attributeName.clear();
610 containerName.clear();
611 databaseName.clear();
612 valueString.clear();
613 using Gaudi::Utils::AttribStringParser;
614 for (const AttribStringParser::Attrib& attrib : AttribStringParser (propertyValue)) {
615 if (attrib.tag == "DatabaseName") {
616 databaseName = attrib.value;
617 } else if (attrib.tag == "ContainerName") {
618 if (databaseName.empty()) {
619 databaseName = "*";
620 }
621 containerName = attrib.value;
622 } else {
623 attributeName = attrib.tag;
624 valueString = attrib.value;
625 }
626 }
627 if (!attributeName.empty() && !valueString.empty()) {
628 opt.push_back(attributeName);
629 opt.push_back(valueString);
630 if (!databaseName.empty()) {
631 opt.push_back(databaseName);
632 if (!containerName.empty()) {
633 opt.push_back(containerName);
634 if (containerName.compare(0, 6, "TTree=") == 0) {
635 dbAttr->push_back(opt);
636 } else {
637 contAttr->push_back(opt);
638 }
639 } else {
640 opt.push_back("");
641 dbAttr->push_back(opt);
642 }
643 } else if (domAttr != 0) {
644 domAttr->push_back(opt);
645 } else {
646 opt.push_back("*");
647 opt.push_back("");
648 dbAttr->push_back(opt);
649 }
650 }
651 }
652}
653//__________________________________________________________________________
654StatusCode AthenaPoolCnvSvc::processPoolAttributes(std::vector<std::vector<std::string> >& attr,
655 const std::string& fileName,
656 unsigned long contextId,
657 bool doGet,
658 bool doSet,
659 bool doClear) const {
660 bool retError = false;
661 for (auto& attrEntry : attr) {
662 if (attrEntry.size() == 2) {
663 const std::string& opt = attrEntry[0];
664 std::string data = attrEntry[1];
665 if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") {
666 if (doGet) {
667 if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), contextId).isSuccess()) {
668 ATH_MSG_DEBUG("getAttribute failed for domain attr " << opt);
669 retError = true;
670 }
671 }
672 } else if (doSet) {
673 if (m_poolSvc->setAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), contextId).isSuccess()) {
674 ATH_MSG_DEBUG("setAttribute " << opt << " to " << data);
675 if (doClear) {
676 attrEntry.clear();
677 }
678 } else {
679 ATH_MSG_DEBUG("setAttribute failed for domain attr " << opt << " to " << data);
680 retError = true;
681 }
682 }
683 }
684 if (attrEntry.size() == 4) {
685 const std::string& opt = attrEntry[0];
686 std::string data = attrEntry[1];
687 const std::string& file = attrEntry[2];
688 const std::string& cont = attrEntry[3];
689 if (!fileName.empty() && (0 == fileName.compare(0, fileName.find('?'), file)
690 || (file[0] == '*' && file.find("," + fileName + ",") == std::string::npos))) {
691 if (data == "int" || data == "DbLonglong" || data == "double" || data == "string") {
692 if (doGet) {
693 if (!m_poolSvc->getAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) {
694 ATH_MSG_DEBUG("getAttribute failed for database/container attr " << opt);
695 retError = true;
696 }
697 }
698 } else if (doSet) {
699 if (m_poolSvc->setAttribute(opt, data, pool::DbType(pool::ROOTTREE_StorageType).type(), fileName, cont, contextId).isSuccess()) {
700 ATH_MSG_DEBUG("setAttribute " << opt << " to " << data << " for db: " << fileName << " and cont: " << cont);
701 if (doClear) {
702 if (file[0] == '*' && !m_persSvcPerOutput) {
703 attrEntry[2] += "," + fileName + ",";
704 } else {
705 attrEntry.clear();
706 }
707 }
708 } else {
709 ATH_MSG_DEBUG("setAttribute failed for " << opt << " to " << data << " for db: " << fileName << " and cont: " << cont);
710 retError = true;
711 }
712 }
713 }
714 }
715 }
716 std::erase_if(attr, [](const auto& entry) { return entry.empty(); });
717 return(retError ? StatusCode::FAILURE : StatusCode::SUCCESS);
718}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaPoolCnvSvc class.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
static Double_t sc
This file contains the class definition for the Placement class (migrated from POOL).
TTypeAdapter RootType
Definition RootType.h:211
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress) override
Implementation of IConverter: Convert the transient object to the requested representation.
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject) override
Implementation of IConverter: Create the transient representation of an object.
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject) override
Implementation of IConverter: Resolve the references of the converted object.
ServiceHandle< IClassIDSvc > m_clidSvc
virtual IPoolSvc * getPoolSvc() override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
std::map< std::string, int > m_fileCommitCounter
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
Gaudi::Property< bool > m_useDetailChronoStat
UseDetailChronoStat, enable detailed output for time and size statistics for AthenaPOOL: default = fa...
StatusCode processPoolAttributes(std::vector< std::vector< std::string > > &attr, const std::string &fileName, unsigned long contextId, bool doGet=true, bool doSet=true, bool doClear=true) const
Set/get technology dependent POOL attributes.
std::map< std::string, int > m_storageTechMap
std::vector< std::vector< std::string > > m_inputAttrPerEvent
virtual StatusCode io_finalize() override
virtual StatusCode initialize() override
Required of all Gaudi Services.
std::vector< unsigned int > m_contextAttr
virtual StatusCode stop() override
long long m_domainMaxFileSize
virtual StatusCode io_reinit() override
Gaudi::Property< std::string > m_containerPrefixProp
POOL Container name prefix - will be part of or whole TTree/RNTuple name 'Default' takes the prefix f...
std::map< std::string, long long > m_databaseMaxFileSize
virtual StatusCode registerCleanUp(IAthenaPoolCleanUp *cnv) override
Implement registerCleanUp to register a IAthenaPoolCleanUp to be called during cleanUp.
virtual StatusCode fillRepRefs(IOpaqueAddress *pAddress, DataObject *pObject) override
Implementation of IConversionSvc: Resolve the references of the converted object.
virtual StatusCode decodeOutputSpec(std::string &connectionSpec, int &outputTech) const override
Extract/deduce the DB technology from the connection string/file specification.
std::vector< std::vector< std::string > > m_databaseAttr
virtual StatusCode createObj(IOpaqueAddress *pAddress, DataObject *&refpObject) override
Implementation of IConversionSvc: Create the transient representation of an object from persistent st...
std::map< std::string, int > m_fileFlushSetting
void extractPoolAttributes(const Gaudi::Property< std::vector< std::string > > &property, std::vector< std::vector< std::string > > *contAttr, std::vector< std::vector< std::string > > *dbAttr, std::vector< std::vector< std::string > > *domAttr=0) const
Extract POOL ItechnologySpecificAttributes for Domain, Database and Container from property.
virtual bool useDetailChronoStat() const override
std::vector< std::vector< std::string > > m_containerAttr
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
Gaudi::Property< std::string > m_persSvcPerInputType
PersSvcPerInputType, string property, tree name to use multiple persistency services,...
Gaudi::Property< std::vector< std::string > > m_inputPoolAttr
Input PoolAttributes, vector with names and values of technology specific attributes for POOL.
virtual StatusCode createRep(DataObject *pObject, IOpaqueAddress *&refpAddress) override
Implementation of IConversionSvc: Convert the transient object to the requested representation.
PMonUtils::BasicStopWatchResultMap_t m_chronoMap
Map that holds chrono information.
std::string m_defContainerType
Default container type (from PoolSvc)
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
AthenaPoolCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::Property< bool > m_persSvcPerOutput
PersSvcPerOutput, boolean property to use multiple persistency services, one per output stream.
std::string m_lastInputFileName
decoded storage tech requested in "StorageTechnology" property
virtual StatusCode finalize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_numberEventsPerWrite
To use MetadataSvc to merge data placed in a certain container When using TMemFile call Write on numb...
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
void flushDataHeaderForms(const std::string &streamName="*")
Tell DataHeaderCnv to write out all DataHeaderForms for a given streamName (default is all)
unsigned outputContextId(const std::string &outputConnection)
Gaudi::Property< std::vector< std::string > > m_maxFileSizes
MaxFileSizes, vector with maximum file sizes for Athena POOL output files.
ServiceHandle< IPoolSvc > m_poolSvc
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
std::vector< std::vector< std::string > > m_inputAttr
virtual StatusCode convertAddress(const IOpaqueAddress *pAddress, std::string &refAddress) override
Convert address to string form.
Gaudi::Property< std::map< std::string, std::string > > m_storageTechProp
Default Storage Tech for containers (ROOTTREE, ROOTTREEINDEX, ROOTRNTUPLE)
virtual void setObjPtr(void *&obj, const Token *token) override
std::vector< std::vector< std::string > > m_domainAttr
Gaudi::Property< std::vector< std::string > > m_poolAttr
Output PoolAttributes, vector with names and values of technology specific attributes for POOL.
virtual StatusCode setInputAttributes(const std::string &fileName) override
Set the input file attributes, if any are requested from jobOpts.
Gaudi::Property< std::vector< std::string > > m_inputPoolAttrPerEvent
Print input PoolAttributes per event, vector with names of technology specific attributes for POOL to...
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
This class provides the interface for the AthenaPoolCleanUp which is used to clean up AthenaPoolConve...
This class provides the interface to the LCG POOL persistency software.
Definition IPoolSvc.h:35
@ kOutputStream
Definition IPoolSvc.h:39
@ kInputStream
Definition IPoolSvc.h:39
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition Placement.h:19
Placement & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Placement.h:42
const std::string & fileName() const
Access file name.
Definition Placement.h:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
This class provides a Generic Transient Address for POOL tokens.
Token * getToken()
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Token.h:93
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access database identifier.
Definition Token.h:73
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
int type() const
Access to full type.
Definition DbType.h:65
static DbType getType(const std::string &name)
Access known storage type object by name.
std::string label(const std::string &format, int i)
Definition label.h:19
Definition merge.py:1
pool namespace
Definition libname.h:15
static const DbType TEST_StorageType
Definition DbType.h:97
static const DbType ROOTTREE_StorageType
Definition DbType.h:101
static const DbType ROOTRNTUPLE_StorageType
Definition DbType.h:103
static const DbType ROOTTREEINDEX_StorageType
Definition DbType.h:102
static const DbType ROOTKEY_StorageType
Definition DbType.h:100
std::size_t erase_if(T_container &container, T_Func pred)
static constexpr const char * EventData
Definition APRDefaults.h:18
static constexpr const char * EventData
Definition APRDefaults.h:12
TFile * file