ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaSharedMemoryTool Class Reference

This class provides the IPCTool for SharedMemory objects. More...

#include <AthenaSharedMemoryTool.h>

Inheritance diagram for AthenaSharedMemoryTool:
Collaboration diagram for AthenaSharedMemoryTool:

Public Member Functions

 AthenaSharedMemoryTool (const std::string &type, const std::string &name, const IInterface *parent)
 Standard AlgTool Constructor.
virtual ~AthenaSharedMemoryTool ()
 Destructor.
virtual StatusCode initialize () override
 Gaudi Service Interface method implementations:
virtual StatusCode stop () override
virtual StatusCode finalize () override
virtual StatusCode makeServer (int num, const std::string &streamPortSuffix) override
virtual bool isServer () const override
virtual StatusCode makeClient (int num, std::string &streamPortSuffix) override
virtual bool isClient () const override
virtual StatusCode putEvent ATLAS_NOT_THREAD_SAFE (long eventNumber, const void *source, size_t nbytes, unsigned int status) const override
virtual StatusCode getLockedEvent (void **target, unsigned int &status) const override
virtual StatusCode lockEvent (long eventNumber) const override
virtual StatusCode putObject (const void *source, size_t nbytes, int num=0) override
virtual StatusCode getObject (void **target, size_t &nbytes, int num=0) override
virtual StatusCode clearObject (const char **tokenString, int &num) override
virtual StatusCode lockObject (const char *tokenString, int num=0) override

Private Attributes

Gaudi::Property< std::string > m_sharedMemory {this, "SharedMemoryName", {}}
Gaudi::Property< size_t > m_maxSize {this, "SharedMemoryObjectSize", 64 * 1024 * 1024, "Maximum shared memory object size in B (default = 64 MB)"}
Gaudi::Property< int > m_maxDataClients {this, "SharedMemoryClientSize", 256, "Maximum number of clients (default = 256)"}
int m_num {-1}
int m_lastClient {-1}
std::set< int > m_dataClients
std::unique_ptr< boost::interprocess::mapped_region > m_payload
std::unique_ptr< boost::interprocess::mapped_region > m_status
long m_fileSeqNumber {0}
bool m_isServer {false}
bool m_isClient {false}
ServiceHandle< IIncidentSvc > m_incidentSvc
std::optional< InputFileIncidentGuard > m_inputFileGuard ATLAS_THREAD_SAFE

Detailed Description

This class provides the IPCTool for SharedMemory objects.

Definition at line 36 of file AthenaSharedMemoryTool.h.

Constructor & Destructor Documentation

◆ AthenaSharedMemoryTool()

AthenaSharedMemoryTool::AthenaSharedMemoryTool ( const std::string & type,
const std::string & name,
const IInterface * parent )

Standard AlgTool Constructor.

Definition at line 36 of file AthenaSharedMemoryTool.cxx.

38 :
39 base_class(type, name, parent),
40 m_incidentSvc("IncidentSvc", name) {
41 m_sharedMemory.setValue(name);
42}
Gaudi::Property< std::string > m_sharedMemory
ServiceHandle< IIncidentSvc > m_incidentSvc

◆ ~AthenaSharedMemoryTool()

AthenaSharedMemoryTool::~AthenaSharedMemoryTool ( )
virtual

Destructor.

Definition at line 45 of file AthenaSharedMemoryTool.cxx.

45 {
46}

Member Function Documentation

◆ ATLAS_NOT_THREAD_SAFE()

virtual StatusCode putEvent AthenaSharedMemoryTool::ATLAS_NOT_THREAD_SAFE ( long eventNumber,
const void * source,
size_t nbytes,
unsigned int status ) const
override virtual

◆ clearObject()

StatusCode AthenaSharedMemoryTool::clearObject ( const char ** tokenString,
int & num )
override virtual

Definition at line 356 of file AthenaSharedMemoryTool.cxx.

