This class provides the IPCTool for SharedMemory objects.
More...
#include <AthenaSharedMemoryTool.h>
|
| | AthenaSharedMemoryTool (const std::string &type, const std::string &name, const IInterface *parent) |
| | Standard Service Constructor. More...
|
| |
| virtual | ~AthenaSharedMemoryTool () |
| | Destructor. More...
|
| |
| virtual StatusCode | initialize () override |
| | Gaudi Service Interface method implementations: More...
|
| |
| virtual StatusCode | stop () override |
| |
| virtual StatusCode | finalize () override |
| |
| virtual StatusCode | makeServer (int num, const std::string &streamPortSuffix) override |
| |
| virtual bool | isServer () const override |
| |
| virtual StatusCode | makeClient (int num, std::string &streamPortSuffix) override |
| |
| virtual bool | isClient () const override |
| |
| virtual StatusCode putEvent | ATLAS_NOT_THREAD_SAFE (long eventNumber, const void *source, size_t nbytes, unsigned int status) const override |
| |
| virtual StatusCode | getLockedEvent (void **target, unsigned int &status) const override |
| |
| virtual StatusCode | lockEvent (long eventNumber) const override |
| |
| virtual StatusCode | putObject (const void *source, size_t nbytes, int num=0) override |
| |
| virtual StatusCode | getObject (void **target, size_t &nbytes, int num=0) override |
| |
| virtual StatusCode | clearObject (const char **tokenString, int &num) override |
| |
| virtual StatusCode | lockObject (const char *tokenString, int num=0) override |
| |
This class provides the IPCTool for SharedMemory objects.
Definition at line 34 of file AthenaSharedMemoryTool.h.
◆ AthenaSharedMemoryTool()
| AthenaSharedMemoryTool::AthenaSharedMemoryTool |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~AthenaSharedMemoryTool()
| AthenaSharedMemoryTool::~AthenaSharedMemoryTool |
( |
| ) |
|
|
virtual |
◆ ATLAS_NOT_THREAD_SAFE()
| virtual StatusCode putEvent AthenaSharedMemoryTool::ATLAS_NOT_THREAD_SAFE |
( |
long |
eventNumber, |
|
|
const void * |
source, |
|
|
size_t |
nbytes, |
|
|
unsigned int |
status |
|
) |
| const |
|
overridevirtual |
◆ clearObject()
| StatusCode AthenaSharedMemoryTool::clearObject |
( |
const char ** |
tokenString, |
|
|
int & |
num |
|
) |
| |
|
overridevirtual |
Definition at line 357 of file AthenaSharedMemoryTool.cxx.
359 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
360 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
362 return(StatusCode::RECOVERABLE);
364 *tokenString = evtH->token;
365 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
366 return(StatusCode::SUCCESS);
370 void*
status =
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader);
371 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
372 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
373 if (evtStatus == ShareEventHeader::FILLED) {
375 return(StatusCode::RECOVERABLE);
376 }
else if (
i ==
num && evtStatus != ShareEventHeader::LOCKED) {
378 return(StatusCode::RECOVERABLE);
381 if ((
i ==
num ||
num < 0) && evtStatus == ShareEventHeader::LOCKED) {
382 *tokenString = evtH->token;
384 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " <<
num <<
": " << evtH->token);
391 if (
num > 0 && *tokenString !=
nullptr) {
393 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
394 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
395 if (strncmp(*tokenString,
"wait", 4) == 0) {
397 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
400 return(StatusCode::SUCCESS);
402 if (strncmp(*tokenString,
"release", 7) == 0) {
404 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
406 return(StatusCode::SUCCESS);
408 if (strncmp(*tokenString,
"start", 5) == 0) {
414 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
416 return(StatusCode::SUCCESS);
418 if (strncmp(*tokenString,
"stop", 4) == 0) {
421 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
425 return(StatusCode::SUCCESS);
427 return(StatusCode::FAILURE);
430 return(StatusCode::SUCCESS);
433 m_status->flush(0,
sizeof(ShareEventHeader));
434 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
435 return(StatusCode::SUCCESS);
437 return(StatusCode::RECOVERABLE);
◆ finalize()
| StatusCode AthenaSharedMemoryTool::finalize |
( |
| ) |
|
|
overridevirtual |
Definition at line 75 of file AthenaSharedMemoryTool.cxx.
80 }
catch(boost::interprocess::interprocess_exception &
e) {
86 }
catch(boost::interprocess::interprocess_exception &
e) {
◆ getLockedEvent()
| StatusCode AthenaSharedMemoryTool::getLockedEvent |
( |
void ** |
target, |
|
|
unsigned int & |
status |
|
) |
| const |
|
overridevirtual |
Definition at line 239 of file AthenaSharedMemoryTool.cxx.
240 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
241 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
242 const std::size_t nbytes = evtH->evtSize;
244 char*
buf =
new char[nbytes];
245 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
248 status = evtH->evtCoreStatusFlag;
251 return(StatusCode::FAILURE);
253 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
254 return(StatusCode::SUCCESS);
◆ getObject()
| StatusCode AthenaSharedMemoryTool::getObject |
( |
void ** |
target, |
|
|
size_t & |
nbytes, |
|
|
int |
num = 0 |
|
) |
| |
|
overridevirtual |
Definition at line 326 of file AthenaSharedMemoryTool.cxx.
328 return(StatusCode::SUCCESS);
330 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
331 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
332 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
333 size_t evtSize = evtH->evtSize;
334 if (evtStatus != ShareEventHeader::FILLED) {
337 return(StatusCode::RECOVERABLE);
339 if (evtH->evtOffset < evtSize) {
340 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
sizeof(
size_t));
341 evtH->evtOffset +=
sizeof(size_t);
342 *
target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset;
343 evtH->evtOffset += nbytes;
344 return(StatusCode::SUCCESS);
348 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
349 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
352 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
353 return(StatusCode::SUCCESS);
◆ initialize()
| StatusCode AthenaSharedMemoryTool::initialize |
( |
| ) |
|
|
overridevirtual |
◆ isClient()
| bool AthenaSharedMemoryTool::isClient |
( |
| ) |
const |
|
overridevirtual |
◆ isServer()
| bool AthenaSharedMemoryTool::isServer |
( |
| ) |
const |
|
overridevirtual |
◆ lockEvent()
| StatusCode AthenaSharedMemoryTool::lockEvent |
( |
long |
eventNumber | ) |
const |
|
overridevirtual |
Definition at line 258 of file AthenaSharedMemoryTool.cxx.
259 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
262 return(StatusCode::FAILURE);
264 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
266 return(StatusCode::FAILURE);
268 if (evtH->evtSeqNumber !=
eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
270 return(StatusCode::RECOVERABLE);
273 FileIncident endFileIncident(
name(),
"EndInputFile",
"SHM");
276 FileIncident beginFileIncident(
name(),
"BeginInputFile",
"SHM");
280 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
281 return(StatusCode::SUCCESS);
◆ lockObject()
| StatusCode AthenaSharedMemoryTool::lockObject |
( |
const char * |
tokenString, |
|
|
int |
num = 0 |
|
) |
| |
|
overridevirtual |
Definition at line 441 of file AthenaSharedMemoryTool.cxx.
443 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
444 return(StatusCode::FAILURE);
446 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
447 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
448 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
449 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
450 return(StatusCode::RECOVERABLE);
455 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
456 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
458 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
459 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
461 return(StatusCode::SUCCESS);
◆ makeClient()
| StatusCode AthenaSharedMemoryTool::makeClient |
( |
int |
num, |
|
|
std::string & |
streamPortSuffix |
|
) |
| |
|
overridevirtual |
Definition at line 144 of file AthenaSharedMemoryTool.cxx.
146 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
147 return(StatusCode::FAILURE);
150 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
151 return(StatusCode::FAILURE);
159 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
160 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
166 return(StatusCode::SUCCESS);
171 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
173 boost::interprocess::read_write);
174 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write,
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
175 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
177 boost::interprocess::read_write);
178 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0,
m_maxSize);
180 }
catch (boost::interprocess::interprocess_exception&
e) {
184 if (m_num <= 0 && num > 0) {
185 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
186 streamPortSuffix.assign(evtH->token);
190 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
195 return(StatusCode::SUCCESS);
◆ makeServer()
| StatusCode AthenaSharedMemoryTool::makeServer |
( |
int |
num, |
|
|
const std::string & |
streamPortSuffix |
|
) |
| |
|
overridevirtual |
Definition at line 98 of file AthenaSharedMemoryTool.cxx.
100 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
101 return(StatusCode::FAILURE);
104 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
105 return(StatusCode::FAILURE);
110 std::optional<boost::interprocess::shared_memory_object> shm;
112 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
113 }
catch(boost::interprocess::interprocess_exception &
e) {
115 return StatusCode::FAILURE;
118 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0,
m_maxSize);
120 std::optional<boost::interprocess::shared_memory_object> shm_status;
122 shm_status.emplace(boost::interprocess::create_only,
statusName.c_str(), boost::interprocess::read_write);
123 }
catch(boost::interprocess::interprocess_exception &
e) {
125 return StatusCode::FAILURE;
127 shm_status->truncate(
num *
sizeof(ShareEventHeader));
128 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0,
num *
sizeof(ShareEventHeader));
129 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0,
"" };
130 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
132 for (
int i = 0;
i <
num;
i++) {
133 std::memcpy(
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
135 return(StatusCode::SUCCESS);
◆ putObject()
| StatusCode AthenaSharedMemoryTool::putObject |
( |
const void * |
source, |
|
|
size_t |
nbytes, |
|
|
int |
num = 0 |
|
) |
| |
|
overridevirtual |
Definition at line 285 of file AthenaSharedMemoryTool.cxx.
287 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " <<
num);
288 return(StatusCode::FAILURE);
290 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
291 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
292 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
293 if (evtStatus != ShareEventHeader::CLEARED) {
294 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " <<
num <<
", in state " << evtStatus);
295 return(StatusCode::RECOVERABLE);
298 evtH->evtSize = evtH->evtOffset;
300 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
301 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
302 evtH->evtProcessStatus = ShareEventHeader::FILLED;
304 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
305 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " <<
num);
306 return(StatusCode::FAILURE);
308 bool first = (evtH->evtOffset == 0);
309 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
310 evtH->evtOffset +=
sizeof(size_t);
311 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
source, nbytes);
312 evtH->evtOffset += nbytes;
314 evtH->evtSize = evtH->evtOffset;
319 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
322 return(StatusCode::SUCCESS);
◆ stop()
| StatusCode AthenaSharedMemoryTool::stop |
( |
| ) |
|
|
overridevirtual |
Definition at line 58 of file AthenaSharedMemoryTool.cxx.
66 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
67 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
71 return(StatusCode::SUCCESS);
◆ m_dataClients
| std::set<int> AthenaSharedMemoryTool::m_dataClients |
|
private |
◆ m_fileSeqNumber
| long AthenaSharedMemoryTool::m_fileSeqNumber {0} |
|
private |
◆ m_incidentSvc
| ServiceHandle<IIncidentSvc> AthenaSharedMemoryTool::m_incidentSvc |
|
private |
◆ m_isClient
| bool AthenaSharedMemoryTool::m_isClient {false} |
|
private |
◆ m_isServer
| bool AthenaSharedMemoryTool::m_isServer {false} |
|
private |
◆ m_lastClient
| int AthenaSharedMemoryTool::m_lastClient {-1} |
|
private |
◆ m_maxDataClients
| const int AthenaSharedMemoryTool::m_maxDataClients {256} |
|
private |
◆ m_maxSize
| const size_t AthenaSharedMemoryTool::m_maxSize {64 * 1024 * 1024} |
|
private |
◆ m_num
| int AthenaSharedMemoryTool::m_num {-1} |
|
private |
◆ m_payload
| std::unique_ptr<boost::interprocess::mapped_region> AthenaSharedMemoryTool::m_payload |
|
private |
◆ m_sharedMemory
| Gaudi::Property<std::string> AthenaSharedMemoryTool::m_sharedMemory {this, "SharedMemoryName", {}} |
|
private |
◆ m_status
| std::unique_ptr<boost::interprocess::mapped_region> AthenaSharedMemoryTool::m_status |
|
private |
The documentation for this class was generated from the following files: