ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaPoolSharedIOCnvSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11
12#include "GaudiKernel/ClassID.h"
13#include "GaudiKernel/FileIncident.h"
14#include "GaudiKernel/GenericAddress.h"
15#include "GaudiKernel/IIncidentSvc.h"
16#include "GaudiKernel/IOpaqueAddress.h"
17
26
27#include "StorageSvc/DbReflex.h"
28
29#include "AuxDiscoverySvc.h"
30
31#include <algorithm>
32#include <iomanip>
33#include <sstream>
34#include <format>
35
36//______________________________________________________________________________
37// Initialize the service.
39 // Retrieve InputStreamingTool (if configured)
40 if (!m_inputStreamingTool.empty()) {
42 }
43 // Retrieve OutputStreamingTool (if configured)
44 if (!m_outputStreamingTool.empty()) {
46 if (m_makeStreamingToolClient.value() == -1) {
47 // Initialize AthenaRootSharedWriter
48 ServiceHandle<IService> arswsvc("AthenaRootSharedWriterSvc", this->name());
49 ATH_CHECK(arswsvc.retrieve());
50 }
51 // Put PoolSvc into share mode to avoid duplicating catalog.
52 getPoolSvc()->setShareMode(true);
53 }
54 if (!m_inputStreamingTool.empty() || !m_outputStreamingTool.empty()) {
55 // Retrieve AthenaSerializeSvc
56 ATH_CHECK(m_serializeSvc.retrieve());
57 }
58 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
59 long int pri = 1000;
60 if (!m_outputStreamingTool.empty()) {
61 incSvc->addListener(this, "StoreCleared", pri);
62 ATH_MSG_DEBUG("Subscribed to StoreCleared");
63 }
64 return this->AthenaPoolCnvSvc::initialize();
65}
66//______________________________________________________________________________
68 // Release AthenaSerializeSvc
69 if (!m_serializeSvc.empty()) {
70 if (!m_serializeSvc.release().isSuccess()) {
71 ATH_MSG_WARNING("Cannot release AthenaSerializeSvc.");
72 }
73 }
74 // Release OutputStreamingTool (if configured)
75 if (!m_outputStreamingTool.empty()) {
76 if (!m_outputStreamingTool.release().isSuccess()) {
77 ATH_MSG_WARNING("Cannot release Output AthenaIPCTool.");
78 }
79 }
80 // Release InputStreamingTool (if configured)
81 if (!m_inputStreamingTool.empty()) {
82 if (!m_inputStreamingTool.release().isSuccess()) {
83 ATH_MSG_WARNING("Cannot release Input AthenaIPCTool.");
84 }
85 }
86 return this->AthenaPoolCnvSvc::finalize();
87}
88//______________________________________________________________________________
89StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec,
90 const std::string& openMode) {
91 return AthenaPoolCnvSvc::connectOutput(outputConnectionSpec, openMode);
92}
93//______________________________________________________________________________
94StatusCode AthenaPoolSharedIOCnvSvc::connectOutput(const std::string& outputConnectionSpec) {
95// This is called before DataObjects are being converted.
96 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
97 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
98 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
99 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
100 return(StatusCode::FAILURE);
101 }
102 }
103 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
104 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
105 return(StatusCode::SUCCESS);
106 }
107 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
108 if (m_parallelCompression && outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") == std::string::npos) {
109 ATH_MSG_DEBUG(std::format("connectOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
110 return(StatusCode::SUCCESS);
111 }
113 ATH_MSG_DEBUG("connectOutput SKIPPED for expired server.");
114 return(StatusCode::SUCCESS);
115 }
116 }
118 outputConnection += m_streamPortString.value();
119 }
120 std::size_t apend = outputConnectionSpec.find('[');
121 if (apend != std::string::npos) {
122 outputConnection += outputConnectionSpec.substr(apend);
123 }
124 if (outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos) {
125 return AthenaPoolCnvSvc::connectOutput(outputConnection, "APPEND");
126 }
127 return AthenaPoolCnvSvc::connectOutput(outputConnection);
128}
129
130//______________________________________________________________________________
131StatusCode AthenaPoolSharedIOCnvSvc::commitOutput(const std::string& outputConnectionSpec, bool doCommit) {
132 // This is called after all DataObjects are converted.
133 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
134 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
135 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
136 m_outputStreamingTool->lockObject("wait").ignore();
137 if (!this->cleanUp(outputConnection).isSuccess()) {
138 ATH_MSG_ERROR("commitOutput FAILED to cleanup converters.");
139 return(StatusCode::FAILURE);
140 }
141 return(StatusCode::SUCCESS);
142 }
143 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
144 ATH_MSG_DEBUG("commitOutput SKIPPED for uninitialized server.");
145 return(StatusCode::SUCCESS);
146 }
147 std::map<void*, RootType> commitCache;
148 std::string fileName;
149 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && m_outputStreamingTool->isServer() && m_streamServerActive) {
150 // Clear object to get Placements for all objects in a Stream
151 const char* placementStr = nullptr;
152 int num = -1;
153 StatusCode sc = m_outputStreamingTool->clearObject(&placementStr, num);
154 if (sc.isSuccess() && placementStr != nullptr && strlen(placementStr) > 6 && num > 0) {
155 const char * matchedChars = strstr(placementStr, "[FILE=");
156 if (!matchedChars){
157 ATH_MSG_ERROR(std::format("No matching filename in {}", placementStr));
158 return abortSharedWrClients(num);
159 }
160 fileName = matchedChars;
161 fileName = fileName.substr(6, fileName.find(']') - 6);
162 if (!this->connectOutput(fileName).isSuccess()) {
163 ATH_MSG_ERROR(std::format("Failed to connectOutput for {}", fileName));
164 return abortSharedWrClients(num);
165 }
166 IConverter* DHcnv = converter(ClassID_traits<DataHeader>::ID());
167 bool dataHeaderSeen = false;
168 std::string dataHeaderID;
169 while (num > 0) {
170 std::string objName = "ALL";
171 if (useDetailChronoStat()) {
172 objName = placementStr; //FIXME, better descriptor
173 }
174 // StopWatch listens from here until the end of this current scope
175 {
176 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
177 std::string_view pStr = placementStr;
178 std::string::size_type cpos = pStr.find ("[CONT=");
179 if (cpos == std::string::npos) {
180 ATH_MSG_ERROR(std::format("No CONT field in placement string: {}", pStr));
181 return StatusCode::FAILURE;
182 }
183 std::string tokenStr (pStr.substr(0, cpos));
184 std::string contName (pStr.substr(cpos, std::string::npos));
185 std::string::size_type cl1 = contName.find(']');
186 if (cl1 == std::string::npos) {
187 ATH_MSG_ERROR(std::format("Missing close bracket after CONT field in placement string: {}", pStr));
188 return StatusCode::FAILURE;
189 }
190 tokenStr.append(contName, cl1 + 1);
191 contName = contName.substr(6, cl1 - 6);
192
193 std::string::size_type ppos = pStr.find ("[PNAME=");
194 if (ppos == std::string::npos) {
195 ATH_MSG_ERROR(std::format("No PNAME field in placement string: {}", pStr));
196 return StatusCode::FAILURE;
197 }
198 std::string className (pStr.substr(ppos, std::string::npos));
199 std::string::size_type cl2 = className.find(']');
200 if (cl2 == std::string::npos) {
201 ATH_MSG_ERROR(std::format("Missing close bracket after PNAME field in placement string: {}", pStr));
202 return StatusCode::FAILURE;
203 }
204 className = className.substr(7, cl2 - 7);
205 RootType classDesc = RootType::ByNameNoQuiet(className);
206 void* obj = nullptr;
207 const std::string numStr = std::to_string(num);
208 std::string::size_type len = m_metadataContainerProp.value().size();
209 bool foundContainer = false;
210 std::size_t opPos = contName.find('(');
211 if (contName.compare(0, opPos, m_metadataContainerProp.value()) == 0) {
212 foundContainer = true;
213 } else {
214 for (const auto& item: m_metadataContainersAug.value()) {
215 if (contName.compare(0, opPos, item) == 0){
216 foundContainer = true;
217 len = item.size();
218 break;
219 }
220 }
221 }
222 if (len > 0 && foundContainer && contName[len] == '(' ) {
223 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
224 // For Metadata, before moving to next client, fire file incidents
225 if (m_metadataClient != num) {
226 if (m_metadataClient != 0) {
227 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
228 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
229 memName, {}, /*endFileName=*/memName,
230 "BeginInputMemFile", "EndInputMemFile");
231 }
232 m_metadataClient = num;
233 }
234 // Retrieve MetaDataSvc
235 ServiceHandle<IAthMetaDataSvc> metadataSvc("MetaDataSvc", name());
236 ATH_CHECK(metadataSvc.retrieve());
237 sc = metadataSvc->shmProxy(std::format("{}[NUM={}]", pStr, numStr));
238 if (sc.isRecoverable()) {
239 ATH_MSG_WARNING("MetaDataSvc::shmProxy() no proxy added.");
240 } else if (sc.isFailure()) {
241 ATH_MSG_FATAL("MetaDataSvc::shmProxy() failed!");
242 return abortSharedWrClients(num);
243 }
244 } else {
245 Token readToken;
246 readToken.setOid(Token::OID_t(num, 0));
247 readToken.setAuxString("[PNAME=" + className + "]");
248 this->setObjPtr(obj, &readToken); // Pull/read Object out of shared memory
249 if (len == 0 || contName.compare(0, len, m_metadataContainerProp.value()) != 0) {
250 // Write object
251 if( m_oneDataHeaderForm.value() ) {
252 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
253 if( className == "DataHeaderForm_p6" ) {
254 // Pass DHForms to the converter for later writing in the correct order - do not write it now
256 "", placementWithSwn());
257 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
258 tokenStr = "";
259 } else {
260 Placement placement;
261 placement.fromString(placementStr);
262 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
263 if (token == nullptr) {
264 ATH_MSG_ERROR("Failed to write Data for: " << className);
265 return abortSharedWrClients(num);
266 }
267 tokenStr = token->toString();
268 }
269 if( className == "DataHeader_p6" ) {
270 // Found DataHeader - call the converter to update DHForm Ref
272 tokenStr, placementWithSwn());
273 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
274 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
275 return abortSharedWrClients(num);
276 }
277 } else
278 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
279 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
280 }
281 placementStr = nullptr;
282 } else {
283 // Multiple shared DataHeaderForms
284 Placement placement;
285 placement.fromString(placementStr); placementStr = nullptr;
286 std::unique_ptr<Token> token(registerForWrite(&placement, obj, classDesc));
287 if (token == nullptr) {
288 ATH_MSG_ERROR("Failed to write Data for: " << className);
289 return abortSharedWrClients(num);
290 }
291 tokenStr = token->toString();
292 if (className == "DataHeader_p6") {
293 // Found DataHeader
295 tokenStr, placement.auxString());
296 // call DH converter to add the ref to DHForm (stored earlier) and to itself
297 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
298 ATH_MSG_ERROR("Failed updateRep for obj = " << tokenStr);
299 return abortSharedWrClients(num);
300 }
301 dataHeaderSeen = true;
302 // This dataHeaderID is used in DataHeaderCnv to index the DataHeaderForm cache.
303 // It must be unique per worker per stream so that we have a correct DataHeader(Form) association.
304 // This is achieved by building it as "CONTID/WORKERID/DBID".
305 // CONTID, e.g., POOLContainer(DataHeader), allows us to distinguish data and metadata headers,
306 // WORKERID allows us to distinguish AthenaMP workers,
307 // and DBID allows us to distinguish streams.
308 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
309 } else if (dataHeaderSeen) {
310 dataHeaderSeen = false;
311 // next object after DataHeader - may be a DataHeaderForm
312 // in any case we need to call the DH converter to update the DHForm Ref
313 if (className == "DataHeaderForm_p6") {
314 // Tell DataHeaderCnv that it should use a new DHForm
316 tokenStr, dataHeaderID);
317 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
318 ATH_MSG_ERROR("Failed updateRepRefs for obj = " << tokenStr);
319 return abortSharedWrClients(num);
320 }
321 } else {
322 // Tell DataHeaderCnv that it should use the old DHForm
323 GenericAddress address(0, 0, "", dataHeaderID);
324 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
325 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
326 return abortSharedWrClients(num);
327 }
328 }
329 }
330 if (className != "Token" && className != "DataHeaderForm_p6" && !classDesc.IsFundamental()) {
331 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
332 }
333 }
334 }
335 }
336 // Send Token back to Client
337 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
338 while (sc.isRecoverable()) {
339 sc = m_outputStreamingTool->lockObject(tokenStr.c_str(), num);
340 }
341 if (!sc.isSuccess()) {
342 ATH_MSG_ERROR("Failed to lock Data for " << tokenStr);
343 return abortSharedWrClients(-1);
344 }
345 }
346 sc = m_outputStreamingTool->clearObject(&placementStr, num);
347 while (sc.isRecoverable()) {
348 sc = m_outputStreamingTool->clearObject(&placementStr, num);
349 }
350 if (sc.isFailure()) {
351 // no more clients, break the loop and exit
352 num = -1;
353 }
354 }
355 if (dataHeaderSeen) {
356 // DataHeader was the last object, need to tell the converter there is no DHForm coming
357 GenericAddress address(0, 0, "", std::move(dataHeaderID));
358 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
359 ATH_MSG_ERROR("Failed updateRepRefs for DataHeader");
360 return abortSharedWrClients(-1);
361 }
362 }
363 placementStr = nullptr;
364 } else if (sc.isSuccess() && placementStr != nullptr && strncmp(placementStr, "stop", 4) == 0) {
365 return(StatusCode::RECOVERABLE);
366 } else if (sc.isRecoverable() || num == -1) {
367 return(StatusCode::RECOVERABLE);
368 }
369 if (sc.isFailure() || fileName.empty()) {
370 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
371 std::string memName = std::format("SHM[NUM={}]", m_metadataClient);
372 {
373 auto guard = InputFileIncidentGuard::begin(*incSvc, name(),
374 memName, {}, /*endFileName=*/memName,
375 "BeginInputMemFile", "EndInputMemFile");
376 }
377 if (sc.isFailure()) {
378 ATH_MSG_INFO("All SharedWriter clients stopped - exiting");
379 } else {
380 ATH_MSG_INFO("Failed to get Data for client: " << num);
381 }
382 return(StatusCode::FAILURE);
383 }
384 }
385 if (m_parallelCompression && !fileName.empty()) {
386 ATH_MSG_DEBUG(std::format("commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
387 return(StatusCode::SUCCESS);
388 }
389 if (outputConnection.empty()) {
390 outputConnection = std::move(fileName);
391 } else {
392 outputConnection = outputConnectionSpec;
394 outputConnection += m_streamPortString.value();
395 }
396 }
397 std::size_t merge = outputConnection.find("?pmerge="); // Used to remove trailing TMemFile
398 const std::string baseOutputConnection = outputConnection.substr(0, merge);
399 m_fileCommitCounter[baseOutputConnection]++;
401 m_fileFlushSetting.value().contains(baseOutputConnection) &&
402 m_fileFlushSetting[baseOutputConnection] > 0 &&
403 m_fileCommitCounter[baseOutputConnection] % m_fileFlushSetting[baseOutputConnection] == 0) {
404 doCommit = true;
405 ATH_MSG_DEBUG("commitOutput sending data.");
406 }
407 StatusCode status = AthenaPoolCnvSvc::commitOutput(outputConnection, doCommit);
408 for (auto& [ptr, rootType] : commitCache) {
409 rootType.Destruct(ptr);
410 }
411 return(status);
412}
413
414//______________________________________________________________________________
415StatusCode AthenaPoolSharedIOCnvSvc::disconnectOutput(const std::string& outputConnectionSpec) {
416 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
417 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
418 && (!m_parallelCompression || outputConnectionSpec.find("[PoolContainerPrefix=" + m_metadataContainerProp.value() + "]") != std::string::npos)) {
419 return(StatusCode::SUCCESS);
420 }
421 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient()) {
423 m_streamServerActive = false;
424 ATH_MSG_DEBUG("disconnectOutput SKIPPED to expire server.");
425 return(StatusCode::SUCCESS);
426 } else {
427 m_streamServerActive = false;
428 }
429 ATH_MSG_DEBUG("disconnectOutput not SKIPPED for server.");
430 }
432 outputConnection += m_streamPortString.value();
433 }
434 return AthenaPoolCnvSvc::disconnectOutput(outputConnectionSpec + m_streamPortString.value());
435}
436
437//______________________________________________________________________________
438Token* AthenaPoolSharedIOCnvSvc::registerForWrite(Placement* placement, const void* obj, const RootType& classDesc) {
439 if (m_makeStreamingToolClient.value() > 0 && !m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer() && !m_outputStreamingTool->isClient()) {
440 if (!makeClient(m_makeStreamingToolClient.value()).isSuccess()) {
441 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
442 return(nullptr);
443 }
444 }
445 Token* token = nullptr;
446 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isClient()
447 && (!m_parallelCompression || placement->containerName().compare(0, m_metadataContainerProp.value().size(), m_metadataContainerProp.value()) == 0)) {
448 // Lock object
449 std::string placementStr = placement->toString();
450 placementStr += "[PNAME=";
451 placementStr += classDesc.Name();
452 placementStr += ']';
453 ATH_MSG_VERBOSE("Requesting write object for: " << placementStr);
454 StatusCode sc = m_outputStreamingTool->lockObject(placementStr.c_str());
455 while (sc.isRecoverable()) {
456 //usleep(100);
457 sc = m_outputStreamingTool->lockObject(placementStr.c_str());
458 }
459 if (!sc.isSuccess()) {
460 ATH_MSG_ERROR("Failed to lock Data for " << placementStr);
461 return(nullptr);
462 }
463 // Serialize object via ROOT
464 const void* buffer = nullptr;
465 std::size_t nbytes = 0;
466 bool own = true;
467 if (classDesc.Name() == "Token") {
468 nbytes = strlen(static_cast<const char*>(obj)) + 1;
469 buffer = obj;
470 own = false;
471 } else if (classDesc.IsFundamental()) {
472 nbytes = classDesc.SizeOf();
473 buffer = obj;
474 own = false;
475 } else {
476 buffer = m_serializeSvc->serialize(obj, classDesc, nbytes);
477 }
478 // Share object
479 sc = m_outputStreamingTool->putObject(buffer, nbytes);
480 while (sc.isRecoverable()) {
481 //usleep(100);
482 sc = m_outputStreamingTool->putObject(buffer, nbytes);
483 }
484 if (own) { delete [] static_cast<const char*>(buffer); }
485 buffer = nullptr;
486 if (!sc.isSuccess()) {
487 ATH_MSG_ERROR("Could not share object for: " << placementStr);
488 m_outputStreamingTool->putObject(nullptr, 0).ignore();
489 return(nullptr);
490 }
491 AuxDiscoverySvc auxDiscover;
492 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, pool::DbReflex::guid(classDesc), placement->containerName()).isSuccess()) {
493 ATH_MSG_ERROR("Could not share dynamic aux store for: " << placementStr);
494 m_outputStreamingTool->putObject(nullptr, 0).ignore();
495 return(nullptr);
496 }
497 if (!m_outputStreamingTool->putObject(nullptr, 0).isSuccess()) {
498 ATH_MSG_ERROR("Failed to put Data for " << placementStr);
499 return(nullptr);
500 }
501 // Get Token back from Server
502 const char* tokenStr = nullptr;
503 int num = -1;
504 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
505 while (sc.isRecoverable()) {
506 //usleep(100);
507 sc = m_outputStreamingTool->clearObject(&tokenStr, num);
508 }
509 if (!sc.isSuccess()) {
510 ATH_MSG_ERROR("Failed to get Token");
511 return(nullptr);
512 }
513 if (!strcmp(tokenStr, "ABORT")) {
514 ATH_MSG_ERROR("Writer requested ABORT");
515 // tell the server we are leaving
516 m_outputStreamingTool->stop().ignore();
517 return nullptr;
518 }
519 Token* tempToken = new Token();
520 tempToken->fromString(tokenStr); tokenStr = nullptr;
521 tempToken->setClassID(pool::DbReflex::guid(classDesc));
522 token = tempToken; tempToken = nullptr;
523// Client Write Request
524 } else {
525 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_outputStreamingTool->isServer()) {
526 ATH_MSG_DEBUG("registerForWrite SKIPPED for uninitialized server, Placement = " << placement->toString());
527 Token* tempToken = new Token();
528 tempToken->setClassID(pool::DbReflex::guid(classDesc));
529 token = tempToken; tempToken = nullptr;
530 } else if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isClient() && !m_streamServerActive) {
531 if(placement->technology() == 0) { // No technology specified, use the default
532 placement->setTechnology(pool::DbType::getType(m_defaultContainerType).type());
533 }
534 ATH_MSG_DEBUG("Requested write object for: " << placement->toString());
535 token = getPoolSvc()->registerForWrite(placement, obj, classDesc);
536 } else {
538 placement->setFileName(placement->fileName() + m_streamPortString.value());
539 }
540 token = AthenaPoolCnvSvc::registerForWrite(placement, obj, classDesc);
541 }
542 }
543 return(token);
544}
545//______________________________________________________________________________
546void AthenaPoolSharedIOCnvSvc::setObjPtr(void*& obj, const Token* token) {
547 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
548 if (!makeClient(-m_makeStreamingToolClient.value()).isSuccess()) {
549 ATH_MSG_ERROR("Could not make AthenaPoolSharedIOCnvSvc a Share Client");
550 }
551 }
552 if (!m_outputStreamingTool.empty() && m_outputStreamingTool->isServer()) {
553 if (token->dbID() == Guid::null()) {
554 int num = token->oid().first;
555 // Get object from SHM
556 void* buffer = nullptr;
557 std::size_t nbytes = 0;
558 StatusCode sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
559 while (sc.isRecoverable()) {
560 //usleep(100);
561 sc = m_outputStreamingTool->getObject(&buffer, nbytes, num);
562 }
563 if (!sc.isSuccess()) {
564 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
565 obj = nullptr;
566 } else {
567 ATH_MSG_DEBUG("Server deserializing " << token->toString());
568 if (token->classID() != Guid::null()) {
569 // Deserialize object
570 RootType cltype(pool::DbReflex::forGuid(token->classID()));
571 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
572 } else {
573 // Deserialize object
574 std::string className = token->auxString();
575 className = className.substr(className.find("[PNAME="));
576 className = className.substr(7, className.find(']') - 7);
577 RootType cltype(RootType::ByNameNoQuiet(className));
578 obj = m_serializeSvc->deserialize(buffer, nbytes, cltype); buffer = nullptr;
579 }
580 AuxDiscoverySvc auxDiscover;
581 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_outputStreamingTool.get(), obj, num).isSuccess()) {
582 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
583 obj = nullptr;
584 }
585 }
586 }
587 }
588 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient() && (m_streamingTechnology.value() < 0 || token->technology() == m_streamingTechnology.value())) {
589 ATH_MSG_VERBOSE("Requesting remote object for: " << token->toString());
590 if (!m_inputStreamingTool->lockObject(token->toString().c_str()).isSuccess()) {
591 ATH_MSG_ERROR("Failed to lock Data for " << token->toString());
592 obj = nullptr;
593 } else {
594 void* buffer = nullptr;
595 std::size_t nbytes = 0;
596 StatusCode sc = StatusCode::FAILURE;
597 // StopWatch listens from here until the end of this current scope
598 {
599 PMonUtils::BasicStopWatch stopWatch("gObj_ALL", this->m_chronoMap);
600 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
601 while (sc.isRecoverable()) {
602 // sleep
603 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
604 }
605 }
606 if (!sc.isSuccess()) {
607 ATH_MSG_ERROR("Failed to get Data for " << token->toString());
608 obj = nullptr;
609 } else {
610 obj = m_serializeSvc->deserialize(buffer, nbytes, token->classID()); buffer = nullptr;
611 AuxDiscoverySvc auxDiscover;
612 if (!auxDiscover.receiveStore(m_serializeSvc.get(), m_inputStreamingTool.get(), obj).isSuccess()) {
613 ATH_MSG_ERROR("Failed to get Dynamic Aux Store for " << token->toString());
614 obj = nullptr;
615 }
616 }
617 }
618 } else if (token->dbID() != Guid::null()) {
619 AthenaPoolCnvSvc::setObjPtr(obj, token);
620 }
621}
622//______________________________________________________________________________
624 const CLID& clid,
625 const std::string* par,
626 const unsigned long* ip,
627 IOpaqueAddress*& refpAddress) {
628 if (m_makeStreamingToolClient.value() > 0 && !m_inputStreamingTool.empty() && !m_inputStreamingTool->isServer() && !m_inputStreamingTool->isClient()) {
630 }
631 if (!m_inputStreamingTool.empty() && m_inputStreamingTool->isClient()) {
632 Token addressToken;
633 addressToken.setDb(par[0].substr(4));
634 addressToken.setCont(par[1]);
635 addressToken.setOid(Token::OID_t(ip[0], ip[1]));
636 ATH_CHECK(m_inputStreamingTool->lockObject(addressToken.toString().c_str()));
637 void* buffer = nullptr;
638 std::size_t nbytes = 0;
639 StatusCode sc = m_inputStreamingTool->getObject(&buffer, nbytes);
640 while (sc.isRecoverable()) {
641 // sleep
642 sc = m_inputStreamingTool->getObject(&buffer, nbytes);
643 }
644 if (!sc.isSuccess()) {
645 ATH_MSG_WARNING("Failed to get Address Token: " << addressToken.toString());
646 return(StatusCode::FAILURE);
647 }
648 auto token = std::make_unique<Token>();
649 token->fromString(static_cast<const char*>(buffer)); buffer = nullptr;
650 if (token->classID() == Guid::null()) {
651 token.reset();
652 }
653 m_inputStreamingTool->getObject(&buffer, nbytes).ignore();
654 if (token) {
655 refpAddress = new TokenAddress(pool::POOL_StorageType.type(), clid, "", par[1], IPoolSvc::kInputStream, std::move(token));
656 return(StatusCode::SUCCESS);
657 }
658 else {
659 return(StatusCode::RECOVERABLE);
660 }
661 } else {
662 return AthenaPoolCnvSvc::createAddress(svcType, clid, par, ip, refpAddress);
663 }
664}
665//______________________________________________________________________________
667 const CLID& clid,
668 const std::string& refAddress,
669 IOpaqueAddress*& refpAddress) {
670 return AthenaPoolCnvSvc::createAddress(svcType, clid, refAddress, refpAddress);
671}
672//______________________________________________________________________________
673StatusCode AthenaPoolSharedIOCnvSvc::cleanUp(const std::string& connection) {
674 auto pos = connection.find("?pmerge=");
675 std::string conn = (pos == std::string::npos) ? connection : connection.substr(0, pos);
676 return AthenaPoolCnvSvc::cleanUp(conn);
677}
678//______________________________________________________________________________
680 if (num < 0) {
681 num = -num;
683 num = num % 1024;
684 if (!m_outputStreamingTool.empty() && !m_outputStreamingTool->isServer()) {
685 ATH_MSG_DEBUG(std::format("makeServer: {} = {}", m_outputStreamingTool.name(), num));
686 ATH_MSG_DEBUG(std::format("makeServer: Calling shared memory tool with port suffix {}", m_streamPortString.value()));
687 const std::string streamPortSuffix = m_streamPortString.value();
688 if (m_outputStreamingTool->makeServer(num, streamPortSuffix).isFailure()) {
689 ATH_MSG_ERROR("makeServer: " << m_outputStreamingTool << " failed");
690 return(StatusCode::FAILURE);
691 }
692 // Disable PersistencySvc per output file mode, for SharedWriter Server
693 m_persSvcPerOutput.setValue(false);
694 return(StatusCode::SUCCESS);
695 }
696 return(StatusCode::RECOVERABLE);
697 }
698 if (m_inputStreamingTool.empty()) {
699 return(StatusCode::RECOVERABLE);
700 }
701 ATH_MSG_DEBUG("makeServer: " << m_inputStreamingTool << " = " << num);
702 return(m_inputStreamingTool->makeServer(num, ""));
703}
704//________________________________________________________________________________
706 if (!m_outputStreamingTool.empty()) {
707 ATH_MSG_DEBUG("makeClient: " << m_outputStreamingTool << " = " << num);
708 std::string streamPortSuffix;
709 if (m_outputStreamingTool->makeClient(num, streamPortSuffix).isFailure()) {
710 ATH_MSG_ERROR("makeClient: " << m_outputStreamingTool << " failed");
711 return(StatusCode::FAILURE);
712 } else if (m_streamPortString.value().find("localhost:0") != std::string::npos) {
713 // We don't seem to use a dedicated port per stream so doing this for the first client is probably OK
714 ATH_MSG_DEBUG("makeClient: Setting conversion service port suffix to " << streamPortSuffix);
715 m_streamPortString.setValue(streamPortSuffix);
716 }
717 }
718 if (m_inputStreamingTool.empty()) {
719 return(StatusCode::SUCCESS);
720 }
721 ATH_MSG_DEBUG("makeClient: " << m_inputStreamingTool << " = " << num);
722 std::string dummyStr;
723 return(m_inputStreamingTool->makeClient(num, dummyStr));
724}
725//________________________________________________________________________________
727 if (m_inputStreamingTool.empty()) {
728 return(StatusCode::FAILURE);
729 }
730 const char* tokenStr = nullptr;
731 int num = -1;
732 StatusCode sc = m_inputStreamingTool->clearObject(&tokenStr, num);
733 if (sc.isSuccess() && tokenStr != nullptr && strlen(tokenStr) > 0 && num > 0) {
734 ATH_MSG_DEBUG("readData: " << tokenStr << ", for client: " << num);
735 } else {
736 return(sc);
737 }
738 // Read object instance via POOL/ROOT
739 void* instance = nullptr;
740 Token token;
741 token.fromString(tokenStr); tokenStr = nullptr;
742 if (token.classID() != Guid::null()) {
743 std::string objName = "ALL";
744 if (useDetailChronoStat()) {
745 objName = token.classID().toString();
746 }
747 // StopWatch listens from here until the end of this current scope
748 PMonUtils::BasicStopWatch stopWatch("cObj_" + objName, this->m_chronoMap);
749 this->setObjPtr(instance, &token);
750 // Serialize object via ROOT
752 void* buffer = nullptr;
753 std::size_t nbytes = 0;
754 buffer = m_serializeSvc->serialize(instance, cltype, nbytes);
755 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
756 while (sc.isRecoverable()) {
757 sc = m_inputStreamingTool->putObject(buffer, nbytes, num);
758 }
759 delete [] static_cast<char*>(buffer); buffer = nullptr;
760 if (!sc.isSuccess()) {
761 ATH_MSG_ERROR("Could not share object for: " << token.toString());
762 return(StatusCode::FAILURE);
763 }
764 AuxDiscoverySvc auxDiscover;
765 if (!auxDiscover.sendStore(m_serializeSvc.get(), m_inputStreamingTool.get(), instance, token.classID(), token.contID(), num).isSuccess()) {
766 ATH_MSG_ERROR("Could not share dynamic aux store for: " << token.toString());
767 return(StatusCode::FAILURE);
768 }
769 cltype.Destruct(instance); instance = nullptr;
770 if (!m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
771 ATH_MSG_ERROR("Could not share object for: " << token.toString());
772 return(StatusCode::FAILURE);
773 }
774 } else if (token.dbID() != Guid::null()) {
775 std::string returnToken;
776 Token* metadataToken = getPoolSvc()->getToken("FID:" + token.dbID().toString(), token.contID(), token.oid().first);
777 if( metadataToken ) {
778 returnToken = metadataToken->toString();
779 metadataToken->release(); metadataToken = nullptr;
780 } else {
781 returnToken = token.toString();
782 }
783 // Share token
784 sc = m_inputStreamingTool->putObject(returnToken.c_str(), returnToken.size() + 1, num);
785 if (!sc.isSuccess() || !m_inputStreamingTool->putObject(nullptr, 0, num).isSuccess()) {
786 ATH_MSG_ERROR("Could not share token for: " << token.toString());
787 return(StatusCode::FAILURE);
788 }
789 } else {
790 return(StatusCode::RECOVERABLE);
791 }
792 return(StatusCode::SUCCESS);
793}
794
795//________________________________________________________________________________
797 pool::IFileCatalog* catalog ATLAS_THREAD_SAFE = // This is on the SharedWriter, after mother process finishes events
798 const_cast<pool::IFileCatalog*>(getPoolSvc()->catalog());
799 catalog->commit();
800 catalog->start();
801 return(StatusCode::SUCCESS);
802}
803
804//______________________________________________________________________________
806{
807 ATH_MSG_ERROR("Sending ABORT to clients");
808 // the master process will kill this process once workers abort
809 // but it could be a time-limited loop
810 StatusCode sc = StatusCode::SUCCESS;
811 while (sc.isSuccess()) {
812 if (client_n >= 0) {
813 sc = m_outputStreamingTool->lockObject("ABORT", client_n);
814 }
815 const char* dummy;
816 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
817 while (sc.isRecoverable()) {
818 sc = m_outputStreamingTool->clearObject(&dummy, client_n);
819 }
820 }
821 return StatusCode::FAILURE;
822}
823
824//______________________________________________________________________________
825void AthenaPoolSharedIOCnvSvc::handle(const Incident& incident) {
826 if (incident.type() == "StoreCleared" && m_outputStreamingTool->isClient() && !m_parallelCompression) {
827 m_outputStreamingTool->lockObject("release").ignore();
828 }
829}
830//______________________________________________________________________________
831AthenaPoolSharedIOCnvSvc::AthenaPoolSharedIOCnvSvc(const std::string& name, ISvcLocator* pSvcLocator) :
832 base_class(name, pSvcLocator) {
833}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaPoolSharedIOCnvSvc class.
This file contains the class definition for the AuxDiscoverySvc class.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
This file contains the class definition for the IAthMetaDataSvc class.
Interface to an output stream tool.
RAII guard that guarantees a matching end-incident for every begin-incident.
static Double_t sc
This file contains the class definition for the Placement class (migrated from POOL).
TTypeAdapter RootType
Definition RootType.h:211
std::map< std::string, double > instance
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode finalize() override
Required of all Gaudi Services.
AthenaPoolSharedIOCnvSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual void setObjPtr(void *&obj, const Token *token) override
StatusCode createAddress(long svcType, const CLID &clid, const std::string *par, const unsigned long *ip, IOpaqueAddress *&refpAddress) override
Create a Generic address using explicit arguments to identify a single object.
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual void handle(const Incident &incident) override
Implementation of IIncidentListener: Handle for EndEvent incidence.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode disconnectOutput(const std::string &outputConnectionSpec) override
Disconnect to the output connection.
virtual StatusCode readData() override
Read the next data object.
virtual StatusCode commitCatalog() override
Commit Catalog.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
ToolHandle< IAthenaIPCTool > m_outputStreamingTool
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
Gaudi::Property< int > m_streamingTechnology
Use Streaming for selected technologies only.
virtual StatusCode initialize() override
Required of all Gaudi Services.
ToolHandle< IAthenaIPCTool > m_inputStreamingTool
ServiceHandle< IAthenaSerializeSvc > m_serializeSvc
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first connect/write automatically.
virtual StatusCode makeClient(int num) override
Make this a client.
This class provides the interface between AthenaPoolCnvSvc and AuxStore classes.
StatusCode sendStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, const void *obj, const Guid &classId, const std::string &contName, int num=0)
Send dynamic aux store variables to streaming tool.
StatusCode receiveStore(const IAthenaSerializeSvc *serSvc, IAthenaIPCTool *ipcTool, void *obj, int num=0)
Receive dynamic aux store variables from streaming tool.
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
@ kInputStream
Definition IPoolSvc.h:41
static InputFileIncidentGuard begin(IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Factory: fire the begin incident and return a guard whose destructor fires the matching end incident.
This class holds all the necessary information to guide the writing of an object in a physical place.
Definition Placement.h:20
const std::string & auxString() const
Access auxiliary string.
Definition Placement.h:41
const std::string & containerName() const
Access container name.
Definition Placement.h:33
Placement & setFileName(const std::string &fileName)
Set file name.
Definition Placement.h:31
const std::string toString() const
Retrieve the string representation of the placement.
Definition Placement.cxx:15
Placement & setTechnology(int technology)
Set technology type.
Definition Placement.h:39
const std::string & fileName() const
Access file name.
Definition Placement.h:29
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
Definition Placement.cxx:28
int technology() const
Access technology type.
Definition Placement.h:37
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
Bool_t IsFundamental() const
Definition RootType.cxx:731
std::string Name(unsigned int mod=Reflex::SCOPED) const
Definition RootType.cxx:612
void Destruct(void *place) const
Definition RootType.cxx:677
size_t SizeOf() const
Definition RootType.cxx:765
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:22
const std::string & auxString() const
Access auxiliary string.
Definition Token.h:92
Token & setCont(const std::string &cnt)
Set container name.
Definition Token.h:72
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:67
const std::string & contID() const
Access container identifier.
Definition Token.h:70
const Guid & classID() const
Access database identifier.
Definition Token.h:74
Token & setClassID(const Guid &cl_id)
Access database identifier.
Definition Token.h:76
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:135
int technology() const
Access technology type.
Definition Token.h:78
int release()
Release token: Decrease reference count and eventually delete.
Definition Token.cxx:81
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:86
const OID_t & oid() const
Access object identifier.
Definition Token.h:82
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:169
const Guid & dbID() const
Access database identifier.
Definition Token.h:65
Token & setAuxString(std::string &&auxString)
Set auxiliary string.
Definition Token.h:94
static const TypeH forGuid(const Guid &info)
Access classes by Guid.
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
static DbType getType(const std::string &name)
Access known storage type object by name.
void commit()
Save catalog to file.
Definition merge.py:1
static const DbType POOL_StorageType
Definition DbType.h:84