356 {
357 if (m_isClient) {
358 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
359 if (evtH->evtProcessStatus != ShareEventHeader::CLEARED) {
360 ATH_MSG_DEBUG("Waiting for FILL clearObject");
361 return(StatusCode::RECOVERABLE);
362 }
363 *tokenString = evtH->token;
364 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
365 return(StatusCode::SUCCESS);
366 }
367
368 for (int i = 1; i <= m_num; i++) { // FIXME: PvG, do round robin
369 void* status = static_cast<char*>(m_status->get_address()) + i * sizeof(ShareEventHeader);
370 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
371 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus; // read only once
372 if (evtStatus == ShareEventHeader::FILLED) {
373 ATH_MSG_DEBUG("Waiting for UNFILL clearObject, client = " << i);
374 return(StatusCode::RECOVERABLE);
375 } else if (i == num && evtStatus != ShareEventHeader::LOCKED) {
376 ATH_MSG_DEBUG("Waiting for LOCK clearObject, client = " << i);
377 return(StatusCode::RECOVERABLE);
378 }
379 if (m_lastClient < 0 || m_lastClient == i) { // If Writer is not yet released, only consider last client
380 if ((i == num || num < 0) && evtStatus == ShareEventHeader::LOCKED) {
381 *tokenString = evtH->token;
382 num = i;
383 ATH_MSG_DEBUG("Setting LOCK clearObject, for client = " << num << ": " << evtH->token);
384 m_lastClient = -1;
385 // break;
386 }
387 }
388 }
389
390 if (num > 0 && *tokenString != nullptr) {
391 m_lastClient = -1; // Release Writer from last client
392 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
393 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
394 if (strncmp(*tokenString, "wait", 4) == 0) {
395 ATH_MSG_INFO("Server clearObject() got wait, client = " << num);
396 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
397 m_lastClient = num; // Keep Writer allocated
398 num = -1;
399 return(StatusCode::SUCCESS);
400 }
401 if (strncmp(*tokenString, "release", 7) == 0) {
402 ATH_MSG_INFO("Server clearObject() got release, client = " << num);
403 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
404 num = -1;
405 return(StatusCode::SUCCESS);
406 }
407 if (strncmp(*tokenString, "start", 5) == 0) {
408 ATH_MSG_INFO("Server clearObject() got start, client = " << num << ", of " << m_num - 1);
409 if (m_dataClients.empty()) { // Announce all workers to prevent early writer termination
410 for (int i = 1; i < m_num - 1; i++) m_dataClients.insert(i);
411 }
412 m_dataClients.insert(num);
413 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
414 num = -1;
415 return(StatusCode::SUCCESS);
416 }
417 if (strncmp(*tokenString, "stop", 4) == 0) {
418 ATH_MSG_INFO("Server clearObject() got stop, client = " << num);
419 m_dataClients.erase(num);
420 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
421 if (m_dataClients.empty()) {
422 ATH_MSG_INFO("Server clearObject() got stop, client ALL: " << m_num);
423 if (num == m_num - 1) { // mother process
424 return(StatusCode::SUCCESS);
425 }
426 return(StatusCode::FAILURE);
427 }
428 num = -1;
429 return(StatusCode::SUCCESS);
430 }
431 evtH->evtSize = 0;
432 m_status->flush(0, sizeof(ShareEventHeader));
433 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
434 return(StatusCode::SUCCESS);
435 }
436 return(StatusCode::RECOVERABLE);
437}
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
std::unique_ptr< boost::interprocess::mapped_region > m_status
status
Definition merge.py:16

◆ finalize()

StatusCode AthenaSharedMemoryTool::finalize ( )
override virtual

Definition at line 77 of file AthenaSharedMemoryTool.cxx.

77 {
78 ATH_MSG_INFO("in finalize()");
79 if (m_isServer) {
80 try {
81 boost::interprocess::shared_memory_object::remove(m_sharedMemory.value().c_str());
82 } catch(boost::interprocess::interprocess_exception &e) {
83 ATH_MSG_WARNING("Cannot remove shared memory " << m_sharedMemory.value() << ": " << boost::diagnostic_information(e));
84 }
85 const std::string statusName = m_sharedMemory.value() + "_status";
86 try {
87 boost::interprocess::shared_memory_object::remove(statusName.c_str());
88 } catch(boost::interprocess::interprocess_exception &e) {
89 ATH_MSG_WARNING("Cannot remove shared memory " << statusName << ": " << boost::diagnostic_information(e));
90 }
91 }
92 // Release IncidentSvc
93 if (!m_incidentSvc.release().isSuccess()) {
94 ATH_MSG_WARNING("Cannot release IncidentSvc.");
95 }
96 return(::AthAlgTool::finalize());
97}
#define ATH_MSG_WARNING(x)

◆ getLockedEvent()

StatusCode AthenaSharedMemoryTool::getLockedEvent ( void ** target,
unsigned int & status ) const
override virtual

Definition at line 241 of file AthenaSharedMemoryTool.cxx.

241 {
242 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
243 if (evtH->evtProcessStatus == ShareEventHeader::LOCKED) {
244 const std::size_t nbytes = evtH->evtSize;
245 if (target != nullptr) {
246 char* buf = new char[nbytes];
247 std::memcpy(buf, static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, nbytes);
248 *target = buf;
249 }
250 status = evtH->evtCoreStatusFlag;
251 } else {
252 ATH_MSG_ERROR("No event locked");
253 return(StatusCode::FAILURE);
254 }
255 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
256 return(StatusCode::SUCCESS);
257}
#define ATH_MSG_ERROR(x)
std::unique_ptr< boost::interprocess::mapped_region > m_payload

◆ getObject()

StatusCode AthenaSharedMemoryTool::getObject ( void ** target,
size_t & nbytes,
int num = 0 )
override virtual

Definition at line 325 of file AthenaSharedMemoryTool.cxx.

325 {
326 if (target == nullptr) {
327 return(StatusCode::SUCCESS);
328 }
329 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
330 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
331 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus; // read only once
332 size_t evtSize = evtH->evtSize; // read only once
333 if (evtStatus != ShareEventHeader::FILLED) {
334 ATH_MSG_DEBUG("Waiting for FILLED getObject, client = " << num);
335 nbytes = 0;
336 return(StatusCode::RECOVERABLE);
337 }
338 if (evtH->evtOffset < evtSize) {
339 std::memcpy(&nbytes, static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, sizeof(size_t));
340 evtH->evtOffset += sizeof(size_t);
341 *target = static_cast<char*>(m_payload->get_address()) + evtH->evtOffset;
342 evtH->evtOffset += nbytes;
343 return(StatusCode::SUCCESS); // return object
344 }
345 nbytes = 0;
346 evtH->evtOffset = 0;
347 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
348 while (evtH->evtProcessStatus != ShareEventHeader::FILLED) {
349 usleep(10);
350 }
351 evtH->evtProcessStatus = ShareEventHeader::UNLOCKED;
352 return(StatusCode::SUCCESS); // unlock server
353}

◆ initialize()

StatusCode AthenaSharedMemoryTool::initialize ( )
override virtual

Gaudi Service Interface method implementations:

Definition at line 49 of file AthenaSharedMemoryTool.cxx.

49 {
50 ATH_MSG_INFO("Initializing " << name());
51
52 ATH_CHECK( m_incidentSvc.retrieve() );
53
54 return(StatusCode::SUCCESS);
55}
#define ATH_CHECK
Evaluate an expression and check for errors.

◆ isClient()

bool AthenaSharedMemoryTool::isClient ( ) const
override virtual

Definition at line 201 of file AthenaSharedMemoryTool.cxx.

201 {
202 return(m_isClient);
203}

◆ isServer()

bool AthenaSharedMemoryTool::isServer ( ) const
override virtual

Definition at line 141 of file AthenaSharedMemoryTool.cxx.

141 {
142 return(m_isServer);
143}

◆ lockEvent()

StatusCode AthenaSharedMemoryTool::lockEvent ( long eventNumber) const
override virtual

Definition at line 260 of file AthenaSharedMemoryTool.cxx.

260 {
261 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
262 if (evtH->evtSeqNumber > eventNumber) {
263 ATH_MSG_ERROR("eventNumber = " << eventNumber << ", already processed");
264 return(StatusCode::FAILURE);
265 }
266 if (evtH->evtSeqNumber == 0 && evtH->fileSeqNumber == 0 && evtH->evtSize == 0) {
267 ATH_MSG_DEBUG("Server has been terminated");
268 return(StatusCode::FAILURE);
269 }
270 if (evtH->evtSeqNumber != eventNumber || evtH->evtProcessStatus != ShareEventHeader::FILLED) {
271 ATH_MSG_DEBUG("Waiting for lockEvent, eventNumber = " << eventNumber);
272 return(StatusCode::RECOVERABLE);
273 }
274 if (evtH->fileSeqNumber != m_fileSeqNumber && m_fileSeqNumber > 0) {
275 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(), "SHM", {});
276 const_cast<AthenaSharedMemoryTool*>(this)->m_fileSeqNumber = evtH->fileSeqNumber;
277 }
278 ATH_MSG_DEBUG("Locking eventNumber = " << eventNumber);
279 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
280 return(StatusCode::SUCCESS);
281}
AthenaSharedMemoryTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard AlgTool Constructor.
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.

