ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolSharedIOCnvSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
17
24
25
26#include "StorageSvc/DbReflex.h"
28
29#include "AuxDiscoverySvc.h"
30
31#include <algorithm>
32#include <iomanip>
33#include <sstream>
34#include <format>
35
36//______________________________________________________________________________
37// Initialize the service.
39 // Retrieve InputStreamingTool (if configured)
40 if (!m_inputStreamingTool.empty()) {
42 }
43 // Retrieve OutputStreamingTool (if configured)
44 if (!m_outputStreamingTool.empty()) {
46 if (m_makeStreamingToolClient.value() == -1) {
47 // Initialize AthenaRootSharedWriter
48 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49 ATH_CHECK(arswsvc.retrieve());
50 }
51 // Put PoolSvc into share mode to avoid duplicating catalog.
52 getPoolSvc()->setShareMode(true);
53 }
54 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55 // Retrieve AthenaSerializeSvc
56 ATH_CHECK(m_serializeSvc.retrieve());
57 }
58 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
59 long int pri = 1000;
60 if (!m_outputStreamingTool.empty()) {
61 incSvc->addListener(this, "StoreCleared", pri);
62 ATH_MSG_DEBUG("Subscribed to StoreCleared");
63 }
64 return this->AthenaPoolCnvSvc::initialize();
65}
66//______________________________________________________________________________
68 // Release AthenaSerializeSvc
69 if (!m_serializeSvc.empty()) {
70 if (!m_serializeSvc.release().isSuccess()) {
71 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72 }
73 }
74 // Release OutputStreamingTool (if configured)
75 if (!m_outputStreamingTool.empty()) {
76 if (!m_outputStreamingTool.release().isSuccess()) {
77 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78 }
79 }
80 // Release InputStreamingTool (if configured)
81 if (!m_inputStreamingTool.empty()) {
82 if (!m_inputStreamingTool.release().isSuccess()) {
83 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84 }
85 }
86 return this->AthenaPoolCnvSvc::finalize();
87}
88//______________________________________________________________________________
89StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec,
90 const std::string& /*openMode*/) {
91 return(connectOutput(outputConnectionSpec));
92}
93//______________________________________________________________________________
// Prepare the output connection described by outputConnectionSpec.
// The text before the first '[' is used as the connection name; the bracketed
// remainder (if any) is re-appended before the call is delegated to
// AthenaPoolCnvSvc::connectOutput(). Depending on the role of the output
// streaming tool, several branches return SUCCESS without connecting.
// Returns FAILURE only when makeClient() fails.
94StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
95// This is called before DataObjects are being converted.
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
 // Become a streaming client on first use when m_makeStreamingToolClient > 0 and
 // the output tool currently reports neither the server nor the client role.
97 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
101 }
102 }
 // A client does not connect itself unless parallel compression is on AND the
 // spec does not carry the metadata "[PoolContainerPrefix=...]" marker.
103 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105 return(StatusCode::SUCCESS);
106 }
 // Output tool configured but this process is not a client.
107 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
 // With parallel compression, specs without the metadata marker are skipped here.
108 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
111 }
 // NOTE(review): the statement that opens the following branch (listing line 112)
 // is not visible in this extract; presumably a check on the server-active flag - confirm.
113 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114 return(StatusCode::SUCCESS);
115 }
116 }
 // NOTE(review): the condition guarding this append (listing line 117) is not
 // visible in this extract.
118 outputConnection += m_streamPortString.value();
119 }
 // Re-attach the bracketed suffix that was stripped off at the top.
120 std::size_t apend = outputConnectionSpec.find('[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
123 }
124 return AthenaPoolCnvSvc::connectOutput(outputConnection);
125}
126
127//______________________________________________________________________________
// Commit pending output for outputConnectionSpec.
//
// Behaviour depends on the role of the output streaming tool:
//  - client (subject to the parallel-compression / metadata-prefix test below):
//    sends a "wait" lock request, cleans up converters and returns;
//  - tool configured but neither client nor server: returns SUCCESS, nothing done;
//  - active server: repeatedly takes placement strings from clients via
//    clearObject(), pulls the matching object out of shared memory, writes it
//    (or hands metadata to MetaDataSvc), and sends the resulting token string
//    back with lockObject();
//  - otherwise falls through to AthenaPoolCnvSvc::commitOutput().
//
// Returns RECOVERABLE from the server path when a "stop" message arrives, when
// clearObject() is recoverable, or when num == -1; FAILURE on the error paths
// shown below; otherwise the status of AthenaPoolCnvSvc::commitOutput().
// Objects collected in commitCache are destructed after the base-class commit.
128StatusCode AthenaPoolSharedIOCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
129 // This is called after all DataObjects are converted.
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
131 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
132 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
 // Client side: the result of the "wait" request is deliberately ignored.
133 m_outputStreamingTool->lockObject("wait").ignore();
134 if (!this->cleanUp(outputConnection).isSuccess()) {
135 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
136 return(StatusCode::FAILURE);
137 }
138 return(StatusCode::SUCCESS);
139 }
140 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
141 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
143 }
 // Objects read from shared memory that must be destructed once the commit is done.
144 std::map<void*, RootType> commitCache;
145 std::string fileName;
146 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
147 // Clear object to get Placements for all objects in a Stream
148 const char* placementStr = nullptr;
149 int num = -1;
150 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
151 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
 // The output file name is taken from the "[FILE=...]" field of the first placement.
152 const char * matchedChars = strstr(placementStr, "[FILE=");
153 if (!matchedChars){
154 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
155 return abortSharedWrClients(num);
156 }
157 fileName = matchedChars;
158 fileName = fileName.substr(6, fileName.find(']') - 6);
159 if (!this->connectOutput(fileName).isSuccess()) {
160 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
161 return abortSharedWrClients(num);
162 }
163 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
164 bool dataHeaderSeen = false;
165 std::string dataHeaderID;
 // One iteration per placement received; the loop ends when num is set to -1 below.
166 while (num > 0) {
167 std::string objName = "ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr; //FIXME, better descriptor
170 }
171 // StopWatch listens from here until the end of this current scope
172 {
173 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
 // Split the placement: tokenStr = everything except the "[CONT=...]" field,
 // contName = the value of that field.
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find ("[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
 // NOTE(review): the parse-error paths return FAILURE directly, whereas other
 // error paths in this loop go through abortSharedWrClients() - confirm intended.
178 return StatusCode::FAILURE;
179 }
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
186 }
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
189
 // className = value of the "[PNAME=...]" field.
190 std::string::size_type ppos = pStr.find ("[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
194 }
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
200 }
201 className = className.substr(7, cl2 - 7);
202 RootType classDesc = RootType::ByNameNoQuiet(className);
203 void* obj = nullptr;
204 const std::string numStr = std::to_string(num);
 // Does the container name (up to its '(') equal the metadata container or one
 // of the augmentation metadata containers? len is the length of the match.
