ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaSharedMemoryTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5/* file contains the implementation for the AthenaSharedMemoryTool class.
6 * @author Peter van Gemmeren <gemmeren@anl.gov>
7 **/
8
10
11#include "GaudiKernel/FileIncident.h"
12
13#include <boost/exception/diagnostic_information.hpp>
14#include <boost/interprocess/shared_memory_object.hpp>
15#include <boost/interprocess/mapped_region.hpp>
16
17#include <optional>
18#include <sstream>
19
20const std::size_t maxTokenLength = 512;
21
22namespace {
23struct ShareEventHeader {
24 enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED, UNKNOWN };
25 ProcessStatus evtProcessStatus;
26 long evtSeqNumber;
27 long fileSeqNumber;
28 std::size_t evtSize;
29 std::size_t evtOffset;
30 unsigned int evtCoreStatusFlag;
31 char token[maxTokenLength];
32};
33} // anonymous namespace
34
35//___________________________________________________________________________
37 const std::string& name,
38 const IInterface* parent) :
39 base_class(type, name, parent),
40 m_incidentSvc("IncidentSvc", name) {
41 m_sharedMemory.setValue(name);
42}
43
44//___________________________________________________________________________
47
48//___________________________________________________________________________
50 ATH_MSG_INFO("Initializing " << name());
51
52 ATH_CHECK( m_incidentSvc.retrieve() );
53
54 return(StatusCode::SUCCESS);
55}
56
57//___________________________________________________________________________
59 ATH_MSG_INFO("in stop()");
60 // Fire EndInputFile for any file still open.
61 m_inputFileGuard.reset();
62 if (m_isClient && m_num > 0) {
63 ATH_MSG_INFO("Client stop() inform Server: " << m_num);
64 m_num = -1;
65 while (lockObject("stop").isRecoverable()) {
66 usleep(100);
67 }
68 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
69 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
70 usleep(100);
71 }
72 }
73 return(StatusCode::SUCCESS);
74}
75
76//___________________________________________________________________________
78 ATH_MSG_INFO("in finalize()");
79 if (m_isServer) {
80 try {
81 boost::interprocess::shared_memory_object::remove(m_sharedMemory.value().c_str());
82 } catch(boost::interprocess::interprocess_exception &e) {
83 ATH_MSG_WARNING("Cannot remove shared memory " << m_sharedMemory.value() << ": " << boost::diagnostic_information(e));
84 }
85 const std::string statusName = m_sharedMemory.value() + "_status";
86 try {
87 boost::interprocess::shared_memory_object::remove(statusName.c_str());
88 } catch(boost::interprocess::interprocess_exception &e) {
89 ATH_MSG_WARNING("Cannot remove shared memory " << statusName << ": " << boost::diagnostic_information(e));
90 }
91 }
92 // Release IncidentSvc
93 if (!m_incidentSvc.release().isSuccess()) {
94 ATH_MSG_WARNING("Cannot release IncidentSvc.");
95 }
96 return(::AthAlgTool::finalize());
97}
98
99//___________________________________________________________________________
100StatusCode AthenaSharedMemoryTool::makeServer(int num, const std::string& streamPortSuffix) {
101 if (m_isServer || m_isClient) {
102 ATH_MSG_ERROR("Cannot make AthenaSharedMemoryTool a Server.");
103 return(StatusCode::FAILURE);
104 }
105 if (num > m_maxDataClients) {
106 ATH_MSG_ERROR("Too many clients for AthenaSharedMemoryTool Server.");
107 return(StatusCode::FAILURE);
108 }
109 m_num = num;
110 m_isServer = true;
111 ATH_MSG_DEBUG("Creating shared memory object with name \"" << m_sharedMemory.value() << "\"");
112 std::optional<boost::interprocess::shared_memory_object> shm;
113 try {
114 shm.emplace(boost::interprocess::create_only, m_sharedMemory.value().c_str(), boost::interprocess::read_write);
115 } catch(boost::interprocess::interprocess_exception &e) {
116 ATH_MSG_ERROR("Cannot create shared memory " << m_sharedMemory.value() << ": " << boost::diagnostic_information(e));
117 return StatusCode::FAILURE;
118 }
119 shm->truncate(m_maxSize);
120 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0, m_maxSize);
121 const std::string statusName = m_sharedMemory.value() + "_status";
122 std::optional<boost::interprocess::shared_memory_object> shm_status;
123 try {
124 shm_status.emplace(boost::interprocess::create_only, statusName.c_str(), boost::interprocess::read_write);
125 } catch(boost::interprocess::interprocess_exception &e) {
126 ATH_MSG_ERROR("Cannot create shared memory " << statusName << ": " << boost::diagnostic_information(e));
127 return StatusCode::FAILURE;
128 }
129 shm_status->truncate(num * sizeof(ShareEventHeader));
130 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0, num * sizeof(ShareEventHeader));
131 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0, "" };
132 std::memcpy(evtH.token, streamPortSuffix.c_str(), maxTokenLength - 1);
133 evtH.token[maxTokenLength - 1] = 0;
134 for (int i = 0; i < num; i++) {
135 std::memcpy(static_cast<char*>(m_status->get_address()) + i * sizeof(ShareEventHeader), &evtH, sizeof(ShareEventHeader));
136 }
137 return(StatusCode::SUCCESS);
138}
139
140//___________________________________________________________________________
142 return(m_isServer);
143}
144
145//___________________________________________________________________________
146StatusCode AthenaSharedMemoryTool::makeClient(int num, std::string& streamPortSuffix) {
147 if (m_isServer) {
148 ATH_MSG_ERROR("Cannot make AthenaSharedMemoryTool a Client.");
149 return(StatusCode::FAILURE);
150 }
151 if (num >= m_maxDataClients) {
152 ATH_MSG_ERROR("Too many clients for AthenaSharedMemoryTool.");
153 return(StatusCode::FAILURE);
154 }
155 if (m_num > 0 && num <= 0) {// stop running client
156 ATH_MSG_DEBUG("Stop AthenaSharedMemoryTool Client.");
157 m_num = -1;
158 while (lockObject("stop").isRecoverable()) {
159 usleep(100);
160 }
161 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
162 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
163 usleep(100);
164 }
165 m_payload.reset();
166 m_status.reset();
167 m_isClient = false;
168 return(StatusCode::SUCCESS);
169 }
170 while (!m_isClient) {
171 try { // Check whether Server created shared memory object
172 const std::string statusName = m_sharedMemory.value() + "_status";
173 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
174 statusName.c_str(),
175 boost::interprocess::read_write);
176 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write, num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
177 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
178 m_sharedMemory.value().c_str(),
179 boost::interprocess::read_write);
180 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0, m_maxSize);
181 m_isClient = true;
182 } catch (boost::interprocess::interprocess_exception& e) {
183 usleep(100000);
184 }
185 }
186 if (m_num <= 0 && num > 0) {
187 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
188 streamPortSuffix.assign(evtH->token);
189 while (lockObject("start").isRecoverable()) {
190 usleep(100);
191 }
192 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
193 usleep(100);
194 }
195 m_num = num;
196 }
197 return(StatusCode::SUCCESS);
198}
199
200//___________________________________________________________________________
202 return(m_isClient);
203}
204
205//___________________________________________________________________________
206StatusCode AthenaSharedMemoryTool::putEvent(long eventNumber, const void* source, std::size_t nbytes, unsigned int status) const {
207 if (nbytes > m_maxSize) {
208 ATH_MSG_ERROR("Event size = " << nbytes << " greater than maximum for eventNumber = " << eventNumber);
209 return(StatusCode::FAILURE);
210 }
211 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
212 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
213 ATH_MSG_DEBUG("Waiting for putEvent, eventNumber = " << eventNumber);
214 return(StatusCode::RECOVERABLE);
215 }
216 if (source == nullptr && nbytes == 0) {
217 ATH_MSG_DEBUG("putEvent got last Event marker");
218 evtH->evtSeqNumber = 0;
219 evtH->fileSeqNumber = 0;
220 evtH->evtSize = 0;
221 evtH->evtOffset = 0;
222 m_status->flush(0, sizeof(ShareEventHeader));
223 return(StatusCode::SUCCESS);
224 }
225 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
226 if (source != nullptr && source != m_payload->get_address()) {
227 std::memcpy(m_payload->get_address(), source, nbytes);
228 m_payload->flush(0, nbytes);
229 }
230 evtH->evtSeqNumber = eventNumber;
231 evtH->fileSeqNumber = m_fileSeqNumber;
232 evtH->evtSize = nbytes;
233 evtH->evtOffset = 0;
234 evtH->evtCoreStatusFlag = status;
235 m_status->flush(0, sizeof(ShareEventHeader));
236 evtH->evtProcessStatus = ShareEventHeader::FILLED;
237 return(StatusCode::SUCCESS);
238}
239
240//___________________________________________________________________________
241StatusCode AthenaSharedMemoryTool::getLockedEvent(void** target, unsigned int& status) const {
242 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
243 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
244 const std::size_t nbytes = evtH->evtSize;
245 if (target != nullptr) {
246 char* buf = new char[nbytes];
247 std::memcpy(buf, static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, nbytes);
248 *target = buf;
249 }
250 status = evtH->evtCoreStatusFlag;
251 } else {
252 ATH_MSG_ERROR("No event locked");
253 return(StatusCode::FAILURE);
254 }
255 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
256 return(StatusCode::SUCCESS);
257}
258
259//___________________________________________________________________________
260StatusCode AthenaSharedMemoryTool::lockEvent(long eventNumber) const {
261 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
262 if (evtH->evtSeqNumber > eventNumber) {
263 ATH_MSG_ERROR("eventNumber = " << eventNumber << ", already processed");
264 return(StatusCode::FAILURE);
265 }
266 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
267 ATH_MSG_DEBUG("Server has been terminated");
268 return(StatusCode::FAILURE);
269 }
270 if (evtH->evtSeqNumber != eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
271 ATH_MSG_DEBUG("Waiting for lockEvent, eventNumber = " << eventNumber);
272 return(StatusCode::RECOVERABLE);
273 }
274 if (evtH->fileSeqNumber != m_fileSeqNumber && m_fileSeqNumber > 0) {
275 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(), "SHM", {});
276 const_cast<AthenaSharedMemoryTool*>(this)->m_fileSeqNumber = evtH->fileSeqNumber;
277 }
278 ATH_MSG_DEBUG("Locking eventNumber = " << eventNumber);
279 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
280 return(StatusCode::SUCCESS);
281}
282
283//___________________________________________________________________________
284StatusCode AthenaSharedMemoryTool::putObject(const void* source, size_t nbytes, int num) {
285 if (nbytes > m_maxSize) {
286 ATH_MSG_ERROR("Object size = " << nbytes << " greater than maximum for client = " << num);
287 return(StatusCode::FAILURE);
288 }
289 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
290 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
291 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus; // read only once
292 if (evtStatus != ShareEventHeader::CLEARED) {
293 ATH_MSG_DEBUG("Waiting for CLEARED putObject, client = " << num << ", in state " << evtStatus);
294 return(StatusCode::RECOVERABLE);
295 }
296 if (source == nullptr) {
297 evtH->evtSize = evtH->evtOffset;
298 evtH->evtOffset = 0;
299 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
300 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
301 evtH->evtProcessStatus = ShareEventHeader::FILLED;
302 } else {
303 if (evtH->evtOffset + sizeof(size_t) + nbytes > m_maxSize) {
304 ATH_MSG_ERROR("Object location = " << evtH->evtOffset << " greater than maximum for client = " << num);
305 return(StatusCode::FAILURE);
306 }
307 bool first = (evtH->evtOffset == 0);
308 std::memcpy(static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, &nbytes, sizeof(size_t));
309 evtH->evtOffset += sizeof(size_t);
310 std::memcpy(static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, source, nbytes);
311 evtH->evtOffset += nbytes;
312 if (evtH->evtSize == m_maxSize) {
313 evtH->evtSize = evtH->evtOffset;
314 }
315 if (first) {
316 evtH->evtSize = m_maxSize;
317 m_payload->flush(0, evtH->evtOffset);
318 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
319 }
320 }
321 return(StatusCode::SUCCESS);
322}
323
324//___________________________________________________________________________
325StatusCode AthenaSharedMemoryTool::getObject(void** target, size_t& nbytes, int num) {
326 if (target == nullptr) {
327 return(StatusCode::SUCCESS);
328 }
329 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
330 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
331 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus; // read only once
332 size_t evtSize = evtH->evtSize; // read only once
333 if (evtStatus != ShareEventHeader::FILLED) {
334 ATH_MSG_DEBUG("Waiting for FILLED getObject, client = " << num);
335 nbytes = 0;
336 return(StatusCode::RECOVERABLE);
337 }
338 if (evtH->evtOffset < evtSize) {
339 std::memcpy(&nbytes, static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, sizeof(size_t));
340 evtH->evtOffset += sizeof(size_t);
341 *target = static_cast<char*>(m_payload->get_address()) + evtH->evtOffset;
342 evtH->evtOffset += nbytes;
343 return(StatusCode::SUCCESS); // return object
344 }
345 nbytes = 0;
346 evtH->evtOffset = 0;
347 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
348 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
349 usleep(10);
350 }
351 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
352 return(StatusCode::SUCCESS); // unlock server
353}
354
355//___________________________________________________________________________
356StatusCode AthenaSharedMemoryTool::clearObject(const char** tokenString, int& num) {
357 if (m_isClient) {
358 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
359 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
360 ATH_MSG_DEBUG("Waiting for FILL clearObject");
361 return(StatusCode::RECOVERABLE);
362 }
363 *tokenString = evtH->token;
364 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
365 return(StatusCode::SUCCESS);
366 }
367
368 for (int i = 1; i <= m_num; i++) { // FIXME: PvG, do round robin
369 void* status = static_cast<char*>(m_status->get_address()) + i * sizeof(ShareEventHeader);
370 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
371 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus; // read only once
372 if (evtStatus == ShareEventHeader::FILLED) {
373 ATH_MSG_DEBUG("Waiting for UNFILL clearObject, client = " << i);
374 return(StatusCode::RECOVERABLE);
375 } else if (i == num && evtStatus != ShareEventHeader::LOCKED) {
376 ATH_MSG_DEBUG("Waiting for LOCK clearObject, client = " << i);
377 return(StatusCode::RECOVERABLE);
378 }
379 if (m_lastClient < 0 || m_lastClient == i) { // If Writer is not yet released, only consider last client
380 if ((i == num || num < 0) && evtStatus == ShareEventHeader::LOCKED) {
381 *tokenString = evtH->token;
382 num = i;
383 ATH_MSG_DEBUG("Setting LOCK clearObject, for client = " << num << ": " << evtH->token);
384 m_lastClient = -1;
385 // break;
386 }
387 }
388 }
389
390 if (num > 0 && *tokenString != nullptr) {
391 m_lastClient = -1; // Release Writer from last client
392 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
393 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
394 if (strncmp(*tokenString, "wait", 4) == 0) {
395 ATH_MSG_INFO("Server clearObject() got wait, client = " << num);
396 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
397 m_lastClient = num; // Keep Writer allocated
398 num = -1;
399 return(StatusCode::SUCCESS);
400 }
401 if (strncmp(*tokenString, "release", 7) == 0) {
402 ATH_MSG_INFO("Server clearObject() got release, client = " << num);
403 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
404 num = -1;
405 return(StatusCode::SUCCESS);
406 }
407 if (strncmp(*tokenString, "start", 5) == 0) {
408 ATH_MSG_INFO("Server clearObject() got start, client = " << num << ", of " << m_num - 1);
409 if (m_dataClients.empty()) { // Annouce all workers to prevent early writer termination
410 for (int i = 1; i < m_num - 1; i++) m_dataClients.insert(i);
411 }
412 m_dataClients.insert(num);
413 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
414 num = -1;
415 return(StatusCode::SUCCESS);
416 }
417 if (strncmp(*tokenString, "stop", 4) == 0) {
418 ATH_MSG_INFO("Server clearObject() got stop, client = " << num);
419 m_dataClients.erase(num);
420 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
421 if (m_dataClients.empty()) {
422 ATH_MSG_INFO("Server clearObject() got stop, client ALL: " << m_num);
423 if (num == m_num - 1) { // mother process
424 return(StatusCode::SUCCESS);
425 }
426 return(StatusCode::FAILURE);
427 }
428 num = -1;
429 return(StatusCode::SUCCESS);
430 }
431 evtH->evtSize = 0;
432 m_status->flush(0, sizeof(ShareEventHeader));
433 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
434 return(StatusCode::SUCCESS);
435 }
436 return(StatusCode::RECOVERABLE);
437}
438
439//___________________________________________________________________________
440StatusCode AthenaSharedMemoryTool::lockObject(const char* tokenString, int num) {
441 if (strlen(tokenString) >= maxTokenLength) {
442 ATH_MSG_ERROR("Token = " << tokenString << ", too long for AthenaSharedMemoryTool");
443 return(StatusCode::FAILURE);
444 }
445 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
446 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
447 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
448 ATH_MSG_DEBUG("Waiting for UNLOCKED lockObject: " << tokenString);
449 return(StatusCode::RECOVERABLE);
450 }
451 strncpy(evtH->token, tokenString, maxTokenLength - 1); evtH->token[maxTokenLength - 1] = 0;
452 if (m_isServer) {
453 evtH->evtSize = 0;
454 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
455 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
456 } else {
457 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
458 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
459 }
460 return(StatusCode::SUCCESS);
461}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
const std::size_t maxTokenLength
This file contains the class definition for the AthenaSharedMemoryTool class.
std::unique_ptr< boost::interprocess::mapped_region > m_payload
virtual StatusCode lockEvent(long eventNumber) const override
virtual StatusCode initialize() override
Gaudi Service Interface method implementations:
Gaudi::Property< std::string > m_sharedMemory
std::unique_ptr< boost::interprocess::mapped_region > m_status
virtual bool isClient() const override
virtual StatusCode clearObject(const char **tokenString, int &num) override
ServiceHandle< IIncidentSvc > m_incidentSvc
virtual StatusCode lockObject(const char *tokenString, int num=0) override
virtual StatusCode getObject(void **target, size_t &nbytes, int num=0) override
Gaudi::Property< size_t > m_maxSize
virtual StatusCode getLockedEvent(void **target, unsigned int &status) const override
virtual StatusCode stop() override
virtual StatusCode makeClient(int num, std::string &streamPortSuffix) override
virtual bool isServer() const override
Gaudi::Property< int > m_maxDataClients
virtual StatusCode finalize() override
virtual StatusCode putObject(const void *source, size_t nbytes, int num=0) override
AthenaSharedMemoryTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard Service Constructor.
virtual ~AthenaSharedMemoryTool()
Destructor.
virtual StatusCode makeServer(int num, const std::string &streamPortSuffix) override
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.
status
Definition merge.py:16