◆ lockObject()

StatusCode AthenaSharedMemoryTool::lockObject ( const char * tokenString,
int num = 0 )
override virtual

Definition at line 440 of file AthenaSharedMemoryTool.cxx.

440 {
441 if (strlen(tokenString) >= maxTokenLength) {
442 ATH_MSG_ERROR("Token = " << tokenString << ", too long for AthenaSharedMemoryTool");
443 return(StatusCode::FAILURE);
444 }
445 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
446 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
447 if (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
448 ATH_MSG_DEBUG("Waiting for UNLOCKED lockObject: " << tokenString);
449 return(StatusCode::RECOVERABLE);
450 }
451 strncpy(evtH->token, tokenString, maxTokenLength - 1); evtH->token[maxTokenLength - 1] = 0;
452 if (m_isServer) {
453 evtH->evtSize = 0;
454 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
455 evtH->evtProcessStatus = ShareEventHeader::CLEARED;
456 } else {
457 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
458 evtH->evtProcessStatus = ShareEventHeader::LOCKED;
459 }
460 return(StatusCode::SUCCESS);
461}
const std::size_t maxTokenLength

◆ makeClient()

StatusCode AthenaSharedMemoryTool::makeClient ( int num,
std::string & streamPortSuffix )
override virtual

Definition at line 146 of file AthenaSharedMemoryTool.cxx.

146 {
147 if (m_isServer) {
148 ATH_MSG_ERROR("Cannot make AthenaSharedMemoryTool a Client.");
149 return(StatusCode::FAILURE);
150 }
151 if (num >= m_maxDataClients) {
152 ATH_MSG_ERROR("Too many clients for AthenaSharedMemoryTool.");
153 return(StatusCode::FAILURE);
154 }
155 if (m_num > 0 && num <= 0) {// stop running client
156 ATH_MSG_DEBUG("Stop AthenaSharedMemoryTool Client.");
157 m_num = -1;
158 while (lockObject("stop").isRecoverable()) {
159 usleep(100);
160 }
161 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
162 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
163 usleep(100);
164 }
165 m_payload.reset();
166 m_status.reset();
167 m_isClient = false;
168 return(StatusCode::SUCCESS);
169 }
170 while (!m_isClient) {
171 try { // Check whether Server created shared memory object
172 const std::string statusName = m_sharedMemory.value() + "_status";
173 boost::interprocess::shared_memory_object shm_status(boost::interprocess::open_only,
174 statusName.c_str(),
175 boost::interprocess::read_write);
176 m_status = std::make_unique<boost::interprocess::mapped_region>(shm_status, boost::interprocess::read_write, num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
177 boost::interprocess::shared_memory_object shm(boost::interprocess::open_only,
178 m_sharedMemory.value().c_str(),
179 boost::interprocess::read_write);
180 m_payload = std::make_unique<boost::interprocess::mapped_region>(shm, boost::interprocess::read_write, 0, m_maxSize);
181 m_isClient = true;
182 } catch (boost::interprocess::interprocess_exception& e) {
183 usleep(100000);
184 }
185 }
186 if (m_num <= 0 && num > 0) {
187 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
188 streamPortSuffix.assign(evtH->token);
189 while (lockObject("start").isRecoverable()) {
190 usleep(100);
191 }
192 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
193 usleep(100);
194 }
195 m_num = num;
196 }
197 return(StatusCode::SUCCESS);
198}
virtual StatusCode lockObject(const char *tokenString, int num=0) override
Gaudi::Property< size_t > m_maxSize
Gaudi::Property< int > m_maxDataClients

◆ makeServer()

StatusCode AthenaSharedMemoryTool::makeServer ( int num,
const std::string & streamPortSuffix )
override virtual

Definition at line 100 of file AthenaSharedMemoryTool.cxx.

