This class provides the IPCTool for SharedMemory objects.
More...
#include <AthenaSharedMemoryTool.h>
|
| AthenaSharedMemoryTool (const std::string &type, const std::string &name, const IInterface *parent) |
| Standard Service Constructor. More...
|
|
virtual | ~AthenaSharedMemoryTool () |
| Destructor. More...
|
|
virtual StatusCode | initialize () override |
| Gaudi Service Interface method implementations: More...
|
|
virtual StatusCode | stop () override |
|
virtual StatusCode | finalize () override |
|
virtual StatusCode | makeServer (int num, const std::string &streamPortSuffix) override |
|
virtual bool | isServer () const override |
|
virtual StatusCode | makeClient (int num, std::string &streamPortSuffix) override |
|
virtual bool | isClient () const override |
|
virtual StatusCode | putEvent ATLAS_NOT_THREAD_SAFE (long eventNumber, const void *source, size_t nbytes, unsigned int status) const override |
|
virtual StatusCode | getLockedEvent (void **target, unsigned int &status) const override |
|
virtual StatusCode | lockEvent (long eventNumber) const override |
|
virtual StatusCode | putObject (const void *source, size_t nbytes, int num=0) override |
|
virtual StatusCode | getObject (void **target, size_t &nbytes, int num=0) override |
|
virtual StatusCode | clearObject (const char **tokenString, int &num) override |
|
virtual StatusCode | lockObject (const char *tokenString, int num=0) override |
|
This class provides the IPCTool for SharedMemory objects.
Definition at line 34 of file AthenaSharedMemoryTool.h.
◆ AthenaSharedMemoryTool()
AthenaSharedMemoryTool::AthenaSharedMemoryTool |
( |
const std::string & |
type, |
|
|
const std::string & |
name, |
|
|
const IInterface * |
parent |
|
) |
| |
◆ ~AthenaSharedMemoryTool()
AthenaSharedMemoryTool::~AthenaSharedMemoryTool |
( |
| ) |
|
|
virtual |
◆ putEvent()
virtual StatusCode AthenaSharedMemoryTool::putEvent ATLAS_NOT_THREAD_SAFE |
( |
long |
eventNumber, |
|
|
const void * |
source, |
|
|
size_t |
nbytes, |
|
|
unsigned int |
status |
|
) |
| const |
|
override virtual |
◆ clearObject()
StatusCode AthenaSharedMemoryTool::clearObject |
( |
const char ** |
tokenString, |
|
|
int & |
num |
|
) |
| |
|
override virtual |
Definition at line 358 of file AthenaSharedMemoryTool.cxx.
360 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
361 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
363 return(StatusCode::RECOVERABLE);
365 *tokenString = evtH->token;
366 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
367 return(StatusCode::SUCCESS);
371 void*
status =
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader);
372 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
373 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
374 if (evtStatus == ShareEventHeader::FILLED) {
376 return(StatusCode::RECOVERABLE);
377 }
else if (
i ==
num && evtStatus != ShareEventHeader::LOCKED) {
379 return(StatusCode::RECOVERABLE);
382 if ((
i ==
num ||
num < 0) && evtStatus == ShareEventHeader::LOCKED) {
383 *tokenString = evtH->token;
385 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " <<
num <<
": " << evtH->token);
392 if (
num > 0 && *tokenString !=
nullptr) {
394 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
395 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
396 if (strncmp(*tokenString,
"wait", 4) == 0) {
398 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
401 return(StatusCode::SUCCESS);
403 if (strncmp(*tokenString,
"release", 7) == 0) {
405 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
407 return(StatusCode::SUCCESS);
409 if (strncmp(*tokenString,
"start", 5) == 0) {
415 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
417 return(StatusCode::SUCCESS);
419 if (strncmp(*tokenString,
"stop", 4) == 0) {
422 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
426 return(StatusCode::SUCCESS);
428 return(StatusCode::FAILURE);
431 return(StatusCode::SUCCESS);
434 m_status->flush(0,
sizeof(ShareEventHeader));
435 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
436 return(StatusCode::SUCCESS);
438 return(StatusCode::RECOVERABLE);
◆ finalize()
StatusCode AthenaSharedMemoryTool::finalize |
( |
| ) |
|
|
override virtual |
Definition at line 75 of file AthenaSharedMemoryTool.cxx.
80 }
catch(boost::interprocess::interprocess_exception &
e) {
86 }
catch(boost::interprocess::interprocess_exception &
e) {
◆ getLockedEvent()
StatusCode AthenaSharedMemoryTool::getLockedEvent |
( |
void ** |
target, |
|
|
unsigned int & |
status |
|
) |
| const |
|
override virtual |
Definition at line 239 of file AthenaSharedMemoryTool.cxx.
240 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
241 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
242 const std::size_t nbytes = evtH->evtSize;
244 char*
buf =
new char[nbytes];
245 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
248 status = evtH->evtCoreStatusFlag;
251 return(StatusCode::FAILURE);
253 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
254 return(StatusCode::SUCCESS);
◆ getObject()
StatusCode AthenaSharedMemoryTool::getObject |
( |
void ** |
target, |
|
|
size_t & |
nbytes, |
|
|
int |
num = 0 |
|
) |
| |
|
override virtual |
Definition at line 326 of file AthenaSharedMemoryTool.cxx.
328 return(StatusCode::SUCCESS);
330 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
331 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
332 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
333 size_t evtSize = evtH->evtSize;
334 if (evtStatus != ShareEventHeader::FILLED) {
337 return(StatusCode::RECOVERABLE);
339 if (evtH->evtOffset < evtSize) {
340 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
sizeof(
size_t));
341 evtH->evtOffset +=
sizeof(size_t);
342 *
target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset;
343 evtH->evtOffset += nbytes;
345 if (evtH->evtOffset == evtSize) {
347 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
348 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
351 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
354 return(StatusCode::SUCCESS);
◆ initialize()
StatusCode AthenaSharedMemoryTool::initialize |
( |
| ) |
|
|
override virtual |
◆ isClient()
bool AthenaSharedMemoryTool::isClient |
( |
| ) |
const |
|
override virtual |
◆ isServer()
bool AthenaSharedMemoryTool::isServer |
( |
| ) |
const |
|
override virtual |
◆ lockEvent()
StatusCode AthenaSharedMemoryTool::lockEvent |
( |
long |
eventNumber | ) |
const |
|
override virtual |
Definition at line 258 of file AthenaSharedMemoryTool.cxx.
259 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
262 return(StatusCode::FAILURE);
264 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
266 return(StatusCode::FAILURE);
268 if (evtH->evtSeqNumber !=
eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
270 return(StatusCode::RECOVERABLE);
273 FileIncident endFileIncident(
name(),
"EndInputFile",
"SHM");
276 FileIncident beginFileIncident(
name(),
"BeginInputFile",
"SHM");
280 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
281 return(StatusCode::SUCCESS);
◆ lockObject()
StatusCode AthenaSharedMemoryTool::lockObject |
( |
const char * |
tokenString, |
|
|
int |
num = 0 |
|
) |
| |
|
override virtual |
Definition at line 442 of file AthenaSharedMemoryTool.cxx.
444 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
445 return(StatusCode::FAILURE);
447 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
448 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
449 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
450 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
451 return(StatusCode::RECOVERABLE);
456 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
457 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
459 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
460 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
462 return(StatusCode::SUCCESS);
◆ makeClient()
StatusCode AthenaSharedMemoryTool::makeClient |
( |
int |
num, |
|
|
std::string & |
streamPortSuffix |
|
) |
| |
|
override virtual |
Definition at line 144 of file AthenaSharedMemoryTool.cxx.
146 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
147 return(StatusCode::FAILURE);
150 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
151 return(StatusCode::FAILURE);
159 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
160 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
166 return(StatusCode::SUCCESS);
171 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
173 boost::interprocess::read_write);
174 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write,
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
175 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
177 boost::interprocess::read_write);
178 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0,
m_maxSize);
180 }
catch (boost::interprocess::interprocess_exception&
e) {
184 if (m_num <= 0 && num > 0) {
185 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
186 streamPortSuffix.assign(evtH->token);
190 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
195 return(StatusCode::SUCCESS);
◆ makeServer()
StatusCode AthenaSharedMemoryTool::makeServer |
( |
int |
num, |
|
|
const std::string & |
streamPortSuffix |
|
) |
| |
|
override virtual |
Definition at line 98 of file AthenaSharedMemoryTool.cxx.
100 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
101 return(StatusCode::FAILURE);
104 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
105 return(StatusCode::FAILURE);
110 std::optional<boost::interprocess::shared_memory_object> shm;
112 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
113 }
catch(boost::interprocess::interprocess_exception &
e) {
115 return StatusCode::FAILURE;
118 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0,
m_maxSize);
120 std::optional<boost::interprocess::shared_memory_object> shm_status;
122 shm_status.emplace(boost::interprocess::create_only,
statusName.c_str(), boost::interprocess::read_write);
123 }
catch(boost::interprocess::interprocess_exception &
e) {
125 return StatusCode::FAILURE;
127 shm_status->truncate(
num *
sizeof(ShareEventHeader));
128 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0,
num *
sizeof(ShareEventHeader));
129 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0,
"" };
130 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
132 for (
int i = 0;
i <
num;
i++) {
133 std::memcpy(
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
135 return(StatusCode::SUCCESS);
◆ putObject()
StatusCode AthenaSharedMemoryTool::putObject |
( |
const void * |
source, |
|
|
size_t |
nbytes, |
|
|
int |
num = 0 |
|
) |
| |
|
override virtual |
Definition at line 285 of file AthenaSharedMemoryTool.cxx.
287 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " <<
num);
288 return(StatusCode::FAILURE);
290 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
291 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
292 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
293 if (evtStatus != ShareEventHeader::CLEARED) {
294 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " <<
num <<
", in state " << evtStatus);
295 return(StatusCode::RECOVERABLE);
298 evtH->evtSize = evtH->evtOffset;
300 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
301 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
302 evtH->evtProcessStatus = ShareEventHeader::FILLED;
304 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
305 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " <<
num);
306 return(StatusCode::FAILURE);
308 bool first = (evtH->evtOffset == 0);
309 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
310 evtH->evtOffset +=
sizeof(size_t);
311 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
source, nbytes);
312 evtH->evtOffset += nbytes;
314 evtH->evtSize = evtH->evtOffset;
319 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
322 return(StatusCode::SUCCESS);
◆ stop()
StatusCode AthenaSharedMemoryTool::stop |
( |
| ) |
|
|
override virtual |
Definition at line 58 of file AthenaSharedMemoryTool.cxx.
66 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
67 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
71 return(StatusCode::SUCCESS);
◆ m_dataClients
std::set<int> AthenaSharedMemoryTool::m_dataClients |
|
private |
◆ m_fileSeqNumber
long AthenaSharedMemoryTool::m_fileSeqNumber {0} |
|
private |
◆ m_incidentSvc
ServiceHandle<IIncidentSvc> AthenaSharedMemoryTool::m_incidentSvc |
|
private |
◆ m_isClient
bool AthenaSharedMemoryTool::m_isClient {false} |
|
private |
◆ m_isServer
bool AthenaSharedMemoryTool::m_isServer {false} |
|
private |
◆ m_lastClient
int AthenaSharedMemoryTool::m_lastClient {-1} |
|
private |
◆ m_maxDataClients
const int AthenaSharedMemoryTool::m_maxDataClients {256} |
|
private |
◆ m_maxSize
const size_t AthenaSharedMemoryTool::m_maxSize {64 * 1024 * 1024} |
|
private |
◆ m_num
int AthenaSharedMemoryTool::m_num {-1} |
|
private |
◆ m_payload
std::unique_ptr<boost::interprocess::mapped_region> AthenaSharedMemoryTool::m_payload |
|
private |
◆ m_sharedMemory
Gaudi::Property<std::string> AthenaSharedMemoryTool::m_sharedMemory {this, "SharedMemoryName", {}} |
|
private |
◆ m_status
std::unique_ptr<boost::interprocess::mapped_region> AthenaSharedMemoryTool::m_status |
|
private |
The documentation for this class was generated from the following files: