11 #include "GaudiKernel/FileIncident.h"
13 #include <boost/exception/diagnostic_information.hpp>
14 #include <boost/interprocess/shared_memory_object.hpp>
15 #include <boost/interprocess/mapped_region.hpp>
23 struct ShareEventHeader {
24 enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED, PARTIAL, SHARED,
UNKNOWN };
25 ProcessStatus evtProcessStatus;
29 std::size_t evtOffset;
30 std::size_t evtCursor;
31 unsigned int evtCoreStatusFlag;
38 const std::string&
name,
40 m_maxSize(64 * 1024 * 1024),
41 m_maxDataClients(256),
50 m_incidentSvc(
"IncidentSvc",
name) {
52 declareInterface<IAthenaIPCTool>(
this);
66 return(StatusCode::FAILURE);
71 return(StatusCode::FAILURE);
73 return(StatusCode::SUCCESS);
85 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
86 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
90 return(StatusCode::SUCCESS);
99 }
catch(boost::interprocess::interprocess_exception &
e) {
105 }
catch(boost::interprocess::interprocess_exception &
e) {
119 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
120 return(StatusCode::FAILURE);
123 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
124 return(StatusCode::FAILURE);
129 std::optional<boost::interprocess::shared_memory_object> shm;
131 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
132 }
catch(boost::interprocess::interprocess_exception &
e) {
134 return StatusCode::FAILURE;
137 m_payload =
new boost::interprocess::mapped_region(*shm, boost::interprocess::read_write, 0,
m_maxSize);
139 std::optional<boost::interprocess::shared_memory_object> shm_status;
141 shm_status.emplace(boost::interprocess::create_only,
statusName.c_str(), boost::interprocess::read_write);
142 }
catch(boost::interprocess::interprocess_exception &
e) {
144 return StatusCode::FAILURE;
146 shm_status->truncate(
num *
sizeof(ShareEventHeader));
147 m_status =
new boost::interprocess::mapped_region(*shm_status, boost::interprocess::read_write, 0,
num *
sizeof(ShareEventHeader));
148 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0, 0,
"" };
149 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
151 for (
int i = 0;
i <
num;
i++) {
152 std::memcpy(
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
154 return(StatusCode::SUCCESS);
165 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
166 return(StatusCode::FAILURE);
169 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
170 return(StatusCode::FAILURE);
178 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
179 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
185 return(StatusCode::SUCCESS);
190 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
192 boost::interprocess::read_write);
193 m_status =
new boost::interprocess::mapped_region(shm_status, boost::interprocess::read_write,
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
194 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
196 boost::interprocess::read_write);
197 m_payload =
new boost::interprocess::mapped_region(shm, boost::interprocess::read_write, 0,
m_maxSize);
199 }
catch (boost::interprocess::interprocess_exception&
e) {
203 if (m_num <= 0 && num > 0) {
204 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
205 streamPortSuffix.assign(evtH->token);
209 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
214 return(StatusCode::SUCCESS);
226 return(StatusCode::FAILURE);
228 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
229 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
231 return(StatusCode::RECOVERABLE);
233 if (source ==
nullptr && nbytes == 0) {
235 evtH->evtSeqNumber = 0;
236 evtH->fileSeqNumber = 0;
239 m_status->flush(0,
sizeof(ShareEventHeader));
240 return(StatusCode::SUCCESS);
242 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
243 if (source !=
nullptr && source !=
m_payload->get_address()) {
244 std::memcpy(
m_payload->get_address(), source, nbytes);
249 evtH->evtSize = nbytes;
251 evtH->evtCoreStatusFlag =
status;
252 m_status->flush(0,
sizeof(ShareEventHeader));
253 evtH->evtProcessStatus = ShareEventHeader::FILLED;
254 return(StatusCode::SUCCESS);
259 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
260 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
261 const std::size_t nbytes = evtH->evtSize;
263 char* buf =
new char[nbytes];
264 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
267 status = evtH->evtCoreStatusFlag;
270 return(StatusCode::FAILURE);
272 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
273 return(StatusCode::SUCCESS);
278 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
281 return(StatusCode::FAILURE);
283 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
285 return(StatusCode::FAILURE);
287 if (evtH->evtSeqNumber !=
eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
289 return(StatusCode::RECOVERABLE);
292 FileIncident endFileIncident(
name(),
"EndInputFile",
"SHM");
295 FileIncident beginFileIncident(
name(),
"BeginInputFile",
"SHM");
299 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
300 return(StatusCode::SUCCESS);
306 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " <<
num);
307 return(StatusCode::FAILURE);
309 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
310 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
311 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
312 if (evtStatus != ShareEventHeader::CLEARED && evtStatus != ShareEventHeader::PARTIAL) {
313 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " <<
num <<
", in state " << evtStatus);
314 return(StatusCode::RECOVERABLE);
316 if (source ==
nullptr) {
317 evtH->evtSize = evtH->evtOffset;
319 m_payload->flush(0 + evtH->evtOffset + evtH->evtCursor, evtH->evtSize - evtH->evtCursor);
320 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
321 evtH->evtProcessStatus = ShareEventHeader::FILLED;
323 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
324 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " <<
num);
325 return(StatusCode::FAILURE);
327 bool first = (evtH->evtOffset == 0);
328 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
329 evtH->evtOffset +=
sizeof(size_t);
330 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, source, nbytes);
331 evtH->evtOffset += nbytes;
333 evtH->evtSize = evtH->evtOffset;
334 evtH->evtProcessStatus = ShareEventHeader::PARTIAL;
339 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
342 return(StatusCode::SUCCESS);
348 return(StatusCode::SUCCESS);
350 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
351 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
352 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
353 size_t evtSize = evtH->evtSize;
354 if (evtStatus == ShareEventHeader::PARTIAL && evtH->evtCursor > 0) {
356 nbytes = evtH->evtCursor;
357 return(StatusCode::RECOVERABLE);
359 if (evtStatus != ShareEventHeader::FILLED &&
360 evtStatus != ShareEventHeader::SHARED &&
361 evtStatus != ShareEventHeader::PARTIAL) {
364 return(StatusCode::RECOVERABLE);
366 if (evtH->evtCursor < evtSize) {
367 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtCursor,
sizeof(
size_t));
368 evtH->evtCursor +=
sizeof(size_t);
369 *
target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtCursor;
370 if (evtStatus != ShareEventHeader::PARTIAL) {
371 evtH->evtProcessStatus = ShareEventHeader::SHARED;
373 evtH->evtCursor += nbytes;
377 if (evtH->evtCursor == evtSize) {
378 if (evtH->evtProcessStatus == ShareEventHeader::SHARED) {
379 evtH->evtProcessStatus = ShareEventHeader::FILLED;
382 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
383 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
386 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
389 return(StatusCode::SUCCESS);
395 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
396 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
398 return(StatusCode::RECOVERABLE);
400 *tokenString = evtH->token;
401 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
402 return(StatusCode::SUCCESS);
406 void*
status =
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader);
407 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
408 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
409 if (evtStatus == ShareEventHeader::FILLED || evtStatus == ShareEventHeader::SHARED) {
411 return(StatusCode::RECOVERABLE);
412 }
else if (
i ==
num && evtStatus != ShareEventHeader::LOCKED) {
414 return(StatusCode::RECOVERABLE);
417 if ((
i ==
num ||
num < 0) && evtStatus == ShareEventHeader::LOCKED) {
418 *tokenString = evtH->token;
420 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " <<
num <<
": " << evtH->token);
427 if (
num > 0 && *tokenString !=
nullptr) {
429 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
430 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
431 if (strncmp(*tokenString,
"wait", 4) == 0) {
433 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
436 return(StatusCode::SUCCESS);
438 if (strncmp(*tokenString,
"release", 7) == 0) {
440 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
442 return(StatusCode::SUCCESS);
444 if (strncmp(*tokenString,
"start", 5) == 0) {
450 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
452 return(StatusCode::SUCCESS);
454 if (strncmp(*tokenString,
"stop", 4) == 0) {
457 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
461 return(StatusCode::SUCCESS);
463 return(StatusCode::FAILURE);
466 return(StatusCode::SUCCESS);
469 m_status->flush(0,
sizeof(ShareEventHeader));
470 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
471 return(StatusCode::SUCCESS);
473 return(StatusCode::RECOVERABLE);
479 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
480 return(StatusCode::FAILURE);
482 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
483 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
484 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
485 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
486 return(StatusCode::RECOVERABLE);
491 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
492 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
494 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
495 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
497 return(StatusCode::SUCCESS);