100 {
101 if (m_isServer || m_isClient) {
102 ATH_MSG_ERROR("Cannot make AthenaSharedMemoryTool a Server.");
103 return(StatusCode::FAILURE);
104 }
105 if (num > m_maxDataClients) {
106 ATH_MSG_ERROR("Too many clients for AthenaSharedMemoryTool Server.");
107 return(StatusCode::FAILURE);
108 }
109 m_num = num;
110 m_isServer = true;
111 ATH_MSG_DEBUG("Creating shared memory object with name \"" << m_sharedMemory.value() << "\"");
112 std::optional<boost::interprocess::shared_memory_object> shm;
113 try {
114 shm.emplace(boost::interprocess::create_only, m_sharedMemory.value().c_str(), boost::interprocess::read_write);
115 } catch(boost::interprocess::interprocess_exception &e) {
116 ATH_MSG_ERROR("Cannot create shared memory " << m_sharedMemory.value() << ": " << boost::diagnostic_information(e));
117 return StatusCode::FAILURE;
118 }
119 shm->truncate(m_maxSize);
120 m_payload = std::make_unique<boost::interprocess::mapped_region>(*shm, boost::interprocess::read_write, 0, m_maxSize);
121 const std::string statusName = m_sharedMemory.value() + "_status";
122 std::optional<boost::interprocess::shared_memory_object> shm_status;
123 try {
124 shm_status.emplace(boost::interprocess::create_only, statusName.c_str(), boost::interprocess::read_write);
125 } catch(boost::interprocess::interprocess_exception &e) {
126 ATH_MSG_ERROR("Cannot create shared memory " << statusName << ": " << boost::diagnostic_information(e));
127 return StatusCode::FAILURE;
128 }
129 shm_status->truncate(num * sizeof(ShareEventHeader));
130 m_status = std::make_unique<boost::interprocess::mapped_region>(*shm_status, boost::interprocess::read_write, 0, num * sizeof(ShareEventHeader));
131 ShareEventHeader evtH = { ShareEventHeader::UNLOCKED, -1, -1, 0, 0, 0, "" };
132 std::memcpy(evtH.token, streamPortSuffix.c_str(), maxTokenLength - 1);
133 evtH.token[maxTokenLength - 1] = 0;
134 for (int i = 0; i < num; i++) {
135 std::memcpy(static_cast<char*>(m_status->get_address()) + i * sizeof(ShareEventHeader), &evtH, sizeof(ShareEventHeader));
136 }
137 return(StatusCode::SUCCESS);
138}

◆ putObject()

StatusCode AthenaSharedMemoryTool::putObject ( const void * source,
size_t nbytes,
int num = 0 )
override virtual

Definition at line 284 of file AthenaSharedMemoryTool.cxx.

284 {
285 if (nbytes > m_maxSize) {
286 ATH_MSG_ERROR("Object size = " << nbytes << " greater than maximum for client = " << num);
287 return(StatusCode::FAILURE);
288 }
289 void* status = static_cast<char*>(m_status->get_address()) + num * sizeof(ShareEventHeader);
290 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(status);
291 ShareEventHeader::ProcessStatus evtStatus = evtH->evtProcessStatus; // read only once
292 if (evtStatus != ShareEventHeader::CLEARED) {
293 ATH_MSG_DEBUG("Waiting for CLEARED putObject, client = " << num << ", in state " << evtStatus);
294 return(StatusCode::RECOVERABLE);
295 }
296 if (source == nullptr) {
297 evtH->evtSize = evtH->evtOffset;
298 evtH->evtOffset = 0;
299 m_payload->flush(0 + evtH->evtOffset, evtH->evtSize);
300 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
301 evtH->evtProcessStatus = ShareEventHeader::FILLED;
302 } else {
303 if (evtH->evtOffset + sizeof(size_t) + nbytes > m_maxSize) {
304 ATH_MSG_ERROR("Object location = " << evtH->evtOffset << " greater than maximum for client = " << num);
305 return(StatusCode::FAILURE);
306 }
307 bool first = (evtH->evtOffset == 0);
308 std::memcpy(static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, &nbytes, sizeof(size_t));
309 evtH->evtOffset += sizeof(size_t);
310 std::memcpy(static_cast<char*>(m_payload->get_address()) + evtH->evtOffset, source, nbytes);
311 evtH->evtOffset += nbytes;
312 if (evtH->evtSize == m_maxSize) {
313 evtH->evtSize = evtH->evtOffset;
314 }
315 if (first) {
316 evtH->evtSize = m_maxSize;
317 m_payload->flush(0, evtH->evtOffset);
318 m_status->flush(num * sizeof(ShareEventHeader), sizeof(ShareEventHeader));
319 }
320 }
321 return(StatusCode::SUCCESS);
322}
bool first
Definition DeMoScan.py:534

