ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolSharedIOCnvSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
17
25
26#include "StorageSvc/DbReflex.h"
27
28#include "AuxDiscoverySvc.h"
29
30#include <algorithm>
31#include <iomanip>
32#include <sstream>
33#include <format>
34
35//______________________________________________________________________________
36// Initialize the service.
38 // Retrieve InputStreamingTool (if configured)
39 if (!m_inputStreamingTool.empty()) {
41 }
42 // Retrieve OutputStreamingTool (if configured)
43 if (!m_outputStreamingTool.empty()) {
45 if (m_makeStreamingToolClient.value() == -1) {
46 // Initialize AthenaRootSharedWriter
47 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
48 ATH_CHECK(arswsvc.retrieve());
49 }
50 // Put PoolSvc into share mode to avoid duplicating catalog.
51 getPoolSvc()->setShareMode(true);
52 }
53 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
54 // Retrieve AthenaSerializeSvc
55 ATH_CHECK(m_serializeSvc.retrieve());
56 }
57 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
58 long int pri = 1000;
59 if (!m_outputStreamingTool.empty()) {
60 incSvc->addListener(this, "StoreCleared", pri);
61 ATH_MSG_DEBUG("Subscribed to StoreCleared");
62 }
63 return this->AthenaPoolCnvSvc::initialize();
64}
65//______________________________________________________________________________
67 // Release AthenaSerializeSvc
68 if (!m_serializeSvc.empty()) {
69 if (!m_serializeSvc.release().isSuccess()) {
70 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
71 }
72 }
73 // Release OutputStreamingTool (if configured)
74 if (!m_outputStreamingTool.empty()) {
75 if (!m_outputStreamingTool.release().isSuccess()) {
76 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
77 }
78 }
79 // Release InputStreamingTool (if configured)
80 if (!m_inputStreamingTool.empty()) {
81 if (!m_inputStreamingTool.release().isSuccess()) {
82 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
83 }
84 }
85 return this->AthenaPoolCnvSvc::finalize();
86}
87//______________________________________________________________________________
88StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec,
89 const std::string& /*openMode*/) {
90 return(connectOutput(outputConnectionSpec));
91}
92//______________________________________________________________________________
93StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
94// This is called before DataObjects are being converted.
95 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
96 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
97 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
98 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
99 return(StatusCode::FAILURE);
100 }
101 }
102 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
103 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
104 return(StatusCode::SUCCESS);
105 }
106 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
107 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
108 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
109 return(StatusCode::SUCCESS);
110 }
112 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
113 return(StatusCode::SUCCESS);
114 }
115 }
117 outputConnection += m_streamPortString.value();
118 }
119 std::size_t apend = outputConnectionSpec.find('[');
120 if (apend != std::string::npos) {
121 outputConnection += outputConnectionSpec.substr(apend);
122 }
123 return AthenaPoolCnvSvc::connectOutput(outputConnection);
124}
125
126//______________________________________________________________________________
127StatusCode AthenaPoolSharedIOCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
128 // This is called after all DataObjects are converted.
129 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
130 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
131 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
132 m_outputStreamingTool->lockObject("wait").ignore();
133 if (!this->cleanUp(outputConnection).isSuccess()) {
134 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
135 return(StatusCode::FAILURE);
136 }
137 return(StatusCode::SUCCESS);
138 }
139 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
140 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
141 return(StatusCode::SUCCESS);
142 }
143 std::map<void*, RootType> commitCache;
144 std::string fileName;
145 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
146 // Clear object to get Placements for all objects in a Stream
147 const char* placementStr = nullptr;
148 int num = -1;
149 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
150 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
151 const char * matchedChars = strstr(placementStr, "[FILE=");
152 if (!matchedChars){
153 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
154 return abortSharedWrClients(num);
155 }
156 fileName = matchedChars;
157 fileName = fileName.substr(6, fileName.find(']') - 6);
158 if (!this->connectOutput(fileName).isSuccess()) {
159 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
160 return abortSharedWrClients(num);
161 }
162 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
163 bool dataHeaderSeen = false;
164 std::string dataHeaderID;
165 while (num > 0) {
166 std::string objName = "ALL";
167 if (useDetailChronoStat()) {
168 objName = placementStr; //FIXME, better descriptor
169 }
170 // StopWatch listens from here until the end of this current scope
171 {
172 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
173 std::string_view pStr = placementStr;
174 std::string::size_type cpos = pStr.find ("[CONT=");
175 if (cpos == std::string::npos) {
176 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
177 return StatusCode::FAILURE;
178 }
179 std::string tokenStr (pStr.substr(0, cpos));
180 std::string contName (pStr.substr(cpos, std::string::npos));
181 std::string::size_type cl1 = contName.find(']');
182 if (cl1 == std::string::npos) {
183 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
184 return StatusCode::FAILURE;
185 }
186 tokenStr.append(contName, cl1 + 1);
187 contName = contName.substr(6, cl1 - 6);
188
189 std::string::size_type ppos = pStr.find ("[PNAME=");
190 if (ppos == std::string::npos) {
191 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
192 return StatusCode::FAILURE;
193 }
194 std::string className (pStr.substr(ppos, std::string::npos));
195 std::string::size_type cl2 = className.find(']');
196 if (cl2 == std::string::npos) {
197 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
198 return StatusCode::FAILURE;
199 }
200 className = className.substr(7, cl2 - 7);
201 RootType classDesc = RootType::ByNameNoQuiet(className);
202 void* obj = nullptr;
203 const std::string numStr = std::to_string(num);
204 std::string::size_type len = m_metadataContainerProp.value().size();
205 bool foundContainer = false;
206 std::size_t opPos = contName.find('(');
207 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
208 foundContainer = true;
209 } else {
210 for (const auto& item: m_metadataContainersAug.value()) {
211 if (contName.compare(0, opPos, item) == 0){
212 foundContainer = true;
213 len = item.size();
214 break;
215 }
216 }
217 }
218 if (len > 0 && foundContainer && contName[len] == '(' ) {
219 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
220 // For Metadata, before moving to next client, fire file incidents
221 if (m_metadataClient != num) {
222 if (m_metadataClient != 0) {
223 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
224 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
225 incSvc->fireIncident(beginInputIncident);
226 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
227 incSvc->fireIncident(endInputIncident);
228 }
229 m_metadataClient = num;
230 }
231 // Retrieve MetaDataSvc
232 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
233 ATH_CHECK(metadataSvc.retrieve());
234 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
235 if (sc.isRecoverable()) {
236 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
237 } else if (sc.isFailure()) {
238 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
239 return abortSharedWrClients(num);
240 }
241 } else {
242 Token readToken;
243 readToken.setOid(Token::OID_t(num, 0));
244 readToken.setAuxString("[PNAME=" + className + "]");
245 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
246 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
247 // Write object
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251 // Pass DHForms to the converter for later writing in the correct order - do not write it now
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
255 tokenStr = "";
256 } else {
257 Placement placement;
258 placement.fromString(placementStr);
259 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
260 if (token == nullptr) {
261 ATH_MSG_ERROR("Failed to write Data for: " << className);
262 return abortSharedWrClients(num);
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267 // Found DataHeader - call the converter to update DHForm Ref
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
271 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
272 return abortSharedWrClients(num);
273 }
274 } else
275 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280 // Multiple shared DataHeaderForms
281 Placement placement;
282 placement.fromString(placementStr); placementStr = nullptr;
283 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
284 if (token == nullptr) {
285 ATH_MSG_ERROR("Failed to write Data for: " << className);
286 return abortSharedWrClients(num);
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290 // Found DataHeader
292 tokenStr, placement.auxString());
293 // call DH converter to add the ref to DHForm (stored earlier) and to itself
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
295 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
296 return abortSharedWrClients(num);
297 }
298 dataHeaderSeen = true;
299 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
300 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
301 // This is achieved by building it as "CONTID/WORKERID/DBID".
302 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
303 // WORKERID allows us to distinguish AthenaMP workers,
304 // and DBID allows us to distinguish streams.
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308 // next object after DataHeader - may be a DataHeaderForm
309 // in any case we need to call the DH converter to update the DHForm Ref
310 if (className == "DataHeaderForm_p6") {
311 // Tell DataHeaderCnv that it should use a new DHForm
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
316 return abortSharedWrClients(num);
317 }
318 } else {
319 // Tell DataHeaderCnv that it should use the old DHForm
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
322 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
323 return abortSharedWrClients(num);
324 }
325 }
326 }
327 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333 // Send Token back to Client
334 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
335 while (sc.isRecoverable()) {
336 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
337 }
338 if (!sc.isSuccess()) {
339 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
340 return abortSharedWrClients(-1);
341 }
342 }
343 sc = m_outputStreamingTool->clearObject(&placementStr, num);
344 while (sc.isRecoverable()) {
345 sc = m_outputStreamingTool->clearObject(&placementStr, num);
346 }
347 if (sc.isFailure()) {
348 // no more clients, break the loop and exit
349 num = -1;
350 }
351 }
352 if (dataHeaderSeen) {
353 // DataHeader was the last object, need to tell the converter there is no DHForm coming
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
356 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
357 return abortSharedWrClients(-1);
358 }
359 }
360 placementStr = nullptr;
361 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 } else if (sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
366 if (sc.isFailure() || fileName.empty()) {
367 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
368 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
369 FileIncident beginInputIncident(name(), "BeginInputMemFile", memName);
370 incSvc->fireIncident(beginInputIncident);
371 FileIncident endInputIncident(name(), "EndInputMemFile", std::move(memName));
372 incSvc->fireIncident(endInputIncident);
373 if (sc.isFailure()) {
374 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
375 } else {
376 ATH_MSG_INFO("Failed to get Data for client: " << num);
377 }
378 return(StatusCode::FAILURE);
379 }
380 }
381 if (m_parallelCompression && !fileName.empty()) {
382 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
383 return(StatusCode::SUCCESS);
384 }
385 if (outputConnection.empty()) {
386 outputConnection = std::move(fileName);
387 } else {
388 outputConnection = outputConnectionSpec;
390 outputConnection += m_streamPortString.value();
391 }
392 }
393 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
394 const std::string baseOutputConnection = outputConnection.substr(0, merge);
395 m_fileCommitCounter[baseOutputConnection]++;
397 m_fileFlushSetting.value().contains(baseOutputConnection) &&
398 m_fileFlushSetting[baseOutputConnection] > 0 &&
399 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
400 doCommit = true;
401 ATH_MSG_DEBUG("commitOutput sending data.");
402 }
403 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
404 for (auto& [ptr, rootType] : commitCache) {
405 rootType.Destruct(ptr);
406 }
407 return(status);
408}
409
410//______________________________________________________________________________
411StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
412 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
413 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
414 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
415 return(StatusCode::SUCCESS);
416 }
417 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
419 m_streamServerActive = false;
420 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
421 return(StatusCode::SUCCESS);
422 } else {
423 m_streamServerActive = false;
424 }
425 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
426 }
428 outputConnection += m_streamPortString.value();
429 }
430 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
431}
432
433//______________________________________________________________________________
434Token* AthenaPoolSharedIOCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
435 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
436 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
437 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
438 return(nullptr);
439 }
440 }
441 Token* token = nullptr;
442 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
443 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
444 // Lock object
445 std::string placementStr = placement->toString();
446 placementStr += "[PNAME=";
447 placementStr += classDesc.Name();
448 placementStr += ']';
449 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
450 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
451 while (sc.isRecoverable()) {
452 //usleep(100);
453 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
454 }
455 if (!sc.isSuccess()) {
456 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
457 return(nullptr);
458 }
459 // Serialize object via ROOT
460 const void* buffer = nullptr;
461 std::size_t nbytes = 0;
462 bool own = true;
463 if (classDesc.Name() == "Token") {
464 nbytes = strlen(static_cast<const char*>(obj)) + 1;
465 buffer = obj;
466 own = false;
467 } else if (classDesc.IsFundamental()) {
468 nbytes = classDesc.SizeOf();
469 buffer = obj;
470 own = false;
471 } else {
472 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
473 }
474 // Share object
475 sc = m_outputStreamingTool->putObject(buffer, nbytes);
476 while (sc.isRecoverable()) {
477 //usleep(100);
478 sc = m_outputStreamingTool->putObject(buffer, nbytes);
479 }
480 if (own) { delete [] static_cast<const char*>(buffer); }
481 buffer = nullptr;
482 if (!sc.isSuccess()) {
483 ATH_MSG_ERROR("Could not share object for: " << placementStr);
484 m_outputStreamingTool->putObject(nullptr, 0).ignore();
485 return(nullptr);
486 }
487 AuxDiscoverySvc auxDiscover;
488 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
489 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
490 m_outputStreamingTool->putObject(nullptr, 0).ignore();
491 return(nullptr);
492 }
493 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
494 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
495 return(nullptr);
496 }
497 // Get Token back from Server
498 const char* tokenStr = nullptr;
499 int num = -1;
500 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
501 while (sc.isRecoverable()) {
502 //usleep(100);
503 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
504 }
505 if (!sc.isSuccess()) {
506 ATH_MSG_ERROR("Failed to get Token");
507 return(nullptr);
508 }
509 if (!strcmp(tokenStr, "ABORT")) {
510 ATH_MSG_ERROR("Writer requested ABORT");
511 // tell the server we are leaving
512 m_outputStreamingTool->stop().ignore();
513 return nullptr;
514 }
515 Token* tempToken = new Token();
516 tempToken->fromString(tokenStr); tokenStr = nullptr;
517 tempToken->setClassID(pool::DbReflex::guid(classDesc));
518 token = tempToken; tempToken = nullptr;
519// Client Write Request
520 } else {
521 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
522 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
523 Token* tempToken = new Token();
524 tempToken->setClassID(pool::DbReflex::guid(classDesc));
525 token = tempToken; tempToken = nullptr;
526 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
527 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
528 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
529 } else {
531 placement->setFileName(placement->fileName() + m_streamPortString.value());
532 }
533 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
534 }
535 }
536 return(token);
537}
538//______________________________________________________________________________
539void AthenaPoolSharedIOCnvSvc::setObjPtr(void*& obj, const Token* token) {
540 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
541 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
542 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
543 }
544 }
545 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
546 if (token->dbID() == Guid::null()) {
547 int num = token->oid().first;
548 // Get object from SHM
549 void* buffer = nullptr;
550 std::size_t nbytes = 0;
551 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
552 while (sc.isRecoverable()) {
553 //usleep(100);
554 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
555 }
556 if (!sc.isSuccess()) {
557 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
558 obj = nullptr;
559 } else {
560 ATH_MSG_DEBUG("Server deserializing " << token->toString());
561 if (token->classID() != Guid::null()) {
562 // Deserialize object
563 RootType cltype(pool::DbReflex::forGuid(token->classID()));
564 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
565 } else {
566 // Deserialize object
567 std::string className = token->auxString();
568 className = className.substr(className.find("[PNAME="));
569 className = className.substr(7, className.find(']') - 7);
570 RootType cltype(RootType::ByNameNoQuiet(className));
571 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
572 }
573 AuxDiscoverySvc auxDiscover;
574 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
575 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
576 obj = nullptr;
577 }
578 }
579 }
580 }
581 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
582 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
583 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
584 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
585 obj = nullptr;
586 } else {
587 void* buffer = nullptr;
588 std::size_t nbytes = 0;
589 StatusCode sc = StatusCode::FAILURE;
590 // StopWatch listens from here until the end of this current scope
591 {
592 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
593 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
594 while (sc.isRecoverable()) {
595 // sleep
596 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
597 }
598 }
599 if (!sc.isSuccess()) {
600 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
601 obj = nullptr;
602 } else {
603 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
604 AuxDiscoverySvc auxDiscover;
605 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
606 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
607 obj = nullptr;
608 }
609 }
610 }
611 } else if (token->dbID() != Guid::null()) {
612 AthenaPoolCnvSvc::setObjPtr(obj, token);
613 }
614}
615//______________________________________________________________________________
617 const CLID& clid,
618 const std::string* par,
619 const unsigned long* ip,
620 IOpaqueAddress*& refpAddress) {
621 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
623 }
624 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
625 Token addressToken;
626 addressToken.setDb(par[0].substr(4));
627 addressToken.setCont(par[1]);
628 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
629 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
630 void* buffer = nullptr;
631 std::size_t nbytes = 0;
632 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
633 while (sc.isRecoverable()) {
634 // sleep
635 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
636 }
637 if (!sc.isSuccess()) {
638 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
639 return(StatusCode::FAILURE);
640 }
641 auto token = std::make_unique<Token>();
642 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
643 if (token->classID() == Guid::null()) {
644 token.reset();
645 }
646 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
647 if (token) {
648 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
649 return(StatusCode::SUCCESS);
650 }
651 else {
652 return(StatusCode::RECOVERABLE);
653 }
654 } else {
655 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
656 }
657}
658//______________________________________________________________________________
660 const CLID& clid,
661 const std::string& refAddress,
662 IOpaqueAddress*& refpAddress) {
663 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
664}
665//__________________________________________________________________________
666StatusCode AthenaPoolSharedIOCnvSvc::decodeOutputSpec(std::string& fileSpec, int& outputTech) const {
667 auto pos = fileSpec.find("?pmerge=");
668 std::string suffix;
669 // Remove trailing TMemFile for decoding
670 if (pos != std::string::npos) {
671 suffix = fileSpec.substr(pos);
672 fileSpec.erase(pos);
673 }
674 StatusCode sc = AthenaPoolCnvSvc::decodeOutputSpec(fileSpec, outputTech);
675 // Append back the suffix
676 if (!suffix.empty()) {
677 fileSpec += suffix;
678 }
679 return sc;
680}
681//______________________________________________________________________________
682StatusCode AthenaPoolSharedIOCnvSvc::cleanUp(const std::string& connection) {
683 auto pos = connection.find("?pmerge=");
684 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
685 return AthenaPoolCnvSvc::cleanUp(conn);
686}
687//______________________________________________________________________________
689 if (num < 0) {
690 num = -num;
692 num = num % 1024;
693 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
694 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
695 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
696 const std::string streamPortSuffix = m_streamPortString.value();
697 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
698 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
699 return(StatusCode::FAILURE);
700 }
701 // Disable PersistencySvc per output file mode, for SharedWriter Server
702 m_persSvcPerOutput.setValue(false);
703 return(StatusCode::SUCCESS);
704 }
705 return(StatusCode::RECOVERABLE);
706 }
707 if (m_inputStreamingTool.empty()) {
708 return(StatusCode::RECOVERABLE);
709 }
710 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
711 return(m_inputStreamingTool->makeServer(num, ""));
712}
713//________________________________________________________________________________
715 if (!m_outputStreamingTool.empty()) {
716 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
717 std::string streamPortSuffix;
718 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
719 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
720 return(StatusCode::FAILURE);
721 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
722 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
723 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
724 m_streamPortString.setValue(streamPortSuffix);
725 }
726 }
727 if (m_inputStreamingTool.empty()) {
728 return(StatusCode::SUCCESS);
729 }
730 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
731 std::string dummyStr;
732 return(m_inputStreamingTool->makeClient(num, dummyStr));
733}
734//________________________________________________________________________________
736 if (m_inputStreamingTool.empty()) {
737 return(StatusCode::FAILURE);
738 }
739 const char* tokenStr = nullptr;
740 int num = -1;
741 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
742 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
743 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
744 } else {
745 return(sc);
746 }
747 // Read object instance via POOL/ROOT
748 void* instance = nullptr;
749 Token token;
750 token.fromString(tokenStr); tokenStr = nullptr;
751 if (token.classID() != Guid::null()) {
752 std::string objName = "ALL";
753 if (useDetailChronoStat()) {
754 objName = token.classID().toString();
755 }
756 // StopWatch listens from here until the end of this current scope
757 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
758 this->setObjPtr(instance, &token);
759 // Serialize object via ROOT
761 void* buffer = nullptr;
762 std::size_t nbytes = 0;
763 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
764 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
765 while (sc.isRecoverable()) {
766 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
767 }
768 delete [] static_cast<char*>(buffer); buffer = nullptr;
769 if (!sc.isSuccess()) {
770 ATH_MSG_ERROR("Could not share object for: " << token.toString());
771 return(StatusCode::FAILURE);
772 }
773 AuxDiscoverySvc auxDiscover;
774 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
775 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
776 return(StatusCode::FAILURE);
777 }
778 cltype.Destruct(instance); instance = nullptr;
779 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
780 ATH_MSG_ERROR("Could not share object for: " << token.toString());
781 return(StatusCode::FAILURE);
782 }
783 } else if (token.dbID() != Guid::null()) {
784 std::string returnToken;
785 const Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
786 if (metadataToken != nullptr) {
787 returnToken = metadataToken->toString();
788 } else {
789 returnToken = token.toString();
790 }
791 delete metadataToken; metadataToken = nullptr;
792 // Share token
793 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
794 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
795 ATH_MSG_ERROR("Could not share token for: " << token.toString());
796 return(StatusCode::FAILURE);
797 }
798 } else {
799 return(StatusCode::RECOVERABLE);
800 }
801 return(StatusCode::SUCCESS);
802}
803
804//________________________________________________________________________________
806 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
807 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
808 catalog->commit();
809 catalog->start();
810 return(StatusCode::SUCCESS);
811}
812
813//______________________________________________________________________________
815{
816 ATH_MSG_ERROR("Sending ABORT to clients");
817 // the master process will kill this process once workers abort
818 // but it could be a time-limited loop
819 StatusCode sc = StatusCode::SUCCESS;
820 while (sc.isSuccess()) {
821 if (client_n >= 0) {
822 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
823 }
824 const char* dummy;
825 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
826 while (sc.isRecoverable()) {
827 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
828 }
829 }
830 return StatusCode::FAILURE;
831}
832
833//______________________________________________________________________________
834void AthenaPoolSharedIOCnvSvc::handle(const Incident& incident) {
835 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
836 m_outputStreamingTool->lockObject("release").ignore();
837 }
838}
839//______________________________________________________________________________
840AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
841 base_class(name, pSvcLocator) {
842}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
This file contains the class definition for the IAthMetaDataSvc class.
Interface to an output stream tool.
static Double_t sc
This file contains the class definition for the Placement class (migrated from POOL).
TTypeAdapter RootType
Definition RootType.h:211
std::map< std::string, double > instance
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual void setObjPtr(void *&obj, const Token *token) override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
virtual StatusCode decodeOutputSpec(std::string &connectionSpec, int &outputTech) const override
Extract/deduce the DB technology from the connection string/file specification.
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
@ kInputStream
Definition IPoolSvc.h:40
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition Placement.h:19
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:40
const std::string & containerName() const
Access container name.
Definition Placement.h:32
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:30
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
const std::string & fileName() const
Access file name.
Definition Placement.h:28
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
void Destruct(void *place) const
Definition RootType.cxx:677
size_t SizeOf() const
Definition RootType.cxx:765
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:91
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:71
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Definition Token.h:93
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
const std::string & contID() const
Access container identifier.
Definition Token.h:69
const Guid & classID() const
Access database identifier.
Definition Token.h:73
Token & setClassID(const Guid &cl_id)
Access database identifier.
Definition Token.h:75
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
const OID_t & oid() const
Access object identifier.
Definition Token.h:81
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
void commit()
Save catalog to file.
Definition merge.py:1
static const DbType POOL_StorageType
Definition DbType.h:98