11 #include "GaudiKernel/FileIncident.h"
13 #include <boost/exception/diagnostic_information.hpp>
14 #include <boost/interprocess/shared_memory_object.hpp>
15 #include <boost/interprocess/mapped_region.hpp>
23 struct ShareEventHeader {
24 enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED,
UNKNOWN };
25 ProcessStatus evtProcessStatus;
29 std::size_t evtOffset;
30 unsigned int evtCoreStatusFlag;
37 const std::string&
name,
38 const IInterface*
parent) :
40 m_incidentSvc(
"IncidentSvc",
name) {
54 return(StatusCode::SUCCESS);
66 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
67 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
71 return(StatusCode::SUCCESS);
80 }
catch(boost::interprocess::interprocess_exception &
e) {
86 }
catch(boost::interprocess::interprocess_exception &
e) {
100 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
101 return(StatusCode::FAILURE);
104 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
105 return(StatusCode::FAILURE);
110 std::optional<boost::interprocess::shared_memory_object> shm;
112 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
113 }
catch(boost::interprocess::interprocess_exception &
e) {
115 return StatusCode::FAILURE;
118 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0,
m_maxSize);
120 std::optional<boost::interprocess::shared_memory_object> shm_status;
122 shm_status.emplace(boost::interprocess::create_only,
statusName.c_str(), boost::interprocess::read_write);
123 }
catch(boost::interprocess::interprocess_exception &
e) {
125 return StatusCode::FAILURE;
127 shm_status->truncate(
num *
sizeof(ShareEventHeader));
128 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0,
num *
sizeof(ShareEventHeader));
129 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0,
"" };
130 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
132 for (
int i = 0;
i <
num;
i++) {
133 std::memcpy(
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
135 return(StatusCode::SUCCESS);
146 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
147 return(StatusCode::FAILURE);
150 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
151 return(StatusCode::FAILURE);
159 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
160 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
166 return(StatusCode::SUCCESS);
171 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
173 boost::interprocess::read_write);
174 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write,
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
175 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
177 boost::interprocess::read_write);
178 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0,
m_maxSize);
180 }
catch (boost::interprocess::interprocess_exception&
e) {
184 if (m_num <= 0 && num > 0) {
185 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
186 streamPortSuffix.assign(evtH->token);
190 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
195 return(StatusCode::SUCCESS);
207 return(StatusCode::FAILURE);
209 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
210 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
212 return(StatusCode::RECOVERABLE);
214 if (
source ==
nullptr && nbytes == 0) {
216 evtH->evtSeqNumber = 0;
217 evtH->fileSeqNumber = 0;
220 m_status->flush(0,
sizeof(ShareEventHeader));
221 return(StatusCode::SUCCESS);
223 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
230 evtH->evtSize = nbytes;
232 evtH->evtCoreStatusFlag =
status;
233 m_status->flush(0,
sizeof(ShareEventHeader));
234 evtH->evtProcessStatus = ShareEventHeader::FILLED;
235 return(StatusCode::SUCCESS);
240 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
241 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
242 const std::size_t nbytes = evtH->evtSize;
244 char* buf =
new char[nbytes];
245 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
248 status = evtH->evtCoreStatusFlag;
251 return(StatusCode::FAILURE);
253 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
254 return(StatusCode::SUCCESS);
259 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
262 return(StatusCode::FAILURE);
264 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
266 return(StatusCode::FAILURE);
268 if (evtH->evtSeqNumber !=
eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
270 return(StatusCode::RECOVERABLE);
273 FileIncident endFileIncident(
name(),
"EndInputFile",
"SHM");
276 FileIncident beginFileIncident(
name(),
"BeginInputFile",
"SHM");
280 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
281 return(StatusCode::SUCCESS);
287 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " <<
num);
288 return(StatusCode::FAILURE);
290 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
291 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
292 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
293 if (evtStatus != ShareEventHeader::CLEARED) {
294 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " <<
num <<
", in state " << evtStatus);
295 return(StatusCode::RECOVERABLE);
298 evtH->evtSize = evtH->evtOffset;
300 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
301 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
302 evtH->evtProcessStatus = ShareEventHeader::FILLED;
304 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
305 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " <<
num);
306 return(StatusCode::FAILURE);
308 bool first = (evtH->evtOffset == 0);
309 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
310 evtH->evtOffset +=
sizeof(size_t);
311 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
source, nbytes);
312 evtH->evtOffset += nbytes;
314 evtH->evtSize = evtH->evtOffset;
319 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
322 return(StatusCode::SUCCESS);
328 return(StatusCode::SUCCESS);
330 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
331 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
332 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
333 size_t evtSize = evtH->evtSize;
334 if (evtStatus != ShareEventHeader::FILLED) {
337 return(StatusCode::RECOVERABLE);
339 if (evtH->evtOffset < evtSize) {
340 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
sizeof(
size_t));
341 evtH->evtOffset +=
sizeof(size_t);
342 *
target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset;
343 evtH->evtOffset += nbytes;
345 if (evtH->evtOffset == evtSize) {
347 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
348 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
351 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
354 return(StatusCode::SUCCESS);
360 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
361 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
363 return(StatusCode::RECOVERABLE);
365 *tokenString = evtH->token;
366 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
367 return(StatusCode::SUCCESS);
371 void*
status =
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader);
372 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
373 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
374 if (evtStatus == ShareEventHeader::FILLED) {
376 return(StatusCode::RECOVERABLE);
377 }
else if (
i ==
num && evtStatus != ShareEventHeader::LOCKED) {
379 return(StatusCode::RECOVERABLE);
382 if ((
i ==
num ||
num < 0) && evtStatus == ShareEventHeader::LOCKED) {
383 *tokenString = evtH->token;
385 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " <<
num <<
": " << evtH->token);
392 if (
num > 0 && *tokenString !=
nullptr) {
394 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
395 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
396 if (strncmp(*tokenString,
"wait", 4) == 0) {
398 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
401 return(StatusCode::SUCCESS);
403 if (strncmp(*tokenString,
"release", 7) == 0) {
405 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
407 return(StatusCode::SUCCESS);
409 if (strncmp(*tokenString,
"start", 5) == 0) {
415 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
417 return(StatusCode::SUCCESS);
419 if (strncmp(*tokenString,
"stop", 4) == 0) {
422 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
426 return(StatusCode::SUCCESS);
428 return(StatusCode::FAILURE);
431 return(StatusCode::SUCCESS);
434 m_status->flush(0,
sizeof(ShareEventHeader));
435 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
436 return(StatusCode::SUCCESS);
438 return(StatusCode::RECOVERABLE);
444 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
445 return(StatusCode::FAILURE);
447 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
448 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
449 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
450 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
451 return(StatusCode::RECOVERABLE);
456 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
457 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
459 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
460 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
462 return(StatusCode::SUCCESS);