11#include "GaudiKernel/FileIncident.h"
13#include <boost/exception/diagnostic_information.hpp>
14#include <boost/interprocess/shared_memory_object.hpp>
15#include <boost/interprocess/mapped_region.hpp>
// Header record stored in the shared-memory status region. The server code in
// this file sizes that region as `num * sizeof(ShareEventHeader)` and addresses
// record i at byte offset `i * sizeof(ShareEventHeader)`.
// NOTE(review): only part of the struct is visible in this excerpt; other
// members used elsewhere in this file (evtSeqNumber, fileSeqNumber, evtSize,
// token) are declared on lines that are not shown here.
23struct ShareEventHeader {
// Hand-over state of a record. The routines in this file compare against and
// assign CLEARED, FILLED, LOCKED and UNLOCKED; no use of UNKNOWN is visible in
// this excerpt.
24 enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED, UNKNOWN };
// Current state of this record; polled and updated by the put/get/lock/clear
// routines in this file.
25 ProcessStatus evtProcessStatus;
// Byte offset into the payload mapping; the object routines in this file add
// it to m_payload->get_address() and advance it as objects are written/read.
29 std::size_t evtOffset;
// Caller-supplied status word: written by putEvent() from its `status`
// argument and read back into `status` on the locked-event read path.
30 unsigned int evtCoreStatusFlag;
37 const std::string& name,
38 const IInterface* parent) :
39 base_class(
type, name, parent),
54 return(StatusCode::SUCCESS);
61 m_inputFileGuard.reset();
68 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
69 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
73 return(StatusCode::SUCCESS);
81 boost::interprocess::shared_memory_object::remove(
m_sharedMemory.value().c_str());
82 }
catch(boost::interprocess::interprocess_exception &e) {
87 boost::interprocess::shared_memory_object::remove(statusName.c_str());
88 }
catch(boost::interprocess::interprocess_exception &e) {
89 ATH_MSG_WARNING(
"Cannot remove shared memory " << statusName <<
": " << boost::diagnostic_information(e));
96 return(::AthAlgTool::finalize());
102 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
103 return(StatusCode::FAILURE);
106 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
107 return(StatusCode::FAILURE);
112 std::optional<boost::interprocess::shared_memory_object> shm;
114 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
115 }
catch(boost::interprocess::interprocess_exception &e) {
117 return StatusCode::FAILURE;
120 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0,
m_maxSize);
121 const std::string statusName =
m_sharedMemory.value() +
"_status";
122 std::optional<boost::interprocess::shared_memory_object> shm_status;
124 shm_status.emplace(boost::interprocess::create_only, statusName.c_str(), boost::interprocess::read_write);
125 }
catch(boost::interprocess::interprocess_exception &e) {
126 ATH_MSG_ERROR(
"Cannot create shared memory " << statusName <<
": " << boost::diagnostic_information(e));
127 return StatusCode::FAILURE;
129 shm_status->truncate(num *
sizeof(ShareEventHeader));
130 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0, num *
sizeof(ShareEventHeader));
131 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0,
"" };
132 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
134 for (
int i = 0; i < num; i++) {
135 std::memcpy(
static_cast<char*
>(
m_status->get_address()) + i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
137 return(StatusCode::SUCCESS);
148 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
149 return(StatusCode::FAILURE);
152 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
153 return(StatusCode::FAILURE);
155 if (
m_num > 0 && num <= 0) {
161 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
162 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
168 return(StatusCode::SUCCESS);
172 const std::string statusName =
m_sharedMemory.value() +
"_status";
173 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
175 boost::interprocess::read_write);
176 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write, num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
177 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
179 boost::interprocess::read_write);
180 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0,
m_maxSize);
182 }
catch (boost::interprocess::interprocess_exception& e) {
187 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
188 streamPortSuffix.assign(evtH->token);
192 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
197 return(StatusCode::SUCCESS);
// Publish one event into the shared-memory payload and mark the first status
// record as FILLED.
//
// Parameters:
//   eventNumber - event identifier; in the visible lines it is only used in
//                 log messages (NOTE(review): the lines that presumably store
//                 it in the header are not part of this excerpt).
//   source      - bytes to publish; may be nullptr, and may already be the
//                 payload mapping itself, in which case no copy is made.
//   nbytes      - number of bytes at `source`.
//   status      - value stored in the header's evtCoreStatusFlag.
//
// Returns:
//   FAILURE     - after logging "Event size ... greater than maximum".
//   RECOVERABLE - the status record is not UNLOCKED yet; caller should retry.
//   SUCCESS     - the event (or the null/zero-size marker) was written.
//
// NOTE(review): several original lines (guards and closing braces) are missing
// from this excerpt, so the nesting below is inferred from the statements only.
206StatusCode AthenaSharedMemoryTool::putEvent(
long eventNumber,
const void* source, std::size_t nbytes,
unsigned int status)
const {
// Error path for an oversized event. The size comparison that guards it is on
// a line not present in this excerpt.
208 ATH_MSG_ERROR(
"Event size = " << nbytes <<
" greater than maximum for eventNumber = " << eventNumber);
209 return(StatusCode::FAILURE);
// View the start of the mapped status region as the event header record.
211 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
// Only write when the record is UNLOCKED; otherwise report RECOVERABLE so the
// caller can try again later.
212 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
213 ATH_MSG_DEBUG(
"Waiting for putEvent, eventNumber = " << eventNumber);
214 return(StatusCode::RECOVERABLE);
// Null source with zero size: zero the sequence numbers and flush the header
// instead of publishing data. Elsewhere in this file a header with
// evtSeqNumber, fileSeqNumber and evtSize all zero leads to a FAILURE return
// (presumably an end-of-input marker - TODO confirm against the full source).
216 if (source ==
nullptr && nbytes == 0) {
218 evtH->evtSeqNumber = 0;
219 evtH->fileSeqNumber = 0;
// Flush the header-sized byte range at offset 0 of the status mapping.
222 m_status->flush(0,
sizeof(ShareEventHeader));
223 return(StatusCode::SUCCESS);
// Normal path: move the record to CLEARED before the payload and header
// fields are modified.
225 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
// Copy the data into the payload mapping unless there is nothing to copy or
// the caller already wrote directly into the payload buffer.
226 if (source !=
nullptr && source !=
m_payload->get_address()) {
227 std::memcpy(
m_payload->get_address(), source, nbytes);
// Record the payload size and the caller's status word in the header.
232 evtH->evtSize = nbytes;
234 evtH->evtCoreStatusFlag =
status;
// Flush the header, then set FILLED as the last write. The lock routine in
// this file only proceeds once it sees FILLED.
235 m_status->flush(0,
sizeof(ShareEventHeader));
236 evtH->evtProcessStatus = ShareEventHeader::FILLED;
237 return(StatusCode::SUCCESS);
242 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
243 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
244 const std::size_t nbytes = evtH->evtSize;
245 if (target !=
nullptr) {
246 char* buf =
new char[nbytes];
247 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
250 status = evtH->evtCoreStatusFlag;
253 return(StatusCode::FAILURE);
255 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
256 return(StatusCode::SUCCESS);
261 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
262 if (evtH->evtSeqNumber > eventNumber) {
263 ATH_MSG_ERROR(
"eventNumber = " << eventNumber <<
", already processed");
264 return(StatusCode::FAILURE);
266 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
268 return(StatusCode::FAILURE);
270 if (evtH->evtSeqNumber != eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
271 ATH_MSG_DEBUG(
"Waiting for lockEvent, eventNumber = " << eventNumber);
272 return(StatusCode::RECOVERABLE);
279 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
280 return(StatusCode::SUCCESS);
286 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " << num);
287 return(StatusCode::FAILURE);
289 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
290 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
291 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
292 if (evtStatus != ShareEventHeader::CLEARED) {
293 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " << num <<
", in state " << evtStatus);
294 return(StatusCode::RECOVERABLE);
296 if (source ==
nullptr) {
297 evtH->evtSize = evtH->evtOffset;
299 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
300 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
301 evtH->evtProcessStatus = ShareEventHeader::FILLED;
303 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
304 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " << num);
305 return(StatusCode::FAILURE);
307 bool first = (evtH->evtOffset == 0);
308 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
309 evtH->evtOffset +=
sizeof(size_t);
310 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, source, nbytes);
311 evtH->evtOffset += nbytes;
313 evtH->evtSize = evtH->evtOffset;
318 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
321 return(StatusCode::SUCCESS);
326 if (target ==
nullptr) {
327 return(StatusCode::SUCCESS);
329 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
330 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
331 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
332 size_t evtSize = evtH->evtSize;
333 if (evtStatus != ShareEventHeader::FILLED) {
334 ATH_MSG_DEBUG(
"Waiting for FILLED getObject, client = " << num);
336 return(StatusCode::RECOVERABLE);
338 if (evtH->evtOffset < evtSize) {
339 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
sizeof(
size_t));
340 evtH->evtOffset +=
sizeof(size_t);
341 *target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset;
342 evtH->evtOffset += nbytes;
343 return(StatusCode::SUCCESS);
347 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
348 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
351 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
352 return(StatusCode::SUCCESS);
358 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
359 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
361 return(StatusCode::RECOVERABLE);
363 *tokenString = evtH->token;
364 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
365 return(StatusCode::SUCCESS);
368 for (
int i = 1; i <=
m_num; i++) {
369 void* status =
static_cast<char*
>(
m_status->get_address()) + i *
sizeof(ShareEventHeader);
370 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
371 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
372 if (evtStatus == ShareEventHeader::FILLED) {
373 ATH_MSG_DEBUG(
"Waiting for UNFILL clearObject, client = " << i);
374 return(StatusCode::RECOVERABLE);
375 }
else if (i == num && evtStatus != ShareEventHeader::LOCKED) {
376 ATH_MSG_DEBUG(
"Waiting for LOCK clearObject, client = " << i);
377 return(StatusCode::RECOVERABLE);
380 if ((i == num || num < 0) && evtStatus == ShareEventHeader::LOCKED) {
381 *tokenString = evtH->token;
383 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " << num <<
": " << evtH->token);
390 if (num > 0 && *tokenString !=
nullptr) {
392 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
393 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
394 if (strncmp(*tokenString,
"wait", 4) == 0) {
395 ATH_MSG_INFO(
"Server clearObject() got wait, client = " << num);
396 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
399 return(StatusCode::SUCCESS);
401 if (strncmp(*tokenString,
"release", 7) == 0) {
402 ATH_MSG_INFO(
"Server clearObject() got release, client = " << num);
403 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
405 return(StatusCode::SUCCESS);
407 if (strncmp(*tokenString,
"start", 5) == 0) {
408 ATH_MSG_INFO(
"Server clearObject() got start, client = " << num <<
", of " <<
m_num - 1);
413 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
415 return(StatusCode::SUCCESS);
417 if (strncmp(*tokenString,
"stop", 4) == 0) {
418 ATH_MSG_INFO(
"Server clearObject() got stop, client = " << num);
420 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
423 if (num ==
m_num - 1) {
424 return(StatusCode::SUCCESS);
426 return(StatusCode::FAILURE);
429 return(StatusCode::SUCCESS);
432 m_status->flush(0,
sizeof(ShareEventHeader));
433 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
434 return(StatusCode::SUCCESS);
436 return(StatusCode::RECOVERABLE);
442 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
443 return(StatusCode::FAILURE);
445 void* status =
static_cast<char*
>(
m_status->get_address()) + num *
sizeof(ShareEventHeader);
446 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(status);
447 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
448 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
449 return(StatusCode::RECOVERABLE);
454 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
455 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
457 m_status->flush(num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
458 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
460 return(StatusCode::SUCCESS);
#define ATH_CHECK
// ATH_CHECK: evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)