11 #include "GaudiKernel/FileIncident.h"
13 #include <boost/exception/diagnostic_information.hpp>
14 #include <boost/interprocess/shared_memory_object.hpp>
15 #include <boost/interprocess/mapped_region.hpp>
// Header record kept in the status mapping (m_status); the code in this file
// reinterprets addresses inside that mapping as ShareEventHeader objects.
23 struct ShareEventHeader {
// Hand-over state of one header slot.
24 enum ProcessStatus { CLEARED, FILLED, LOCKED, UNLOCKED,
UNKNOWN };
// Current state of the slot; it is compared against and assigned the
// ProcessStatus values throughout this file.
25 ProcessStatus evtProcessStatus;
// Byte offset into the payload mapping (m_payload) used as the read/write
// position for this slot.
29 std::size_t evtOffset;
// Status flag stored alongside the event and handed back when it is read.
30 unsigned int evtCoreStatusFlag;
// `name` is also forwarded to the m_incidentSvc handle below.
37 const std::string&
name,
// Initial payload size in bytes (64 MiB); m_maxSize is the length used when
// the payload region is mapped.
39 m_maxSize(64 * 1024 * 1024),
// Initial value of the data-client limit.
40 m_maxDataClients(256),
// Handle for the service named "IncidentSvc".
49 m_incidentSvc(
"IncidentSvc",
name) {
// Declare that this object provides the IAthenaIPCTool interface.
51 declareInterface<IAthenaIPCTool>(
this);
65 return(StatusCode::FAILURE);
70 return(StatusCode::FAILURE);
72 return(StatusCode::SUCCESS);
// View the start of the status mapping as the first ShareEventHeader.
84 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
// Keep looping for as long as that header is not in state UNLOCKED.
// NOTE(review): evtProcessStatus is a plain enum field read in a loop while
// another process is expected to change it - confirm the compiler cannot
// hoist the load.
85 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
89 return(StatusCode::SUCCESS);
98 }
catch(boost::interprocess::interprocess_exception &
e) {
104 }
catch(boost::interprocess::interprocess_exception &
e) {
// Failure exits for setting this tool up as a server.
118 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Server.");
119 return(StatusCode::FAILURE);
122 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool Server.");
123 return(StatusCode::FAILURE);
// The payload shared-memory object is held in a std::optional so that it can
// be constructed later via emplace(), where interprocess_exception is caught.
128 std::optional<boost::interprocess::shared_memory_object> shm;
// create_only: creation fails if an object named m_sharedMemory already exists.
130 shm.emplace(boost::interprocess::create_only,
m_sharedMemory.value().c_str(), boost::interprocess::read_write);
131 }
catch(boost::interprocess::interprocess_exception &
e) {
133 return StatusCode::FAILURE;
// Map m_maxSize bytes of the payload object read/write, starting at offset 0.
// NOTE(review): m_payload holds a raw `new`-allocated mapped_region - confirm
// it is deleted elsewhere.
136 m_payload =
new boost::interprocess::mapped_region(*shm, boost::interprocess::read_write, 0,
m_maxSize);
// Same pattern for the status object, named statusName.
138 std::optional<boost::interprocess::shared_memory_object> shm_status;
140 shm_status.emplace(boost::interprocess::create_only,
statusName.c_str(), boost::interprocess::read_write);
141 }
catch(boost::interprocess::interprocess_exception &
e) {
143 return StatusCode::FAILURE;
// Size the status object to hold `num` headers ...
145 shm_status->truncate(
num *
sizeof(ShareEventHeader));
// ... and map exactly that many bytes.
// NOTE(review): m_status is also a raw owning pointer - confirm cleanup.
146 m_status =
new boost::interprocess::mapped_region(*shm_status, boost::interprocess::read_write, 0,
num *
sizeof(ShareEventHeader));
// Prototype header; its first member (the process status) starts as UNLOCKED.
147 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0,
"" };
// Store the stream port suffix in the prototype's token field.
// NOTE(review): this always copies maxTokenLength - 1 bytes from
// streamPortSuffix.c_str(), whatever the string's actual length - confirm the
// source buffer is guaranteed to be that long, otherwise this reads past its end.
148 std::memcpy(evtH.token, streamPortSuffix.c_str(),
maxTokenLength - 1);
// Initialise each of the `num` header slots with a copy of the prototype.
150 for (
int i = 0;
i <
num;
i++) {
151 std::memcpy(
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader), &evtH,
sizeof(ShareEventHeader));
153 return(StatusCode::SUCCESS);
// Failure exits for setting this tool up as a client.
164 ATH_MSG_ERROR(
"Cannot make AthenaSharedMemoryTool a Client.");
165 return(StatusCode::FAILURE);
168 ATH_MSG_ERROR(
"Too many clients for AthenaSharedMemoryTool.");
169 return(StatusCode::FAILURE);
// With a status mapping already in place, watch its first header and loop
// while it is not UNLOCKED.
177 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
178 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
184 return(StatusCode::SUCCESS);
// Open the already existing status object (open_only, read/write).
189 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
191 boost::interprocess::read_write);
// Map a single header only: offset num * sizeof(ShareEventHeader), length
// sizeof(ShareEventHeader). After this, m_status->get_address() points at the
// header with index `num`, not at the start of the status object.
192 m_status =
new boost::interprocess::mapped_region(shm_status, boost::interprocess::read_write,
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
// Open the already existing payload object and map m_maxSize bytes from offset 0.
193 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
195 boost::interprocess::read_write);
// NOTE(review): m_status and m_payload are raw owning pointers - confirm they
// are released elsewhere, including when the second allocation throws.
196 m_payload =
new boost::interprocess::mapped_region(shm, boost::interprocess::read_write, 0,
m_maxSize);
198 }
catch (boost::interprocess::interprocess_exception&
e) {
// Only when m_num is not yet positive and a positive `num` was requested:
202 if (m_num <= 0 && num > 0) {
203 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
// Report the token stored in the mapped header back through streamPortSuffix.
// assign(const char*) reads up to the first NUL byte in evtH->token.
204 streamPortSuffix.assign(evtH->token);
// Loop while the mapped header is not UNLOCKED.
208 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
213 return(StatusCode::SUCCESS);
225 return(StatusCode::FAILURE);
// Work on the header at the start of the status mapping.
227 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
// The slot must be UNLOCKED to be written; otherwise the call reports
// RECOVERABLE.
228 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
230 return(StatusCode::RECOVERABLE);
// A null source together with a zero size is handled as a special case: the
// sequence numbers are zeroed and the header is flushed.
232 if (
source ==
nullptr && nbytes == 0) {
234 evtH->evtSeqNumber = 0;
235 evtH->fileSeqNumber = 0;
238 m_status->flush(0,
sizeof(ShareEventHeader));
239 return(StatusCode::SUCCESS);
// Regular case: the slot is set to CLEARED before its fields are rewritten.
241 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
// Record the payload size and the caller's status flag in the header.
248 evtH->evtSize = nbytes;
250 evtH->evtCoreStatusFlag =
status;
// The header is flushed first and only then switched to FILLED, so the state
// change itself happens after this flush call.
// NOTE(review): evtProcessStatus is a plain (non-atomic) field - confirm that
// no memory barrier is required between the field writes and this state
// change for readers in other processes.
251 m_status->flush(0,
sizeof(ShareEventHeader));
252 evtH->evtProcessStatus = ShareEventHeader::FILLED;
253 return(StatusCode::SUCCESS);
// Work on the header at the start of the status mapping.
258 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
// Data is only copied out while the slot is in state LOCKED.
259 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
260 const std::size_t nbytes = evtH->evtSize;
// Copy evtSize bytes, starting at evtOffset in the payload mapping, into a
// freshly allocated buffer.
// NOTE(review): raw new[] - presumably ownership of `buf` passes to the
// caller; confirm who calls delete[].
262 char* buf =
new char[nbytes];
263 std::memcpy(buf,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, nbytes);
// Hand back the status flag stored with the event.
266 status = evtH->evtCoreStatusFlag;
269 return(StatusCode::FAILURE);
// Put the slot back into state UNLOCKED.
271 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
272 return(StatusCode::SUCCESS);
// Work on the header at the start of the status mapping.
277 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
280 return(StatusCode::FAILURE);
// A header with event sequence number, file sequence number and size all zero
// leads to FAILURE.
282 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
284 return(StatusCode::FAILURE);
// The slot must hold the requested event number and be FILLED; otherwise the
// call reports RECOVERABLE.
286 if (evtH->evtSeqNumber !=
eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
288 return(StatusCode::RECOVERABLE);
// File incidents of type "EndInputFile" and "BeginInputFile", built with this
// tool's name and "SHM" as the third constructor argument.
// NOTE(review): presumably these are fired through m_incidentSvc - confirm.
291 FileIncident endFileIncident(
name(),
"EndInputFile",
"SHM");
294 FileIncident beginFileIncident(
name(),
"BeginInputFile",
"SHM");
// Take the slot by setting it to LOCKED.
298 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
299 return(StatusCode::SUCCESS);
// Failure exit for an object that is too large.
305 ATH_MSG_ERROR(
"Object size = " << nbytes <<
" greater than maximum for client = " <<
num);
306 return(StatusCode::FAILURE);
// Locate the header with index `num` inside the status mapping.
308 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
309 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
// Writing is only allowed while that slot is CLEARED; otherwise the call
// reports RECOVERABLE.
310 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
311 if (evtStatus != ShareEventHeader::CLEARED) {
312 ATH_MSG_DEBUG(
"Waiting for CLEARED putObject, client = " <<
num <<
", in state " << evtStatus);
313 return(StatusCode::RECOVERABLE);
// Record the current write offset as the total size of the stored data.
316 evtH->evtSize = evtH->evtOffset;
// Flush the payload range and this slot's header, then mark the slot FILLED.
318 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
319 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
320 evtH->evtProcessStatus = ShareEventHeader::FILLED;
// Bounds check: the size_t length prefix plus the object itself must fit
// between the current offset and m_maxSize.
322 if (evtH->evtOffset +
sizeof(
size_t) + nbytes >
m_maxSize) {
323 ATH_MSG_ERROR(
"Object location = " << evtH->evtOffset <<
" greater than maximum for client = " <<
num);
324 return(StatusCode::FAILURE);
// True when nothing has been written to this slot's payload area yet.
326 bool first = (evtH->evtOffset == 0);
// Each object is stored as a size_t byte count followed by the object bytes;
// evtOffset advances past both.
327 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset, &nbytes,
sizeof(
size_t));
328 evtH->evtOffset +=
sizeof(size_t);
329 std::memcpy(
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
source, nbytes);
330 evtH->evtOffset += nbytes;
// Record the current write offset as the total size of the stored data.
332 evtH->evtSize = evtH->evtOffset;
// Flush this slot's header.
337 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
340 return(StatusCode::SUCCESS);
346 return(StatusCode::SUCCESS);
// Locate the header with index `num` inside the status mapping.
348 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
349 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
// Take a snapshot of the slot's state and stored size.
350 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
351 size_t evtSize = evtH->evtSize;
// Reading is only allowed while the slot is FILLED; otherwise the call
// reports RECOVERABLE.
352 if (evtStatus != ShareEventHeader::FILLED) {
355 return(StatusCode::RECOVERABLE);
// While the read offset is still inside the stored data:
357 if (evtH->evtOffset < evtSize) {
// Read the size_t byte count that precedes the object and step over it.
358 std::memcpy(&nbytes,
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset,
sizeof(
size_t));
359 evtH->evtOffset +=
sizeof(size_t);
// *target is pointed directly at the object bytes inside the payload mapping;
// nothing is copied here.
360 *
target =
static_cast<char*
>(
m_payload->get_address()) + evtH->evtOffset;
// Advance the read offset past the object.
361 evtH->evtOffset += nbytes;
// Once the read offset has reached the end of the stored data:
363 if (evtH->evtOffset == evtSize) {
// Flush this slot's header.
365 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
// Loop while the slot is not FILLED, then set it to UNLOCKED.
366 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
369 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
372 return(StatusCode::SUCCESS);
// Work on the header at the start of the status mapping.
378 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
m_status->get_address());
// The token is only handed out while that header is CLEARED; otherwise the
// call reports RECOVERABLE.
379 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
381 return(StatusCode::RECOVERABLE);
// *tokenString points into the shared header itself (no copy is made); the
// slot is then set to UNLOCKED.
383 *tokenString = evtH->token;
384 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
385 return(StatusCode::SUCCESS);
// Locate the header with index `i` inside the status mapping.
389 void*
status =
static_cast<char*
>(
m_status->get_address()) +
i *
sizeof(ShareEventHeader);
390 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
391 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus;
// A slot in state FILLED leads to RECOVERABLE; so does the slot with index
// `num` when it is not LOCKED.
392 if (evtStatus == ShareEventHeader::FILLED) {
394 return(StatusCode::RECOVERABLE);
395 }
else if (
i ==
num && evtStatus != ShareEventHeader::LOCKED) {
397 return(StatusCode::RECOVERABLE);
// A LOCKED slot is accepted either when it is the slot with index `num` or
// when `num` is negative; its token is handed out.
400 if ((
i ==
num ||
num < 0) && evtStatus == ShareEventHeader::LOCKED) {
// *tokenString points into the shared header; no copy is made.
401 *tokenString = evtH->token;
403 ATH_MSG_DEBUG(
"Setting LOCK clearObject, for client = " <<
num <<
": " << evtH->token);
// With a positive `num` and a non-null token, the token's leading characters
// are matched against the control words "wait", "release", "start" and "stop".
410 if (
num > 0 && *tokenString !=
nullptr) {
// Header with index `num`.
412 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
413 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
// Each control-word branch below sets the slot to UNLOCKED and returns SUCCESS.
// strncmp compares only the first 4/7/5/4 characters, so any token that merely
// starts with the word matches.
414 if (strncmp(*tokenString,
"wait", 4) == 0) {
416 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
419 return(StatusCode::SUCCESS);
421 if (strncmp(*tokenString,
"release", 7) == 0) {
423 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
425 return(StatusCode::SUCCESS);
427 if (strncmp(*tokenString,
"start", 5) == 0) {
433 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
435 return(StatusCode::SUCCESS);
437 if (strncmp(*tokenString,
"stop", 4) == 0) {
440 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
444 return(StatusCode::SUCCESS);
446 return(StatusCode::FAILURE);
449 return(StatusCode::SUCCESS);
// Flush the header at the start of the status mapping, then set evtH to CLEARED.
452 m_status->flush(0,
sizeof(ShareEventHeader));
453 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
454 return(StatusCode::SUCCESS);
456 return(StatusCode::RECOVERABLE);
// Failure exit for a token that is too long.
462 ATH_MSG_ERROR(
"Token = " << tokenString <<
", too long for AthenaSharedMemoryTool");
463 return(StatusCode::FAILURE);
// Locate the header with index `num` inside the status mapping.
465 void*
status =
static_cast<char*
>(
m_status->get_address()) +
num *
sizeof(ShareEventHeader);
466 ShareEventHeader* evtH =
static_cast<ShareEventHeader*
>(
status);
// The slot must be UNLOCKED; otherwise the call reports RECOVERABLE.
467 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
468 ATH_MSG_DEBUG(
"Waiting for UNLOCKED lockObject: " << tokenString);
469 return(StatusCode::RECOVERABLE);
// Flush this slot's header, then change its state to CLEARED.
474 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
475 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
// Flush this slot's header, then change its state to LOCKED.
477 m_status->flush(
num *
sizeof(ShareEventHeader),
sizeof(ShareEventHeader));
478 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
480 return(StatusCode::SUCCESS);