◆ stop()

StatusCode AthenaSharedMemoryTool::stop ( )
override virtual

Definition at line 58 of file AthenaSharedMemoryTool.cxx.

58 {
59 ATH_MSG_INFO("in stop()");
60 // Fire EndInputFile for any file still open.
61 m_inputFileGuard.reset();
62 if (m_isClient && m_num > 0) {
63 ATH_MSG_INFO("Client stop() inform Server: " << m_num);
64 m_num = -1;
65 while (lockObject("stop").isRecoverable()) {
66 usleep(100);
67 }
68 ShareEventHeader* evtH = static_cast<ShareEventHeader*>(m_status->get_address());
69 while (evtH->evtProcessStatus != ShareEventHeader::UNLOCKED) {
70 usleep(100);
71 }
72 }
73 return(StatusCode::SUCCESS);
74}

Member Data Documentation

◆ ATLAS_THREAD_SAFE

std::optional<InputFileIncidentGuard> m_inputFileGuard AthenaSharedMemoryTool::ATLAS_THREAD_SAFE
mutable private

Definition at line 76 of file AthenaSharedMemoryTool.h.

◆ m_dataClients

std::set<int> AthenaSharedMemoryTool::m_dataClients
private

Definition at line 69 of file AthenaSharedMemoryTool.h.

◆ m_fileSeqNumber

long AthenaSharedMemoryTool::m_fileSeqNumber {0}
private

Definition at line 72 of file AthenaSharedMemoryTool.h.

72{0};

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> AthenaSharedMemoryTool::m_incidentSvc
private

Definition at line 75 of file AthenaSharedMemoryTool.h.

◆ m_isClient

bool AthenaSharedMemoryTool::m_isClient {false}
private

Definition at line 74 of file AthenaSharedMemoryTool.h.

74{false};

◆ m_isServer

bool AthenaSharedMemoryTool::m_isServer {false}
private

Definition at line 73 of file AthenaSharedMemoryTool.h.

73{false};

◆ m_lastClient

int AthenaSharedMemoryTool::m_lastClient {-1}
private

Definition at line 68 of file AthenaSharedMemoryTool.h.

68{-1};

◆ m_maxDataClients

Gaudi::Property<int> AthenaSharedMemoryTool::m_maxDataClients {this, "SharedMemoryClientSize", 256, "Maximum number of clients (default = 256)"}
private

Definition at line 65 of file AthenaSharedMemoryTool.h.

65{this, "SharedMemoryClientSize", 256, "Maximum number of clients (default = 256)"};

◆ m_maxSize

Gaudi::Property<size_t> AthenaSharedMemoryTool::m_maxSize {this, "SharedMemoryObjectSize", 64 * 1024 * 1024, "Maximum shared memory object size in B (default = 64 MB)"}
private

Definition at line 64 of file AthenaSharedMemoryTool.h.

64{this, "SharedMemoryObjectSize", 64 * 1024 * 1024, "Maximum shared memory object size in B (default = 64 MB)"};

◆ m_num

int AthenaSharedMemoryTool::m_num {-1}
private

Definition at line 67 of file AthenaSharedMemoryTool.h.

67{-1};

◆ m_payload

std::unique_ptr<boost::interprocess::mapped_region> AthenaSharedMemoryTool::m_payload
private

Definition at line 70 of file AthenaSharedMemoryTool.h.

◆ m_sharedMemory

Gaudi::Property<std::string> AthenaSharedMemoryTool::m_sharedMemory {this, "SharedMemoryName", {}}
private

Definition at line 63 of file AthenaSharedMemoryTool.h.

63{this, "SharedMemoryName", {}};

◆ m_status

std::unique_ptr<boost::interprocess::mapped_region> AthenaSharedMemoryTool::m_status
private

Definition at line 71 of file AthenaSharedMemoryTool.h.


The documentation for this class was generated from the following files: