ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolSharedIOCnvSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
17
25
26#include "StorageSvc/DbReflex.h"
27
28#include "AuxDiscoverySvc.h"
29
30#include <algorithm>
31#include <iomanip>
32#include <sstream>
33#include <format>
34
35//______________________________________________________________________________
36// Initialize the service.
38 // Retrieve InputStreamingTool (if configured)
39 if (!m_inputStreamingTool.empty()) {
41 }
42 // Retrieve OutputStreamingTool (if configured)
43 if (!m_outputStreamingTool.empty()) {
45 if (m_makeStreamingToolClient.value() == -1) {
46 // Initialize AthenaRootSharedWriter
47 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
48 ATH_CHECK(arswsvc.retrieve());
49 }
50 // Put PoolSvc into share mode to avoid duplicating catalog.
51 getPoolSvc()->setShareMode(true);
52 }
53 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
54 // Retrieve AthenaSerializeSvc
55 ATH_CHECK(m_serializeSvc.retrieve());
56 }
57 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
58 long int pri = 1000;
59 if (!m_outputStreamingTool.empty()) {
60 incSvc->addListener(this, "StoreCleared", pri);
61 ATH_MSG_DEBUG("Subscribed to StoreCleared");
62 }
63 return this->AthenaPoolCnvSvc::initialize();
64}
65//______________________________________________________________________________
67 // Release AthenaSerializeSvc
68 if (!m_serializeSvc.empty()) {
69 if (!m_serializeSvc.release().isSuccess()) {
70 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
71 }
72 }
73 // Release OutputStreamingTool (if configured)
74 if (!m_outputStreamingTool.empty()) {
75 if (!m_outputStreamingTool.release().isSuccess()) {
76 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
77 }
78 }
79 // Release InputStreamingTool (if configured)
80 if (!m_inputStreamingTool.empty()) {
81 if (!m_inputStreamingTool.release().isSuccess()) {
82 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
83 }
84 }
85 return this->AthenaPoolCnvSvc::finalize();
86}
87//______________________________________________________________________________
88StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec,
89 const std::string& /*openMode*/) {
90 return(connectOutput(outputConnectionSpec));
91}
92//______________________________________________________________________________
93StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
94// This is called before DataObjects are being converted.
95 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
96 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
97 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
98 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
99 return(StatusCode::FAILURE);
100 }
101 }
102 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
103 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
104 return(StatusCode::SUCCESS);
105 }
106 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
107 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
108 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
109 return(StatusCode::SUCCESS);
110 }
112 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
113 return(StatusCode::SUCCESS);
114 }
115 }
117 outputConnection += m_streamPortString.value();
118 }
119 std::size_t apend = outputConnectionSpec.find('[');
120 if (apend != std::string::npos) {
121 outputConnection += outputConnectionSpec.substr(apend);
122 }
123 return AthenaPoolCnvSvc::connectOutput(outputConnection);
124}
125
126//______________________________________________________________________________
127StatusCode AthenaPoolSharedIOCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
128 // This is called after all DataObjects are converted.
129 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
130 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
131 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
132 m_outputStreamingTool->lockObject("wait").ignore();
133 if (!this->cleanUp(outputConnection).isSuccess()) {
134 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
135 return(StatusCode::FAILURE);
136 }
137 return(StatusCode::SUCCESS);
138 }
139 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
140 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
141 return(StatusCode::SUCCESS);
142 }
143 std::map<void*, RootType> commitCache;
144 std::string fileName;
145 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
146 // Clear object to get Placements for all objects in a Stream
147 const char* placementStr = nullptr;
148 int num = -1;
149 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
150 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
151 const char * matchedChars = strstr(placementStr, "[FILE=");
152 if (!matchedChars){
153 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
154 return abortSharedWrClients(num);
155 }
156 fileName = matchedChars;
157 fileName = fileName.substr(6, fileName.find(']') - 6);
158 if (!this->connectOutput(fileName).isSuccess()) {
159 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
160 return abortSharedWrClients(num);
161 }
162 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
163 bool dataHeaderSeen = false;
164 std::string dataHeaderID;
165 while (num > 0) {
166 std::string objName = "ALL";
167 if (useDetailChronoStat()) {
168 objName = placementStr; //FIXME, better descriptor
169 }
170 // StopWatch listens from here until the end of this current scope
171 {
172 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
173 std::string_view pStr = placementStr;
174 std::string::size_type cpos = pStr.find ("[CONT=");
175 if (cpos == std::string::npos) {
176 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
177 return StatusCode::FAILURE;
178 }
179 std::string tokenStr (pStr.substr(0, cpos));
180 std::string contName (pStr.substr(cpos, std::string::npos));
181 std::string::size_type cl1 = contName.find(']');
182 if (cl1 == std::string::npos) {
183 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
184 return StatusCode::FAILURE;
185 }
186 tokenStr.append(contName, cl1 + 1);
187 contName = contName.substr(6, cl1 - 6);
188
189 std::string::size_type ppos = pStr.find ("[PNAME=");
190 if (ppos == std::string::npos) {
191 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
192 return StatusCode::FAILURE;
193 }
194 std::string className (pStr.substr(ppos, std::string::npos));
195 std::string::size_type cl2 = className.find(']');
196 if (cl2 == std::string::npos) {
197 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
198 return StatusCode::FAILURE;
199 }
200 className = className.substr(7, cl2 - 7);
201 RootType classDesc = RootType::ByNameNoQuiet(className);
202 void* obj = nullptr;
203 const std::string numStr = std::to_string(num);
204 std::string::size_type len = m_metadataContainerProp.value().size();
205 bool foundContainer = false;
206 std::size_t opPos = contName.find('(');
207 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
208 foundContainer = true;
209 } else {
210 for (const auto& item: m_metadataContainersAug.value()) {
211 if (contName.compare(0, opPos, item) == 0){
212 foundContainer = true;
213 len = item.size();
214 break;
215 }
216 }
217 }
218 if (len > 0 && foundContainer && contName[len] == '(' ) {
219 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
220 // For Metadata, before moving to next client, fire file incidents
221 if (m_metadataClient != num) {
222 if (m_metadataClient != 0) {
223 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
224 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
225 incSvc->fireIncident(beginInputIncident);
226 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
227 incSvc->fireIncident(endInputIncident);
228 }
229 m_metadataClient = num;
230 }
231 // Retrieve MetaDataSvc
232 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
233 ATH_CHECK(metadataSvc.retrieve());
234 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
235 if (sc.isRecoverable()) {
236 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
237 } else if (sc.isFailure()) {
238 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
239 return abortSharedWrClients(num);
240 }
241 } else {
242 Token readToken;
243 readToken.setOid(Token::OID_t(num, 0));
244 readToken.setAuxString("[PNAME=" + className + "]");
245 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
246 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
247 // Write object
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251 // Pass DHForms to the converter for later writing in the correct order - do not write it now
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
255 tokenStr = "";
256 } else {
257 Placement placement;
258 placement.fromString(placementStr);
259 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
260 if (token == nullptr) {
261 ATH_MSG_ERROR("Failed to write Data for: " << className);
262 return abortSharedWrClients(num);
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267 // Found DataHeader - call the converter to update DHForm Ref
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
271 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
272 return abortSharedWrClients(num);
273 }
274 } else
275 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280 // Multiple shared DataHeaderForms
281 Placement placement;
282 placement.fromString(placementStr); placementStr = nullptr;
283 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
284 if (token == nullptr) {
285 ATH_MSG_ERROR("Failed to write Data for: " << className);
286 return abortSharedWrClients(num);
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290 // Found DataHeader
292 tokenStr, placement.auxString());
293 // call DH converter to add the ref to DHForm (stored earlier) and to itself
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
295 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
296 return abortSharedWrClients(num);
297 }
298 dataHeaderSeen = true;
299 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
300 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
301 // This is achieved by building it as "CONTID/WORKERID/DBID".
302 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
303 // WORKERID allows us to distinguish AthenaMP workers,
304 // and DBID allows us to distinguish streams.
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308 // next object after DataHeader - may be a DataHeaderForm
309 // in any case we need to call the DH converter to update the DHForm Ref
310 if (className == "DataHeaderForm_p6") {
311 // Tell DataHeaderCnv that it should use a new DHForm
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
316 return abortSharedWrClients(num);
317 }
318 } else {
319 // Tell DataHeaderCnv that it should use the old DHForm
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
322 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
323 return abortSharedWrClients(num);
324 }
325 }
326 }
327 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333 // Send Token back to Client
334 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
335 while (sc.isRecoverable()) {
336 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
337 }
338 if (!sc.isSuccess()) {
339 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
340 return abortSharedWrClients(-1);
341 }
342 }
343 sc = m_outputStreamingTool->clearObject(&placementStr, num);
344 while (sc.isRecoverable()) {
345 sc = m_outputStreamingTool->clearObject(&placementStr, num);
346 }
347 if (sc.isFailure()) {
348 // no more clients, break the loop and exit
349 num = -1;
350 }
351 }
352 if (dataHeaderSeen) {
353 // DataHeader was the last object, need to tell the converter there is no DHForm coming
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
356 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
357 return abortSharedWrClients(-1);
358 }
359 }
360 placementStr = nullptr;
361 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 } else if (sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
366 if (sc.isFailure() || fileName.empty()) {
367 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
368 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
369 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
370 incSvc->fireIncident(beginInputIncident);
371 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
372 incSvc->fireIncident(endInputIncident);
373 if (sc.isFailure()) {
374 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
375 } else {
376 ATH_MSG_INFO("Failed to get Data for client: " << num);
377 }
378 return(StatusCode::FAILURE);
379 }
380 }
381 if (m_parallelCompression && !fileName.empty()) {
382 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
383 return(StatusCode::SUCCESS);
384 }
385 if (outputConnection.empty()) {
386 outputConnection = std::move(fileName);
387 } else {
388 outputConnection = outputConnectionSpec;
390 outputConnection += m_streamPortString.value();
391 }
392 }
393 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
394 const std::string baseOutputConnection = outputConnection.substr(0, merge);
395 m_fileCommitCounter[baseOutputConnection]++;
397 m_fileFlushSetting.value().contains(baseOutputConnection) &&
398 m_fileFlushSetting[baseOutputConnection] > 0 &&
399 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
400 doCommit = true;
401 ATH_MSG_DEBUG("commitOutput sending data.");
402 }
403 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
404 for (auto& [ptr, rootType] : commitCache) {
405 rootType.Destruct(ptr);
406 }
407 return(status);
408}
409
410//______________________________________________________________________________
411StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
412 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
413 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
414 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
415 return(StatusCode::SUCCESS);
416 }
417 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
419 m_streamServerActive = false;
420 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
421 return(StatusCode::SUCCESS);
422 } else {
423 m_streamServerActive = false;
424 }
425 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
426 }
428 outputConnection += m_streamPortString.value();
429 }
430 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
431}
432
433//______________________________________________________________________________
434Token* AthenaPoolSharedIOCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
435 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
436 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
437 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
438 return(nullptr);
439 }
440 }
441 Token* token = nullptr;
442 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
443 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
444 // Lock object
445 std::string placementStr = placement->toString();
446 placementStr += "[PNAME=";
447 placementStr += classDesc.Name();
448 placementStr += ']';
449 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
450 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
451 while (sc.isRecoverable()) {
452 //usleep(100);
453 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
454 }
455 if (!sc.isSuccess()) {
456 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
457 return(nullptr);
458 }
459 // Serialize object via ROOT
460 const void* buffer = nullptr;
461 std::size_t nbytes = 0;
462 bool own = true;
463 if (classDesc.Name() == "Token") {
464 nbytes = strlen(static_cast<const char*>(obj)) + 1;
465 buffer = obj;
466 own = false;
467 } else if (classDesc.IsFundamental()) {
468 nbytes = classDesc.SizeOf();
469 buffer = obj;
470 own = false;
471 } else {
472 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
473 }
474 // Share object
475 sc = m_outputStreamingTool->putObject(buffer, nbytes);
476 while (sc.isRecoverable()) {
477 //usleep(100);
478 sc = m_outputStreamingTool->putObject(buffer, nbytes);
479 }
480 if (own) { delete [] static_cast<const char*>(buffer); }
481 buffer = nullptr;
482 if (!sc.isSuccess()) {
483 ATH_MSG_ERROR("Could not share object for: " << placementStr);
484 m_outputStreamingTool->putObject(nullptr, 0).ignore();
485 return(nullptr);
486 }
487 AuxDiscoverySvc auxDiscover;
488 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
489 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
490 m_outputStreamingTool->putObject(nullptr, 0).ignore();
491 return(nullptr);
492 }
493 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
494 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
495 return(nullptr);
496 }
497 // Get Token back from Server
498 const char* tokenStr = nullptr;
499 int num = -1;
500 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
501 while (sc.isRecoverable()) {
502 //usleep(100);
503 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
504 }
505 if (!sc.isSuccess()) {
506 ATH_MSG_ERROR("Failed to get Token");
507 return(nullptr);
508 }
509 if (!strcmp(tokenStr, "ABORT")) {
510 ATH_MSG_ERROR("Writer requested ABORT");
511 // tell the server we are leaving
512 m_outputStreamingTool->stop().ignore();
513 return nullptr;
514 }
515 Token* tempToken = new Token();
516 tempToken->fromString(tokenStr); tokenStr = nullptr;
517 tempToken->setClassID(pool::DbReflex::guid(classDesc));
518 token = tempToken; tempToken = nullptr;
519// Client Write Request
520 } else {
521 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
522 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
523 Token* tempToken = new Token();
524 tempToken->setClassID(pool::DbReflex::guid(classDesc));
525 token = tempToken; tempToken = nullptr;
526 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
527 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
528 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
529 } else {
531 placement->setFileName(placement->fileName() + m_streamPortString.value());
532 }
533 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
534 }
535 }
536 return(token);
537}
538//______________________________________________________________________________
539void AthenaPoolSharedIOCnvSvc::setObjPtr(void*& obj, const Token* token) {
540 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
541 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
542 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
543 }
544 }
545 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
546 if (token->dbID() == Guid::null()) {
547 int num = token->oid().first;
548 // Get object from SHM
549 void* buffer = nullptr;
550 std::size_t nbytes = 0;
551 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
552 while (sc.isRecoverable()) {
553 //usleep(100);
554 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
555 }
556 if (!sc.isSuccess()) {
557 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
558 obj = nullptr;
559 } else {
560 ATH_MSG_DEBUG("Server deserializing " << token->toString());
561 if (token->classID() != Guid::null()) {
562 // Deserialize object
563 RootType cltype(pool::DbReflex::forGuid(token->classID()));
564 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
565 } else {
566 // Deserialize object
567 std::string className = token->auxString();
568 className = className.substr(className.find("[PNAME="));
569 className = className.substr(7, className.find(']') - 7);
570 RootType cltype(RootType::ByNameNoQuiet(className));
571 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
572 }
573 AuxDiscoverySvc auxDiscover;
574 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
575 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
576 obj = nullptr;
577 }
578 }
579 }
580 }
581 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
582 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
583 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
584 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
585 obj = nullptr;
586 } else {
587 void* buffer = nullptr;
588 std::size_t nbytes = 0;
589 StatusCode sc = StatusCode::FAILURE;
590 // StopWatch listens from here until the end of this current scope
591 {
592 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
593 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
594 while (sc.isRecoverable()) {
595 // sleep
596 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
597 }
598 }
599 if (!sc.isSuccess()) {
600 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
601 obj = nullptr;
602 } else {
603 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
604 AuxDiscoverySvc auxDiscover;
605 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
606 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
607 obj = nullptr;
608 }
609 }
610 }
611 } else if (token->dbID() != Guid::null()) {
612 AthenaPoolCnvSvc::setObjPtr(obj, token);
613 }
614}
615//______________________________________________________________________________
617 const CLID& clid,
618 const std::string* par,
619 const unsigned long* ip,
620 IOpaqueAddress*& refpAddress) {
621 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
623 }
624 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
625 Token addressToken;
626 addressToken.setDb(par[0].substr(4));
627 addressToken.setCont(par[1]);
628 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
629 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
630 void* buffer = nullptr;
631 std::size_t nbytes = 0;
632 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
633 while (sc.isRecoverable()) {
634 // sleep
635 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
636 }
637 if (!sc.isSuccess()) {
638 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
639 return(StatusCode::FAILURE);
640 }
641 auto token = std::make_unique<Token>();
642 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
643 if (token->classID() == Guid::null()) {
644 token.reset();
645 }
646 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
647 if (token) {
648 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
649 return(StatusCode::SUCCESS);
650 }
651 else {
652 return(StatusCode::RECOVERABLE);
653 }
654 } else {
655 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
656 }
657}
658//______________________________________________________________________________
660 const CLID& clid,
661 const std::string& refAddress,
662 IOpaqueAddress*& refpAddress) {
663 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
664}
665//______________________________________________________________________________
666StatusCode AthenaPoolSharedIOCnvSvc::cleanUp(const std::string& connection) {
667 auto pos = connection.find("?pmerge=");
668 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
669 return AthenaPoolCnvSvc::cleanUp(conn);
670}
671//______________________________________________________________________________
673 if (num < 0) {
674 num = -num;
676 num = num % 1024;
677 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
678 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
679 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
680 const std::string streamPortSuffix = m_streamPortString.value();
681 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
682 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
683 return(StatusCode::FAILURE);
684 }
685 // Disable PersistencySvc per output file mode, for SharedWriter Server
686 m_persSvcPerOutput.setValue(false);
687 return(StatusCode::SUCCESS);
688 }
689 return(StatusCode::RECOVERABLE);
690 }
691 if (m_inputStreamingTool.empty()) {
692 return(StatusCode::RECOVERABLE);
693 }
694 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
695 return(m_inputStreamingTool->makeServer(num, ""));
696}
697//________________________________________________________________________________
699 if (!m_outputStreamingTool.empty()) {
700 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
701 std::string streamPortSuffix;
702 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
703 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
704 return(StatusCode::FAILURE);
705 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
706 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
707 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
708 m_streamPortString.setValue(streamPortSuffix);
709 }
710 }
711 if (m_inputStreamingTool.empty()) {
712 return(StatusCode::SUCCESS);
713 }
714 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
715 std::string dummyStr;
716 return(m_inputStreamingTool->makeClient(num, dummyStr));
717}
718//________________________________________________________________________________
720 if (m_inputStreamingTool.empty()) {
721 return(StatusCode::FAILURE);
722 }
723 const char* tokenStr = nullptr;
724 int num = -1;
725 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
726 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
727 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
728 } else {
729 return(sc);
730 }
731 // Read object instance via POOL/ROOT
732 void* instance = nullptr;
733 Token token;
734 token.fromString(tokenStr); tokenStr = nullptr;
735 if (token.classID() != Guid::null()) {
736 std::string objName = "ALL";
737 if (useDetailChronoStat()) {
738 objName = token.classID().toString();
739 }
740 // StopWatch listens from here until the end of this current scope
741 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
742 this->setObjPtr(instance, &token);
743 // Serialize object via ROOT
745 void* buffer = nullptr;
746 std::size_t nbytes = 0;
747 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
748 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
749 while (sc.isRecoverable()) {
750 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
751 }
752 delete [] static_cast<char*>(buffer); buffer = nullptr;
753 if (!sc.isSuccess()) {
754 ATH_MSG_ERROR("Could not share object for: " << token.toString());
755 return(StatusCode::FAILURE);
756 }
757 AuxDiscoverySvc auxDiscover;
758 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
759 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
760 return(StatusCode::FAILURE);
761 }
762 cltype.Destruct(instance); instance = nullptr;
763 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
764 ATH_MSG_ERROR("Could not share object for: " << token.toString());
765 return(StatusCode::FAILURE);
766 }
767 } else if (token.dbID() != Guid::null()) {
768 std::string returnToken;
769 const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
770 if (metadataToken != nullptr) {
771 returnToken = metadataToken->toString();
772 } else {
773 returnToken = token.toString();
774 }
775 delete metadataToken; metadataToken = nullptr;
776 // Share token
777 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
778 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
779 ATH_MSG_ERROR("Could not share token for: " << token.toString());
780 return(StatusCode::FAILURE);
781 }
782 } else {
783 return(StatusCode::RECOVERABLE);
784 }
785 return(StatusCode::SUCCESS);
786}
787
788//________________________________________________________________________________
790 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
791 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
792 catalog->commit();
793 catalog->start();
794 return(StatusCode::SUCCESS);
795}
796
797//______________________________________________________________________________
799{
800 ATH_MSG_ERROR("Sending ABORT to clients");
801 // the master process will kill this process once workers abort
802 // but it could be a time-limited loop
803 StatusCode sc = StatusCode::SUCCESS;
804 while (sc.isSuccess()) {
805 if (client_n >= 0) {
806 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
807 }
808 const char* dummy;
809 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
810 while (sc.isRecoverable()) {
811 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
812 }
813 }
814 return StatusCode::FAILURE;
815}
816
817//______________________________________________________________________________
818void AthenaPoolSharedIOCnvSvc::handle(const Incident& incident) {
819 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
820 m_outputStreamingTool->lockObject("release").ignore();
821 }
822}
823//______________________________________________________________________________
824AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
825 base_class(name, pSvcLocator) {
826}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
This file contains the class definition for the IAthMetaDataSvc class.
Interface to an output stream tool.
static Double_t sc
This file contains the class definition for the Placement class (migrated from POOL).
TTypeAdapter RootType
Definition RootType.h:211
std::map< std::string, double > instance
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual void setObjPtr(void *&obj, const Token *token) override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle the StoreCleared incident.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect from the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
@ kInputStream
Definition IPoolSvc.h:40
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition Placement.h:19
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:40
const std::string & containerName() const
Access container name.
Definition Placement.h:32
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:30
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
const std::string & fileName() const
Access file name.
Definition Placement.h:28
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
void Destruct(void *place) const
Definition RootType.cxx:677
size_t SizeOf() const
Definition RootType.cxx:765
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:91
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:71
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Token.h:93
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access class identifier.
Definition Token.h:73
Token & setClassID(const Guid &cl_id)
Set class identifier.
Definition Token.h:75
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
const OID_t & oid() const
Access object identifier.
Definition Token.h:81
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
void commit()
Save catalog to file.
Definition merge.py:1
static const DbType POOL_StorageType
Definition DbType.h:84