205 std::string::size_type len = m_metadataContainerProp.value().size();
206 bool foundContainer = false;
207 std::size_t opPos = contName.find('(');
208 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
209 foundContainer = true;
210 } else {
211 for (const auto& item: m_metadataContainersAug.value()) {
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer = true;
214 len = item.size();
215 break;
216 }
217 }
218 }
219 if (len > 0 && foundContainer && contName[len] == '(' ) {
220 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
221 // For Metadata, before moving to next client, fire file incidents
222 if (m_metadataClient != num) {
223 if (m_metadataClient != 0) {
224 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
225 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
226 incSvc->fireIncident(beginInputIncident);
227 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
228 incSvc->fireIncident(endInputIncident);
229 }
230 m_metadataClient = num;
231 }
232 // Retrieve MetaDataSvc
233 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
234 ATH_CHECK(metadataSvc.retrieve());
 // RECOVERABLE from shmProxy() is only warned about; other failures abort the clients.
235 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
236 if (sc.isRecoverable()) {
237 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
238 } else if (sc.isFailure()) {
239 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
240 return abortSharedWrClients(num);
241 }
242 } else {
 // Build a token with a null database id: the OID carries the client number and
 // the aux string carries the class name, which setObjPtr() uses to read the
 // object from shared memory.
243 Token readToken;
244 readToken.setOid(Token::OID_t(num, 0));
245 readToken.setAuxString("[PNAME=" + className + "]");
246 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
247 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
248 // Write object
249 if( m_oneDataHeaderForm.value() ) {
250 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
251 if( className == "DataHeaderForm_p6" ) {
252 // Pass DHForms to the converter for later writing in the correct order - do not write it now
 // NOTE(review): the first line of the statement declaring `address` (listing
 // line 253) is not visible in this extract.
254 "", placementWithSwn());
255 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
256 tokenStr = "";
257 } else {
258 Placement placement;
259 placement.fromString(placementStr);
260 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
261 if (token == nullptr) {
262 ATH_MSG_ERROR("Failed to write Data for: " << className);
263 return abortSharedWrClients(num);
264 }
265 tokenStr = token->toString();
266 }
267 if( className == "DataHeader_p6" ) {
268 // Found DataHeader - call the converter to update DHForm Ref
 // NOTE(review): the first line of the statement declaring `address` (listing
 // line 269) is not visible in this extract.
270 tokenStr, placementWithSwn());
271 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
272 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
273 return abortSharedWrClients(num);
274 }
275 } else
276 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
 // Remember the object so it can be destructed after the commit.
277 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
278 }
279 placementStr = nullptr;
280 } else {
281 // Multiple shared DataHeaderForms
282 Placement placement;
283 placement.fromString(placementStr); placementStr = nullptr;
284 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
285 if (token == nullptr) {
286 ATH_MSG_ERROR("Failed to write Data for: " << className);
287 return abortSharedWrClients(num);
288 }
289 tokenStr = token->toString();
290 if (className == "DataHeader_p6") {
291 // Found DataHeader
 // NOTE(review): the first line of the statement declaring `address` (listing
 // line 292) is not visible in this extract.
293 tokenStr, placement.auxString());
294 // call DH converter to add the ref to DHForm (stored earlier) and to itself
295 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
296 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
297 return abortSharedWrClients(num);
298 }
299 dataHeaderSeen = true;
300 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
301 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
302 // This is achieved by building it as "CONTID/WORKERID/DBID".
303 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
304 // WORKERID allows us to distinguish AthenaMP workers,
305 // and DBID allows us to distinguish streams.
306 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
307 } else if (dataHeaderSeen) {
308 dataHeaderSeen = false;
309 // next object after DataHeader - may be a DataHeaderForm
310 // in any case we need to call the DH converter to update the DHForm Ref
311 if (className == "DataHeaderForm_p6") {
312 // Tell DataHeaderCnv that it should use a new DHForm
 // NOTE(review): the first line of the statement declaring `address` (listing
 // line 313) is not visible in this extract.
314 tokenStr, dataHeaderID);
315 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
316 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
317 return abortSharedWrClients(num);
318 }
319 } else {
320 // Tell DataHeaderCnv that it should use the old DHForm
321 GenericAddress address(0, 0, "", dataHeaderID);
322 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
323 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
324 return abortSharedWrClients(num);
325 }
326 }
327 }
328 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
329 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
330 }
331 }
332 }
333 }
334 // Send Token back to Client
 // Retried for as long as the tool reports RECOVERABLE.
335 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
336 while (sc.isRecoverable()) {
337 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
338 }
339 if (!sc.isSuccess()) {
340 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
341 return abortSharedWrClients(-1);
342 }
343 }
 // Fetch the next placement, again retrying while RECOVERABLE.
