11#include "GaudiKernel/FileIncident.h"
13#include <boost/exception/diagnostic_information.hpp>
14#include <boost/interprocess/shared_memory_object.hpp>
15#include <boost/interprocess/mapped_region.hpp>
23struct ShareEventHeader {
24 enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED, UNKNOWN };
25 ProcessStatus evtProcessStatus;
29 std::size_t evtOffset;
30 unsigned int evtCoreStatusFlag;
37 const std::string& name,
38 const IInterface* parent) :
39 base_class(
type, name, parent),
54 return(StatusCode::SUCCESS);
66 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
67 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
71 return(StatusCode::SUCCESS);
79 boost::interprocess::shared_memory_object::remove(
m_sharedMemory.value().c_str());
80 }
catch(boost::interprocess::interprocess_exception &e) {
85 boost::interprocess::shared_memory_object::remove(statusName.c_str());
86 }
catch(boost::interprocess::interprocess_exception &e) {
87 ATH_MSG_WARNING(
"Cannot remove shared memory " << statusName <<
": " << boost::diagnostic_information(e));
94 return(::AthAlgTool::finalize());
100 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
101 return(StatusCode::FAILURE);
104 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
105 return(StatusCode::FAILURE);
110 std::optional<boost::interprocess::shared_memory_object> shm;
112 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
113 }
catch(boost::interprocess::interprocess_exception &e) {
115 return StatusCode::FAILURE;
118 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0,
m_maxSize);
119 const std::string statusName =
m_sharedMemory.value() +
"_status";
120 std::optional<boost::interprocess::shared_memory_object> shm_status;
122 shm_status.emplace(boost::interprocess::create_only, statusName.c_str(), boost::interprocess::read_write);
123 }
catch(boost::interprocess::interprocess_exception &e) {
124 ATH_MSG_ERROR(
"Cannot create shared memory " << statusName <<
": " << boost::diagnostic_information(e));
125 return StatusCode::FAILURE;
127 shm_status->truncate(num *
sizeof(ShareEventHeader));
128 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0, num *
sizeof(ShareEventHeader));
129 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0,
"" };
130 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
132 for (
int i = 0; i < num; i++) {
133 std::memcpy(
static_cast<char*
>(
m_status->get_address()) + i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
135 return(StatusCode::SUCCESS);
146 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
147 return(StatusCode::FAILURE);
150 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
151 return(StatusCode::FAILURE);
153 if (
m_num > 0 && num <= 0) {
159 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
160 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
166 return(StatusCode::SUCCESS);
170 const std::string statusName =
m_sharedMemory.value() +
"_status";
171 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
173 boost::interprocess::read_write);
174 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write, num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
175 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
177 boost::interprocess::read_write);
178 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0,
m_maxSize);
180 }
catch (boost::interprocess::interprocess_exception& e) {
185 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
186 streamPortSuffix.assign(evtH->token);
190 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
195 return(StatusCode::SUCCESS);
204StatusCode AthenaSharedMemoryTool::putEvent(
long eventNumber,
const void* source, std::size_t nbytes,
unsigned int status)
const {
206 ATH_MSG_ERROR(
"Event size = " << nbytes <<
" greater than maximum for eventNumber = " << eventNumber);
207 return(StatusCode::FAILURE);
209 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
210 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
211 ATH_MSG_DEBUG(
"Waiting for putEvent, eventNumber = " << eventNumber);
212 return(StatusCode::RECOVERABLE);
214 if (source ==
nullptr && nbytes == 0) {
216 evtH->evtSeqNumber = 0;
217 evtH->fileSeqNumber = 0;
220 m_status->flush(0,
sizeof(ShareEventHeader));
221 return(StatusCode::SUCCESS);
223 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
224 if (source !=
nullptr && source !=
m_payload->get_address()) {
225 std::memcpy(
m_payload->get_address(), source, nbytes);
230 evtH->evtSize = nbytes;
232 evtH->evtCoreStatusFlag =
status;
233 m_status->flush(0,
sizeof(ShareEventHeader));
234 evtH->evtProcessStatus = ShareEventHeader::FILLED;
235 return(StatusCode::SUCCESS);
240 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
241 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
242 const std::size_t nbytes = evtH->evtSize;
243 if (target !=
nullptr) {
244 char* buf =
new char[nbytes];
245 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
248 status = evtH->evtCoreStatusFlag;
251 return(StatusCode::FAILURE);
253 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
254 return(StatusCode::SUCCESS);
259 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
260 if (evtH->evtSeqNumber > eventNumber) {
261 ATH_MSG_ERROR(
"eventNumber = " << eventNumber <<
", already processed");
262 return(StatusCode::FAILURE);
264 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
266 return(StatusCode::FAILURE);
268 if (evtH->evtSeqNumber != eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
269 ATH_MSG_DEBUG(
"Waiting for lockEvent, eventNumber = " << eventNumber);
270 return(StatusCode::RECOVERABLE);
273 FileIncident endFileIncident(name(),
"EndInputFile",
"SHM");
276 FileIncident beginFileIncident(name(),
"BeginInputFile",
"SHM");
280 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
281 return(StatusCode::SUCCESS);
287 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " << num);
288 return(StatusCode::FAILURE);
290 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
291 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
292 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
293 if (evtStatus != ShareEventHeader::CLEARED) {
294 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " << num <<
", in state " << evtStatus);
295 return(StatusCode::RECOVERABLE);
297 if (source ==
nullptr) {
298 evtH->evtSize = evtH->evtOffset;
300 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
301 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
302 evtH->evtProcessStatus = ShareEventHeader::FILLED;
304 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
305 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " << num);
306 return(StatusCode::FAILURE);
308 bool first = (evtH->evtOffset == 0);
309 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
310 evtH->evtOffset +=
sizeof(size_t);
311 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, source, nbytes);
312 evtH->evtOffset += nbytes;
314 evtH->evtSize = evtH->evtOffset;
319 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
322 return(StatusCode::SUCCESS);
327 if (target ==
nullptr) {
328 return(StatusCode::SUCCESS);
330 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
331 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
332 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
333 size_t evtSize = evtH->evtSize;
334 if (evtStatus != ShareEventHeader::FILLED) {
335 ATH_MSG_DEBUG(
"Waiting for FILLED getObject, client = " << num);
337 return(StatusCode::RECOVERABLE);
339 if (evtH->evtOffset < evtSize) {
340 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
sizeof(
size_t));
341 evtH->evtOffset +=
sizeof(size_t);
342 *target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset;
343 evtH->evtOffset += nbytes;
344 return(StatusCode::SUCCESS);
348 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
349 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
352 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
353 return(StatusCode::SUCCESS);
359 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
360 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
362 return(StatusCode::RECOVERABLE);
364 *tokenString = evtH->token;
365 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
366 return(StatusCode::SUCCESS);
369 for (
int i = 1; i <=
m_num; i++) {
370 void* status =
static_cast<char*
>(
m_status->get_address()) + i *
sizeof(ShareEventHeader);
371 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
372 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
373 if (evtStatus == ShareEventHeader::FILLED) {
374 ATH_MSG_DEBUG(
"Waiting for UNFILL clearObject, client = " << i);
375 return(StatusCode::RECOVERABLE);
376 }
else if (i == num && evtStatus != ShareEventHeader::LOCKED) {
377 ATH_MSG_DEBUG(
"Waiting for LOCK clearObject, client = " << i);
378 return(StatusCode::RECOVERABLE);
381 if ((i == num || num < 0) && evtStatus == ShareEventHeader::LOCKED) {
382 *tokenString = evtH->token;
384 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " << num <<
": " << evtH->token);
391 if (num > 0 && *tokenString !=
nullptr) {
393 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
394 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
395 if (strncmp(*tokenString,
"wait", 4) == 0) {
396 ATH_MSG_INFO(
"Server clearObject() got wait, client = " << num);
397 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
400 return(StatusCode::SUCCESS);
402 if (strncmp(*tokenString,
"release", 7) == 0) {
403 ATH_MSG_INFO(
"Server clearObject() got release, client = " << num);
404 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
406 return(StatusCode::SUCCESS);
408 if (strncmp(*tokenString,
"start", 5) == 0) {
409 ATH_MSG_INFO(
"Server clearObject() got start, client = " << num <<
", of " <<
m_num - 1);
414 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
416 return(StatusCode::SUCCESS);
418 if (strncmp(*tokenString,
"stop", 4) == 0) {
419 ATH_MSG_INFO(
"Server clearObject() got stop, client = " << num);
421 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
424 if (num ==
m_num - 1) {
425 return(StatusCode::SUCCESS);
427 return(StatusCode::FAILURE);
430 return(StatusCode::SUCCESS);
433 m_status->flush(0,
sizeof(ShareEventHeader));
434 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
435 return(StatusCode::SUCCESS);
437 return(StatusCode::RECOVERABLE);
443 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
444 return(StatusCode::FAILURE);
446 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
447 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
448 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
449 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
450 return(StatusCode::RECOVERABLE);
455 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
456 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
458 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
459 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
461 return(StatusCode::SUCCESS);
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)