ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolSharedIOCnvSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
17
26
27#include "StorageSvc/DbReflex.h"
28
29#include "AuxDiscoverySvc.h"
30
31#include <algorithm>
32#include <iomanip>
33#include <sstream>
34#include <format>
35
36//______________________________________________________________________________
37// Initialize the service.
// Retrieves the helper services that the configured streaming tools need, puts
// PoolSvc into share mode when an output streaming tool is configured, subscribes to
// the "StoreCleared" incident in that same case, and finally delegates to
// AthenaPoolCnvSvc::initialize(), whose status is returned.
// NOTE(review): this listing jumps over source lines 38, 41 and 45, so the function
// signature and the bodies of the two tool-retrieval steps are not visible here;
// presumably they retrieve the tool handles -- confirm against the repository.
39 // Retrieve InputStreamingTool (if configured)
40 if (!m_inputStreamingTool.empty()) {
42 }
43 // Retrieve OutputStreamingTool (if configured)
44 if (!m_outputStreamingTool.empty()) {
// A m_makeStreamingToolClient value of -1 additionally retrieves AthenaRootSharedWriterSvc.
46 if (m_makeStreamingToolClient.value() == -1) {
47 // Initialize AthenaRootSharedWriter
48 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49 ATH_CHECK(arswsvc.retrieve());
50 }
51 // Put PoolSvc into share mode to avoid duplicating catalog.
52 getPoolSvc()->setShareMode(true);
53 }
// The serialization service is only needed when at least one streaming tool is configured.
54 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55 // Retrieve AthenaSerializeSvc
56 ATH_CHECK(m_serializeSvc.retrieve());
57 }
// NOTE(review): incSvc is dereferenced below without an explicit retrieve() check in
// the visible lines -- presumably the handle retrieves on first use; confirm.
58 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
// Priority value handed to addListener() for the "StoreCleared" subscription.
59 long int pri = 1000;
60 if (!m_outputStreamingTool.empty()) {
61 incSvc->addListener(this, "StoreCleared", pri);
62 ATH_MSG_DEBUG("Subscribed to StoreCleared");
63 }
// The base class is initialized last; its status is the result of this function.
64 return this->AthenaPoolCnvSvc::initialize();
65}
66//______________________________________________________________________________
// Finalize the service: release the serialization service and both streaming tools
// when they are configured, then delegate to AthenaPoolCnvSvc::finalize().
// A failed release is only reported as a WARNING; the returned status is the one of
// the base class finalize().
// NOTE(review): the signature line (source line 67) is missing from this listing.
68 // Release AthenaSerializeSvc
69 if (!m_serializeSvc.empty()) {
70 if (!m_serializeSvc.release().isSuccess()) {
71 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72 }
73 }
74 // Release OutputStreamingTool (if configured)
75 if (!m_outputStreamingTool.empty()) {
76 if (!m_outputStreamingTool.release().isSuccess()) {
77 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78 }
79 }
80 // Release InputStreamingTool (if configured)
81 if (!m_inputStreamingTool.empty()) {
82 if (!m_inputStreamingTool.release().isSuccess()) {
83 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84 }
85 }
86 return this->AthenaPoolCnvSvc::finalize();
87}
88//______________________________________________________________________________
89StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec,
90 const std::string& /*openMode*/) {
91 return(connectOutput(outputConnectionSpec));
92}
93//______________________________________________________________________________
// Prepare an output connection before any DataObjects are converted.
// The part of outputConnectionSpec in front of the first '[' is taken as the
// connection name; a bracketed suffix, when present, is appended again before the
// request is handed to AthenaPoolCnvSvc::connectOutput().
// Returns FAILURE when this service cannot be made a share client, SUCCESS on the
// "skip" paths below, and otherwise the status of the base class call.
// NOTE(review): this listing jumps over source lines 112 and 117, so the conditions
// that open the "expired server" scope and the m_streamPortString scope are not
// visible here.
94StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
95// This is called before DataObjects are being converted.
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
// Output tool configured but neither server nor client yet: become a share client first.
97 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
101 }
102 }
// A client returns SUCCESS without connecting, unless parallel compression is on and
// the spec does not carry the "[PoolContainerPrefix=<metadata container>]" option.
103 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105 return(StatusCode::SUCCESS);
106 }
// Output tool configured and this process is not a client.
107 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
// With parallel compression, specs without the metadata container option are skipped here.
108 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
111 }
113 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114 return(StatusCode::SUCCESS);
115 }
116 }
118 outputConnection += m_streamPortString.value();
119 }
// Re-attach the bracketed options that were cut off when outputConnection was built.
120 std::size_t apend = outputConnectionSpec.find('[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
123 }
124 return AthenaPoolCnvSvc::connectOutput(outputConnection);
125}
126
127//______________________________________________________________________________
// Commit pending output after all DataObjects have been converted.
//
// The visible code distinguishes three situations:
//  - client: sends a "wait" lock request to the output streaming tool, cleans up the
//    converters for this connection and returns;
//  - output tool configured but neither client nor server: returns SUCCESS at once;
//  - active server (m_streamServerActive): takes placement strings from the clients
//    via clearObject(), writes each object and sends the resulting token string back
//    via lockObject(), until clearObject() reports a failure.
// Afterwards the per-file commit counter is incremented, doCommit may be forced to
// true, AthenaPoolCnvSvc::commitOutput() is called and the objects collected in
// commitCache are destructed.
//
// Returns SUCCESS, FAILURE, RECOVERABLE (for a "stop" request, a recoverable
// clearObject() status or num == -1), or the status of the base class commit.
//
// NOTE(review): this listing jumps over source lines 252, 268, 291, 312, 390 and 397.
// The first four presumably start the declarations of the `address` objects whose
// argument lists continue on the following line; the last two presumably hold `if`
// conditions. Confirm against the repository before changing any of these spots.
128StatusCode AthenaPoolSharedIOCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
129 // This is called after all DataObjects are converted.
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
// Client side: same condition as in connectOutput(); the lockObject() status is ignored.
131 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
132 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
133 m_outputStreamingTool->lockObject("wait").ignore();
134 if (!this->cleanUp(outputConnection).isSuccess()) {
135 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
136 return(StatusCode::FAILURE);
137 }
138 return(StatusCode::SUCCESS);
139 }
140 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
141 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
143 }
// Objects pulled out of shared memory, keyed by pointer, together with the type
// needed to destruct them at the end of this function.
144 std::map<void*, RootType> commitCache;
145 std::string fileName;
146 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
147 // Clear object to get Placements for all objects in a Stream
148 const char* placementStr = nullptr;
149 int num = -1;
150 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
// 6 is the length of the "[FILE=" marker searched for just below.
151 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
152 const char * matchedChars = strstr(placementStr, "[FILE=");
153 if (!matchedChars){
154 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
155 return abortSharedWrClients(num);
156 }
// Keep only the text between "[FILE=" and the next ']'.
157 fileName = matchedChars;
158 fileName = fileName.substr(6, fileName.find(']') - 6);
159 if (!this->connectOutput(fileName).isSuccess()) {
160 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
161 return abortSharedWrClients(num);
162 }
163 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
164 bool dataHeaderSeen = false;
165 std::string dataHeaderID;
// One iteration per placement string received; the loop ends when num is set to -1
// after a failed clearObject() at the bottom.
166 while (num > 0) {
167 std::string objName = "ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr; //FIXME, better descriptor
170 }
171 // StopWatch listens from here until the end of this current scope
172 {
173 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
174 std::string_view pStr = placementStr;
// NOTE(review): the malformed-placement checks below return FAILURE directly, unlike
// the other error paths in this loop, which go through abortSharedWrClients().
175 std::string::size_type cpos = pStr.find ("[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
178 return StatusCode::FAILURE;
179 }
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
186 }
// tokenStr becomes the placement string with the "[CONT=...]" field cut out;
// contName becomes the text between "[CONT=" (6 characters) and its closing ']'.
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
189
190 std::string::size_type ppos = pStr.find ("[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
194 }
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
200 }
// className becomes the text between "[PNAME=" (7 characters) and its closing ']'.
201 className = className.substr(7, cl2 - 7);
202 RootType classDesc = RootType::ByNameNoQuiet(className);
203 void* obj = nullptr;
204 const std::string numStr = std::to_string(num);
// Check whether the container name, up to its first '(', equals the metadata
// container name or one of the names in m_metadataContainersAug; len is the length
// of whichever name matched.
205 std::string::size_type len = m_metadataContainerProp.value().size();
206 bool foundContainer = false;
207 std::size_t opPos = contName.find('(');
208 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
209 foundContainer = true;
210 } else {
211 for (const auto& item: m_metadataContainersAug.value()) {
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer = true;
214 len = item.size();
215 break;
216 }
217 }
218 }
// Matched container: the request is passed to MetaDataSvc::shmProxy() instead of
// being written here.
219 if (len > 0 && foundContainer && contName[len] == '(' ) {
220 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
221 // For Metadata, before moving to next client, fire file incidents
222 if (m_metadataClient != num) {
223 if (m_metadataClient != 0) {
224 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
// The guard is a local that goes out of scope at the closing brace right below.
225 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
226 memName, {}, /*endFileName=*/memName,
227 "BeginInputMemFile", "EndInputMemFile");
228 }
229 m_metadataClient = num;
230 }
231 // Retrieve MetaDataSvc
232 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
233 ATH_CHECK(metadataSvc.retrieve());
234 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
235 if (sc.isRecoverable()) {
236 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
237 } else if (sc.isFailure()) {
238 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
239 return abortSharedWrClients(num);
240 }
241 } else {
// All other containers: a token with the client number as first oid component and
// the class name in its aux string is handed to setObjPtr() to obtain the object.
242 Token readToken;
243 readToken.setOid(Token::OID_t(num, 0));
244 readToken.setAuxString("[PNAME=" + className + "]");
245 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
246 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
247 // Write object
248 if( m_oneDataHeaderForm.value() ) {
// Builds "<placement>[SWN=<client number>]" on demand.
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251 // Pass DHForms to the converter for later writing in the correct order - do not write it now
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
// An empty token string is sent back to the client for a DataHeaderForm.
255 tokenStr = "";
256 } else {
257 Placement placement;
258 placement.fromString(placementStr);
259 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
260 if (token == nullptr) {
261 ATH_MSG_ERROR("Failed to write Data for: " << className);
262 return abortSharedWrClients(num);
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267 // Found DataHeader - call the converter to update DHForm Ref
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
271 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
272 return abortSharedWrClients(num);
273 }
274 } else
275 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280 // Multiple shared DataHeaderForms
281 Placement placement;
282 placement.fromString(placementStr); placementStr = nullptr;
283 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
284 if (token == nullptr) {
285 ATH_MSG_ERROR("Failed to write Data for: " << className);
286 return abortSharedWrClients(num);
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290 // Found DataHeader
292 tokenStr, placement.auxString());
293 // call DH converter to add the ref to DHForm (stored earlier) and to itself
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
295 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
296 return abortSharedWrClients(num);
297 }
298 dataHeaderSeen = true;
299 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
300 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
301 // This is achieved by building it as "CONTID/WORKERID/DBID".
302 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
303 // WORKERID allows us to distinguish AthenaMP workers,
304 // and DBID allows us to distinguish streams.
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308 // next object after DataHeader - may be a DataHeaderForm
309 // in any case we need to call the DH converter to update the DHForm Ref
310 if (className == "DataHeaderForm_p6") {
311 // Tell DataHeaderCnv that it should use a new DHForm
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
316 return abortSharedWrClients(num);
317 }
318 } else {
319 // Tell DataHeaderCnv that it should use the old DHForm
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
322 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
323 return abortSharedWrClients(num);
324 }
325 }
326 }
327 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333 // Send Token back to Client
// lockObject() is repeated for as long as it reports a recoverable status.
334 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
335 while (sc.isRecoverable()) {
336 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
337 }
338 if (!sc.isSuccess()) {
339 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
340 return abortSharedWrClients(-1);
341 }
342 }
// Fetch the next placement string, again retrying on a recoverable status.
343 sc = m_outputStreamingTool->clearObject(&placementStr, num);
344 while (sc.isRecoverable()) {
345 sc = m_outputStreamingTool->clearObject(&placementStr, num);
346 }
347 if (sc.isFailure()) {
348 // no more clients, break the loop and exit
349 num = -1;
350 }
351 }
352 if (dataHeaderSeen) {
353 // DataHeader was the last object, need to tell the converter there is no DHForm coming
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
356 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
357 return abortSharedWrClients(-1);
358 }
359 }
360 placementStr = nullptr;
// A message starting with "stop" ends this call with RECOVERABLE.
361 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 } else if (sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
// Reached with a failed status (set in the loop above once clearObject() fails) or
// when no file name was extracted: fire the incident pair for the last metadata
// client and return FAILURE.
366 if (sc.isFailure() || fileName.empty()) {
367 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
368 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
369 {
370 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
371 memName, {}, /*endFileName=*/memName,
372 "BeginInputMemFile", "EndInputMemFile");
373 }
374 if (sc.isFailure()) {
375 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
376 } else {
377 ATH_MSG_INFO("Failed to get Data for client: " << num);
378 }
379 return(StatusCode::FAILURE);
380 }
381 }
382 if (m_parallelCompression && !fileName.empty()) {
383 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
385 }
// An empty connection name (spec starting with '[' or empty) falls back to the file
// name taken from the placement string.
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
388 } else {
389 outputConnection = outputConnectionSpec;
391 outputConnection += m_streamPortString.value();
392 }
393 }
// The commit counter is kept per connection name without its "?pmerge=" suffix.
394 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
395 const std::string baseOutputConnection = outputConnection.substr(0, merge);
396 m_fileCommitCounter[baseOutputConnection]++;
// Force a commit every m_fileFlushSetting[baseOutputConnection] calls for this file.
398 m_fileFlushSetting.value().contains(baseOutputConnection) &&
399 m_fileFlushSetting[baseOutputConnection] > 0 &&
400 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
401 doCommit = true;
402 ATH_MSG_DEBUG("commitOutput sending data.");
403 }
404 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
// Destruct the cached objects only after the base class commit has run.
405 for (auto& [ptr, rootType] : commitCache) {
406 rootType.Destruct(ptr);
407 }
408 return(status);
409}
410
411//______________________________________________________________________________
// Disconnect an output connection.
// A client (same condition as in connectOutput()) returns SUCCESS without doing
// anything. When an output tool is configured and this process is not a client,
// m_streamServerActive is cleared on both visible paths. Otherwise the request is
// forwarded to AthenaPoolCnvSvc::disconnectOutput().
// NOTE(review): this listing jumps over source lines 419 and 428, so the conditions
// that open the "expire server" scope and the m_streamPortString scope are not visible.
// NOTE(review): the local outputConnection is built and extended, but the final call
// passes outputConnectionSpec + m_streamPortString.value() instead -- in the visible
// lines the local is otherwise unused; confirm whether that is intended.
412StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
413 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
414 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
415 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
416 return(StatusCode::SUCCESS);
417 }
418 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
420 m_streamServerActive = false;
421 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
422 return(StatusCode::SUCCESS);
423 } else {
424 m_streamServerActive = false;
425 }
426 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
427 }
429 outputConnection += m_streamPortString.value();
430 }
431 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
432}
433
434//______________________________________________________________________________
// Register an object for writing and return the Token describing where it went.
//
// Client path: the placement string (extended by "[PNAME=<class name>]") is sent with
// lockObject(), the object bytes and its dynamic aux store are sent with putObject(),
// and the token string is read back with clearObject().
// Non-client paths: an empty Token carrying only the class id (tool configured but
// neither client nor server), a direct PoolSvc registration (server that is no longer
// active), or the base class AthenaPoolCnvSvc::registerForWrite().
//
// Returns a Token pointer, or nullptr on any failure along the client path or when
// the server answered "ABORT". NOTE(review): the Token is created with `new` on the
// paths visible here and commitOutput() wraps the result in std::unique_ptr, so the
// caller presumably owns it -- confirm for the PoolSvc/base class paths.
// NOTE(review): this listing jumps over source line 531, so the condition guarding
// the setFileName() call near the end is not visible.
435Token* AthenaPoolSharedIOCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
// Output tool configured but neither server nor client yet: become a share client first.
436 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
437 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
438 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
439 return(nullptr);
440 }
441 }
442 Token* token = nullptr;
// Client path; with parallel compression only containers whose name starts with the
// metadata container name take it.
443 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
444 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
445 // Lock object
446 std::string placementStr = placement->toString();
447 placementStr += "[PNAME=";
448 placementStr += classDesc.Name();
449 placementStr += ']';
450 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
// Every tool call below is repeated for as long as it reports a recoverable status.
451 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
452 while (sc.isRecoverable()) {
453 //usleep(100);
454 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
455 }
456 if (!sc.isSuccess()) {
457 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
458 return(nullptr);
459 }
460 // Serialize object via ROOT
461 const void* buffer = nullptr;
462 std::size_t nbytes = 0;
// `own` records whether buffer was produced by serialize() and must be deleted below;
// for "Token" and fundamental types buffer simply aliases obj.
463 bool own = true;
464 if (classDesc.Name() == "Token") {
// For class name "Token", obj is treated as a NUL-terminated character string.
465 nbytes = strlen(static_cast<const char*>(obj)) + 1;
466 buffer = obj;
467 own = false;
468 } else if (classDesc.IsFundamental()) {
469 nbytes = classDesc.SizeOf();
470 buffer = obj;
471 own = false;
472 } else {
473 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
474 }
475 // Share object
476 sc = m_outputStreamingTool->putObject(buffer, nbytes);
477 while (sc.isRecoverable()) {
478 //usleep(100);
479 sc = m_outputStreamingTool->putObject(buffer, nbytes);
480 }
481 if (own) { delete [] static_cast<const char*>(buffer); }
482 buffer = nullptr;
// On the failure paths below an empty putObject(nullptr, 0) is still sent and its
// status ignored.
483 if (!sc.isSuccess()) {
484 ATH_MSG_ERROR("Could not share object for: " << placementStr);
485 m_outputStreamingTool->putObject(nullptr, 0).ignore();
486 return(nullptr);
487 }
488 AuxDiscoverySvc auxDiscover;
489 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
490 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
491 m_outputStreamingTool->putObject(nullptr, 0).ignore();
492 return(nullptr);
493 }
// An empty putObject() follows the object and its aux store -- presumably the
// end-of-object marker for the server; confirm against the tool interface.
494 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
495 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
496 return(nullptr);
497 }
498 // Get Token back from Server
499 const char* tokenStr = nullptr;
500 int num = -1;
501 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
502 while (sc.isRecoverable()) {
503 //usleep(100);
504 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
505 }
506 if (!sc.isSuccess()) {
507 ATH_MSG_ERROR("Failed to get Token");
508 return(nullptr);
509 }
// NOTE(review): tokenStr is passed to strcmp() without a null check; this relies on
// clearObject() always setting it when it succeeds -- confirm.
510 if (!strcmp(tokenStr, "ABORT")) {
511 ATH_MSG_ERROR("Writer requested ABORT");
512 // tell the server we are leaving
513 m_outputStreamingTool->stop().ignore();
514 return nullptr;
515 }
// Build the Token from the returned string and stamp it with the class id.
516 Token* tempToken = new Token();
517 tempToken->fromString(tokenStr); tokenStr = nullptr;
518 tempToken->setClassID(pool::DbReflex::guid(classDesc));
519 token = tempToken; tempToken = nullptr;
520// Client Write Request
521 } else {
522 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
523 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
// Nothing is written here; the returned Token carries only the class id.
524 Token* tempToken = new Token();
525 tempToken->setClassID(pool::DbReflex::guid(classDesc));
526 token = tempToken; tempToken = nullptr;
527 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
528 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
529 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
530 } else {
532 placement->setFileName(placement->fileName() + m_streamPortString.value());
533 }
534 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
535 }
536 }
537 return(token);
538}
539//______________________________________________________________________________
540void AthenaPoolSharedIOCnvSvc::setObjPtr(void*& obj, const Token* token) {
541 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
542 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
543 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
544 }
545 }
546 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
547 if (token->dbID() == Guid::null()) {
548 int num = token->oid().first;
549 // Get object from SHM
550 void* buffer = nullptr;
551 std::size_t nbytes = 0;
552 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
553 while (sc.isRecoverable()) {
554 //usleep(100);
555 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
556 }
557 if (!sc.isSuccess()) {
558 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
559 obj = nullptr;
560 } else {
561 ATH_MSG_DEBUG("Server deserializing " << token->toString());
562 if (token->classID() != Guid::null()) {
563 // Deserialize object
564 RootType cltype(pool::DbReflex::forGuid(token->classID()));
565 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
566 } else {
567 // Deserialize object
568 std::string className = token->auxString();
569 className = className.substr(className.find("[PNAME="));
570 className = className.substr(7, className.find(']') - 7);
571 RootType cltype(RootType::ByNameNoQuiet(className));
572 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
573 }
574 AuxDiscoverySvc auxDiscover;
575 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
576 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
577 obj = nullptr;
578 }
579 }
580 }
581 }
582 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
583 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
584 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
585 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
586 obj = nullptr;
587 } else {
588 void* buffer = nullptr;
589 std::size_t nbytes = 0;
590 StatusCode sc = StatusCode::FAILURE;
591 // StopWatch listens from here until the end of this current scope
592 {
593 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
594 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
595 while (sc.isRecoverable()) {
596 // sleep
597 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
598 }
599 }
600 if (!sc.isSuccess()) {
601 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
602 obj = nullptr;
603 } else {
604 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
605 AuxDiscoverySvc auxDiscover;
606 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
607 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
608 obj = nullptr;
609 }
610 }
611 }
612 } else if (token->dbID() != Guid::null()) {
613 AthenaPoolCnvSvc::setObjPtr(obj, token);
614 }
615}
616//______________________________________________________________________________
618 const CLID& clid,
619 const std::string* par,
620 const unsigned long* ip,
621 IOpaqueAddress*& refpAddress) {
// Create an opaque address from the par/ip arrays.
// When the input streaming tool acts as client, a Token built from par[0], par[1],
// ip[0] and ip[1] is sent with lockObject(), the reply is parsed as a token string and
// a TokenAddress is created from it. Otherwise the request is forwarded to
// AthenaPoolCnvSvc::createAddress().
// Returns SUCCESS with refpAddress set, RECOVERABLE when the returned token has a
// null class id, FAILURE when the reply cannot be fetched, or the base class status.
// NOTE(review): this listing jumps over source lines 617 and 623, so the start of the
// signature and the body of the first `if` (presumably a makeClient() call, as in
// setObjPtr()) are not visible here.
622 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
624 }
625 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
626 Token addressToken;
// The first four characters of par[0] are dropped -- presumably a prefix such as
// "FID:"; confirm against the callers.
627 addressToken.setDb(par[0].substr(4));
628 addressToken.setCont(par[1]);
629 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
630 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
631 void* buffer = nullptr;
632 std::size_t nbytes = 0;
// getObject() is repeated for as long as it reports a recoverable status.
633 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
634 while (sc.isRecoverable()) {
635 // sleep
636 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
637 }
638 if (!sc.isSuccess()) {
639 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
640 return(StatusCode::FAILURE);
641 }
// The received bytes are interpreted as a token string.
642 auto token = std::make_unique<Token>();
643 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
644 if (token->classID() == Guid::null()) {
645 token.reset();
646 }
// One more getObject() call is made and its status ignored -- presumably it consumes
// the empty putObject() that readData() sends after the token; confirm.
647 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
648 if (token) {
649 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
650 return(StatusCode::SUCCESS);
651 }
652 else {
653 return(StatusCode::RECOVERABLE);
654 }
655 } else {
656 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
657 }
658}
659//______________________________________________________________________________
661 const CLID& clid,
662 const std::string& refAddress,
663 IOpaqueAddress*& refpAddress) {
// String-based overload: forwarded unchanged to AthenaPoolCnvSvc::createAddress(),
// whose status is returned.
// NOTE(review): the first line of the signature (source line 660) is missing from
// this listing.
664 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
665}
666//______________________________________________________________________________
667StatusCode AthenaPoolSharedIOCnvSvc::cleanUp(const std::string& connection) {
668 auto pos = connection.find("?pmerge=");
669 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
670 return AthenaPoolCnvSvc::cleanUp(conn);
671}
672//______________________________________________________________________________
// Turn this service into a streaming server.
// A negative num selects the output streaming tool: the value is negated, reduced
// modulo 1024 and passed to the tool's makeServer() together with the current
// m_streamPortString value; on success m_persSvcPerOutput is switched off.
// A non-negative num is passed to the input streaming tool's makeServer().
// Returns RECOVERABLE when the selected tool is not configured (or the output tool
// already is a server), FAILURE when the output tool's makeServer() fails, otherwise
// SUCCESS or the status of the input tool.
// NOTE(review): this listing jumps over source lines 673 and 676, so the signature
// and one statement between the negation and the modulo are not visible here.
674 if (num < 0) {
675 num = -num;
677 num = num % 1024;
678 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
679 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
680 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
681 const std::string streamPortSuffix = m_streamPortString.value();
682 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
683 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
684 return(StatusCode::FAILURE);
685 }
686 // Disable PersistencySvc per output file mode, for SharedWriter Server
687 m_persSvcPerOutput.setValue(false);
688 return(StatusCode::SUCCESS);
689 }
690 return(StatusCode::RECOVERABLE);
691 }
692 if (m_inputStreamingTool.empty()) {
693 return(StatusCode::RECOVERABLE);
694 }
695 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
// The input tool is given an empty port suffix.
696 return(m_inputStreamingTool->makeServer(num, ""));
697}
698//________________________________________________________________________________
// Turn this service into a streaming client.
// The output streaming tool, when configured, is made a client first; the port suffix
// it hands back replaces m_streamPortString if the current value contains
// "localhost:0". The input streaming tool, when configured, is made a client
// afterwards with the same num.
// Returns FAILURE when the output tool's makeClient() fails, SUCCESS when no input
// tool is configured, otherwise the status of the input tool's makeClient().
// NOTE(review): the signature line (source line 699) is missing from this listing.
700 if (!m_outputStreamingTool.empty()) {
701 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
// Passed to makeClient() and read afterwards, so presumably filled in by the tool.
702 std::string streamPortSuffix;
703 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
704 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
705 return(StatusCode::FAILURE);
706 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
707 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
708 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
709 m_streamPortString.setValue(streamPortSuffix);
710 }
711 }
712 if (m_inputStreamingTool.empty()) {
713 return(StatusCode::SUCCESS);
714 }
715 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
// The string returned by the input tool is not used.
716 std::string dummyStr;
717 return(m_inputStreamingTool->makeClient(num, dummyStr));
718}
719//________________________________________________________________________________
// Serve one read request from a client through the input streaming tool.
// A token string is taken from the tool with clearObject(). If the token has a class
// id, the object is read via setObjPtr(), serialized and sent back with putObject(),
// followed by its dynamic aux store and an empty putObject(). If it only has a
// database id, a token string obtained from PoolSvc::getToken() (or the request token
// itself) is sent back instead.
// Returns FAILURE when no input tool is configured or sending fails, the
// clearObject() status when no usable request was received, RECOVERABLE when the
// token has neither class id nor database id, otherwise SUCCESS.
// NOTE(review): this listing jumps over source lines 720 and 745, so the signature
// and the declaration of `cltype` used below are not visible here.
721 if (m_inputStreamingTool.empty()) {
722 return(StatusCode::FAILURE);
723 }
724 const char* tokenStr = nullptr;
725 int num = -1;
726 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
727 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
728 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
729 } else {
730 return(sc);
731 }
732 // Read object instance via POOL/ROOT
733 void* instance = nullptr;
734 Token token;
735 token.fromString(tokenStr); tokenStr = nullptr;
736 if (token.classID() != Guid::null()) {
737 std::string objName = "ALL";
738 if (useDetailChronoStat()) {
739 objName = token.classID().toString();
740 }
741 // StopWatch listens from here until the end of this current scope
742 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
743 this->setObjPtr(instance, &token);
744 // Serialize object via ROOT
746 void* buffer = nullptr;
747 std::size_t nbytes = 0;
748 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
// putObject() is repeated for as long as it reports a recoverable status.
749 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
750 while (sc.isRecoverable()) {
751 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
752 }
// The serialized buffer is released here with delete[].
753 delete [] static_cast<char*>(buffer); buffer = nullptr;
// NOTE(review): the two FAILURE returns below leave before `instance` is destructed
// further down -- confirm whether that is acceptable on these error paths.
754 if (!sc.isSuccess()) {
755 ATH_MSG_ERROR("Could not share object for: " << token.toString());
756 return(StatusCode::FAILURE);
757 }
758 AuxDiscoverySvc auxDiscover;
759 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
760 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
761 return(StatusCode::FAILURE);
762 }
763 cltype.Destruct(instance); instance = nullptr;
// An empty putObject() follows the object and its aux store.
764 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
765 ATH_MSG_ERROR("Could not share object for: " << token.toString());
766 return(StatusCode::FAILURE);
767 }
768 } else if (token.dbID() != Guid::null()) {
769 std::string returnToken;
770 Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
// Fall back to the request token itself when PoolSvc returns no token.
771 if( metadataToken ) {
772 returnToken = metadataToken->toString();
773 metadataToken->release(); metadataToken = nullptr;
774 } else {
775 returnToken = token.toString();
776 }
777 // Share token
// The terminating NUL is sent along (size() + 1), followed by an empty putObject().
778 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
779 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
780 ATH_MSG_ERROR("Could not share token for: " << token.toString());
781 return(StatusCode::FAILURE);
782 }
783 } else {
784 return(StatusCode::RECOVERABLE);
785 }
786 return(StatusCode::SUCCESS);
787}
788
789//________________________________________________________________________________
// Commit the file catalog obtained from PoolSvc and start it again; always returns
// SUCCESS.
// The catalog is fetched through a const_cast of getPoolSvc()->catalog(), i.e. the
// const-ness of the returned pointer is deliberately removed here.
// NOTE(review): the signature line (source line 790) is missing from this listing.
791 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
792 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
793 catalog->commit();
794 catalog->start();
795 return(StatusCode::SUCCESS);
796}
797
798//______________________________________________________________________________
// Tell the shared-writer clients to abort.
// In a loop, "ABORT" is sent with lockObject() to the client given by client_n (only
// when client_n >= 0) and the next request is taken with clearObject(), which also
// updates client_n; the loop ends once clearObject() reports a non-recoverable
// failure. Always returns FAILURE, so callers can `return abortSharedWrClients(n);`
// from their error paths.
// NOTE(review): the signature line (source line 799) is missing from this listing.
800{
801 ATH_MSG_ERROR("Sending ABORT to clients");
802 // the master process will kill this process once workers abort
803 // but it could be a time-limited loop
804 StatusCode sc = StatusCode::SUCCESS;
805 while (sc.isSuccess()) {
806 if (client_n >= 0) {
// The status of this call is overwritten by the clearObject() call below.
807 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
808 }
// Only used as the output argument of clearObject(); its content is never read.
809 const char* dummy;
810 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
811 while (sc.isRecoverable()) {
812 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
813 }
814 }
815 return StatusCode::FAILURE;
816}
817
818//______________________________________________________________________________
819void AthenaPoolSharedIOCnvSvc::handle(const Incident& incident) {
820 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
821 m_outputStreamingTool->lockObject("release").ignore();
822 }
823}
824//______________________________________________________________________________
825AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
826 base_class(name, pSvcLocator) {
827}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
This file contains the class definition for the IAthMetaDataSvc class.
Interface to an output stream tool.
RAII guard that guarantees a matching end-incident for every begin-incident.
static Double_t sc
This file contains the class definition for the Placement class (migrated from POOL).
TTypeAdapter RootType
Definition RootType.h:211
std::map< std::string, double > instance
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() functions.
virtual void setObjPtr(void *&obj, const Token *token) override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for StoreCleared incidents.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect from the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
@ kInputStream
Definition IPoolSvc.h:40
static InputFileIncidentGuard begin(IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Factory: fire the begin incident and return a guard whose destructor fires the matching end incident.
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition Placement.h:19
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:40
const std::string & containerName() const
Access container name.
Definition Placement.h:32
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:30
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
const std::string & fileName() const
Access file name.
Definition Placement.h:28
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
void Destruct(void *place) const
Definition RootType.cxx:677
size_t SizeOf() const
Definition RootType.cxx:765
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:91
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:71
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Token.h:93
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access class identifier.
Definition Token.h:73
Token & setClassID(const Guid &cl_id)
Set class identifier.
Definition Token.h:75
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
int release()
Release token: Decrease reference count and eventually delete.
Definition Token.cxx:80
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
const OID_t & oid() const
Access object identifier.
Definition Token.h:81
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
void commit()
Save catalog to file.
Definition merge.py:1
static const DbType POOL_StorageType
Definition DbType.h:84