344 sc = m_outputStreamingTool->clearObject(&placementStr, num);
345 while (sc.isRecoverable()) {
346 sc = m_outputStreamingTool->clearObject(&placementStr, num);
347 }
348 if (sc.isFailure()) {
349 // no more clients, break the loop and exit
350 num = -1;
351 }
352 }
353 if (dataHeaderSeen) {
354 // DataHeader was the last object, need to tell the converter there is no DHForm coming
355 GenericAddress address(0, 0, "", std::move(dataHeaderID));
356 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
357 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
358 return abortSharedWrClients(-1);
359 }
360 }
361 placementStr = nullptr;
362 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
363 return(StatusCode::RECOVERABLE);
364 } else if (sc.isRecoverable() || num == -1) {
365 return(StatusCode::RECOVERABLE);
366 }
 // Reached with a failed status or without a file name: fire the memory-file
 // incidents for the last metadata client, log, and return FAILURE.
367 if (sc.isFailure() || fileName.empty()) {
368 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
369 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
370 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
371 incSvc->fireIncident(beginInputIncident);
372 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
373 incSvc->fireIncident(endInputIncident);
374 if (sc.isFailure()) {
375 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
376 } else {
377 ATH_MSG_INFO("Failed to get Data for client: " << num);
378 }
379 return(StatusCode::FAILURE);
380 }
381 }
 // NOTE(review): this early return skips the commitCache destruction loop at the
 // end of the function - confirm the cached objects are released elsewhere.
382 if (m_parallelCompression && !fileName.empty()) {
383 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
385 }
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
388 } else {
389 outputConnection = outputConnectionSpec;
 // NOTE(review): the condition guarding the following append (listing line 390)
 // is not visible in this extract.
391 outputConnection += m_streamPortString.value();
392 }
393 }
394 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
 // Destruct the objects that were read from shared memory during the server loop.
395 for (auto& [ptr, rootType] : commitCache) {
396 rootType.Destruct(ptr);
397 }
398 return(status);
399}
400
401//______________________________________________________________________________
// Disconnect the output described by outputConnectionSpec.
// Clients (subject to the parallel-compression / metadata-prefix test) return
// SUCCESS immediately. For a non-client with an output tool configured, the
// visible branches clear m_streamServerActive. Otherwise the call is delegated
// to AthenaPoolCnvSvc::disconnectOutput().
402StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
403 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
404 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
405 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
406 return(StatusCode::SUCCESS);
407 }
408 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
 // NOTE(review): the `if` that opens the following branch (listing line 409) is
 // not visible in this extract. Both the branch and its `else` clear the flag.
410 m_streamServerActive = false;
411 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
412 return(StatusCode::SUCCESS);
413 } else {
414 m_streamServerActive = false;
415 }
416 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
417 }
 // NOTE(review): the condition guarding this append (listing line 418) is not
 // visible in this extract.
419 outputConnection += m_streamPortString.value();
420 }
 // NOTE(review): the locally built outputConnection is not what gets passed on;
 // the call below uses outputConnectionSpec plus the port string - confirm intended.
421 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
422}
423
424//______________________________________________________________________________
// Register obj (of type classDesc) for writing at the given placement.
//
// As a streaming client (and, with parallel compression, only for containers
// whose name starts with the metadata container name) the object is shipped to
// the server: lock request with the placement string, serialized payload,
// dynamic aux store, an empty putObject() as terminator, and finally the token
// string sent back by the server is parsed into a new Token.
// Otherwise the object is written through PoolSvc or AthenaPoolCnvSvc, or a
// placeholder Token is returned when the tool has no role yet.
//
// Returns nullptr on every failure path shown below. Callers in this file wrap
// the returned pointer in std::unique_ptr, i.e. they take ownership.
425Token* AthenaPoolSharedIOCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
 // Become a streaming client on first use when configured and no role is set yet.
426 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
427 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
428 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
429 return(nullptr);
430 }
431 }
432 Token* token = nullptr;
433 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
434 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
435 // Lock object
 // The request string is the placement followed by "[PNAME=<class name>]".
436 std::string placementStr = placement->toString();
437 placementStr += "[PNAME=";
438 placementStr += classDesc.Name();
439 placementStr += ']';
440 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
 // Busy-retry for as long as the tool reports RECOVERABLE.
441 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
442 while (sc.isRecoverable()) {
443 //usleep(100);
444 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
445 }
446 if (!sc.isSuccess()) {
447 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
448 return(nullptr);
449 }
450 // Serialize object via ROOT
451 const void* buffer = nullptr;
452 std::size_t nbytes = 0;
 // `own` records whether buffer was produced by serialize() and must be freed here.
453 bool own = true;
454 if (classDesc.Name() == "Token") {
 // "Token" objects are sent as a NUL-terminated C string, without serialization.
455 nbytes = strlen(static_cast<const char*>(obj)) + 1;
456 buffer = obj;
457 own = false;
458 } else if (classDesc.IsFundamental()) {
 // Fundamental types are sent as raw bytes.
459 nbytes = classDesc.SizeOf();
460 buffer = obj;
461 own = false;
462 } else {
463 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
464 }
465 // Share object
466 sc = m_outputStreamingTool->putObject(buffer, nbytes);
467 while (sc.isRecoverable()) {
468 //usleep(100);
469 sc = m_outputStreamingTool->putObject(buffer, nbytes);
470 }
471 if (own) { delete [] static_cast<const char*>(buffer); }
472 buffer = nullptr;
473 if (!sc.isSuccess()) {
474 ATH_MSG_ERROR("Could not share object for: " << placementStr);
 // An empty putObject() is still sent on the error paths; its status is ignored.
475 m_outputStreamingTool->putObject(nullptr, 0).ignore();
476 return(nullptr);
477 }
478 AuxDiscoverySvc auxDiscover;
479 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
480 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
481 m_outputStreamingTool->putObject(nullptr, 0).ignore();
482 return(nullptr);
483 }
484 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
485 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
486 return(nullptr);
487 }
488 // Get Token back from Server
489 const char* tokenStr = nullptr;
490 int num = -1;
491 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
492 while (sc.isRecoverable()) {
493 //usleep(100);
494 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
495 }
496 if (!sc.isSuccess()) {
497 ATH_MSG_ERROR("Failed to get Token");
498 return(nullptr);
499 }
 // NOTE(review): tokenStr is not checked for nullptr before strcmp(); this
 // assumes a successful clearObject() always sets it - confirm.
500 if (!strcmp(tokenStr, "ABORT")) {
501 ATH_MSG_ERROR("Writer requested ABORT");
502 // tell the server we are leaving
503 m_outputStreamingTool->stop().ignore();
504 return nullptr;
505 }
 // Build the returned Token from the server's string and stamp the class id.
506 Token* tempToken = new Token();
507 tempToken->fromString(tokenStr); tokenStr = nullptr;
508 tempToken->setClassID(pool::DbReflex::guid(classDesc));
509 token = tempToken; tempToken = nullptr;
510// Client Write Request
511 } else {
512 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
 // No role yet: nothing is written; a Token carrying only the class id is returned.
513 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
514 Token* tempToken = new Token();
515 tempToken->setClassID(pool::DbReflex::guid(classDesc));
516 token = tempToken; tempToken = nullptr;
517 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
518 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
519 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
520 } else {
 // NOTE(review): the condition guarding the file-name change below (listing
 // line 521) is not visible in this extract.
522 placement->setFileName(placement->fileName() + m_streamPortString.value());
523 }
524 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
525 }
526 }
527 return(token);
528}
529//______________________________________________________________________________
530void AthenaPoolSharedIOCnvSvc::setObjPtr(void*& obj, const Token* token) {
531 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
532 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
533 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
534 }
535 }
536 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
537 if (token->dbID() == Guid::null()) {
538 int num = token->oid().first;
539 // Get object from SHM
540 void* buffer = nullptr;
541 std::size_t nbytes = 0;
542 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
543 while (sc.isRecoverable()) {
544 //usleep(100);
545 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
546 }
547 if (!sc.isSuccess()) {
548 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
549 obj = nullptr;
550 } else {
551 ATH_MSG_DEBUG("Server deserializing " << token->toString());
552 if (token->classID() != Guid::null()) {
553 // Deserialize object
554 RootType cltype(pool::DbReflex::forGuid(token->classID()));
555 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
556 } else {
557 // Deserialize object
558 std::string className = token->auxString();
559 className = className.substr(className.find("[PNAME="));
560 className = className.substr(7, className.find(']') - 7);
561 RootType cltype(RootType::ByNameNoQuiet(className));
562 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
563 }
564 AuxDiscoverySvc auxDiscover;
565 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
566 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
567 obj = nullptr;
568 }
569 }
570 }
571 }
572 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
573 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
574 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
575 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
576 obj = nullptr;
577 } else {
578 void* buffer = nullptr;
579 std::size_t nbytes = 0;
580 StatusCode sc = StatusCode::FAILURE;
581 // StopWatch listens from here until the end of this current scope
582 {
583 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
584 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
585 while (sc.isRecoverable()) {
586 // sleep
587 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
588 }
589 }
590 if (!sc.isSuccess()) {
591 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
592 obj = nullptr;
593 } else {
594 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
595 AuxDiscoverySvc auxDiscover;
596 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
597 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
598 obj = nullptr;
599 }
600 }
601 }
602 } else if (token->dbID() != Guid::null()) {
603 AthenaPoolCnvSvc::setObjPtr(obj, token);
604 }
605}
606//______________________________________________________________________________
608 const CLID& clid,
609 const std::string* par,
610 const unsigned long* ip,
611 IOpaqueAddress*& refpAddress) {
612 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
614 }
615 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
616 Token addressToken;
617 addressToken.setDb(par[0].substr(4));
618 addressToken.setCont(par[1]);
619 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
620 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
621 void* buffer = nullptr;
622 std::size_t nbytes = 0;
623 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
624 while (sc.isRecoverable()) {
625 // sleep
626 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
627 }
628 if (!sc.isSuccess()) {
629 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
630 return(StatusCode::FAILURE);
631 }
632 auto token = std::make_unique<Token>();
633 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
634 if (token->classID() == Guid::null()) {
635 token.reset();
636 }
637 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
638 if (token) {
639 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
640 return(StatusCode::SUCCESS);
641 }
642 else {
643 return(StatusCode::RECOVERABLE);
644 }
645 } else {
646 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
647 }
648}
649//______________________________________________________________________________
651 const CLID& clid,
652 const std::string& refAddress,
653 IOpaqueAddress*& refpAddress) {
654 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
655}
656//______________________________________________________________________________
658 if (num < 0) {
659 num = -num;
661 num = num % 1024;
662 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
663 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
664 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
665 const std::string streamPortSuffix = m_streamPortString.value();
666 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
667 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
668 return(StatusCode::FAILURE);
669 }
670 // Disable PersistencySvc per output file mode, for SharedWriter Server
671 m_persSvcPerOutput.setValue(false);
672 return(StatusCode::SUCCESS);
673 }
674 return(StatusCode::RECOVERABLE);
675 }
676 if (m_inputStreamingTool.empty()) {
677 return(StatusCode::RECOVERABLE);
678 }
679 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
680 return(m_inputStreamingTool->makeServer(num, ""));
681}
682//________________________________________________________________________________
684 if (!m_outputStreamingTool.empty()) {
685 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
686 std::string streamPortSuffix;
687 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
688 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
689 return(StatusCode::FAILURE);
690 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
691 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
692 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
693 m_streamPortString.setValue(streamPortSuffix);
694 }
695 }
696 if (m_inputStreamingTool.empty()) {
697 return(StatusCode::SUCCESS);
698 }
699 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
700 std::string dummyStr;
701 return(m_inputStreamingTool->makeClient(num, dummyStr));
702}
703//________________________________________________________________________________
705 if (m_inputStreamingTool.empty()) {
706 return(StatusCode::FAILURE);
707 }
708 const char* tokenStr = nullptr;
709 int num = -1;
710 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
711 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
712 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
713 } else {
714 return(sc);
715 }
716 // Read object instance via POOL/ROOT
717 void* instance = nullptr;
718 Token token;
719 token.fromString(tokenStr); tokenStr = nullptr;
720 if (token.classID() != Guid::null()) {
721 std::string objName = "ALL";
722 if (useDetailChronoStat()) {
723 objName = token.classID().toString();
724 }
725 // StopWatch listens from here until the end of this current scope
726 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
727 this->setObjPtr(instance, &token);
728 // Serialize object via ROOT
730 void* buffer = nullptr;
731 std::size_t nbytes = 0;
732 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
733 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
734 while (sc.isRecoverable()) {
735 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
736 }
737 delete [] static_cast<char*>(buffer); buffer = nullptr;
738 if (!sc.isSuccess()) {
739 ATH_MSG_ERROR("Could not share object for: " << token.toString());
740 return(StatusCode::FAILURE);
741 }
742 AuxDiscoverySvc auxDiscover;
743 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
744 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
745 return(StatusCode::FAILURE);
746 }
747 cltype.Destruct(instance); instance = nullptr;
748 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
749 ATH_MSG_ERROR("Could not share object for: " << token.toString());
750 return(StatusCode::FAILURE);
751 }
752 } else if (token.dbID() != Guid::null()) {
753 std::string returnToken;
754 const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
755 if (metadataToken != nullptr) {
756 returnToken = metadataToken->toString();
757 } else {
758 returnToken = token.toString();
759 }
760 delete metadataToken; metadataToken = nullptr;
761 // Share token
762 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
763 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
764 ATH_MSG_ERROR("Could not share token for: " << token.toString());
765 return(StatusCode::FAILURE);
766 }
767 } else {
768 return(StatusCode::RECOVERABLE);
769 }
770 return(StatusCode::SUCCESS);
771}
772
773//________________________________________________________________________________
775 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
776 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
777 catalog->commit();
778 catalog->start();
779 return(StatusCode::SUCCESS);
780}
781
782//______________________________________________________________________________
784{
785 ATH_MSG_ERROR("Sending ABORT to clients");
786 // the master process will kill this process once workers abort
787 // but it could be a time-limited loop
788 StatusCode sc = StatusCode::SUCCESS;
789 while (sc.isSuccess()) {
790 if (client_n >= 0) {
791 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
792 }
793 const char* dummy;
794 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
795 while (sc.isRecoverable()) {
796 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
797 }
798 }
799 return StatusCode::FAILURE;
800}
801
802//______________________________________________________________________________
803void AthenaPoolSharedIOCnvSvc::handle(const Incident& incident) {
804 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
805 m_outputStreamingTool->lockObject("release").ignore();
806 }
807}
808//______________________________________________________________________________
809AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
810 base_class(name, pSvcLocator) {
811}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
This file contains the class definition for the IAthMetaDataSvc class.
Interface to an output stream tool.
static Double_t sc
This file contains the class definition for the Placement class (migrated from POOL).
TTypeAdapter RootType
Definition RootType.h:211
std::map< std::string, double > instance
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual void setObjPtr(void *&obj, const Token *token) override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for StoreCleared incidents.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect from the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
@ kInputStream
Definition IPoolSvc.h:39
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition Placement.h:19
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:40
const std::string & containerName() const
Access container name.
Definition Placement.h:32
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:30
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
const std::string & fileName() const
Access file name.
Definition Placement.h:28
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
void Destruct(void *place) const
Definition RootType.cxx:677
size_t SizeOf() const
Definition RootType.cxx:765
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:91
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:71
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Token.h:93
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access class identifier.
Definition Token.h:73
Token & setClassID(const Guid &cl_id)
Set class identifier.
Definition Token.h:75
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
const OID_t & oid() const
Access object identifier.
Definition Token.h:81
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
void commit()
Save catalog to file.
static const DbType POOL_StorageType
Definition DbType.h:98