ATLAS Offline Software
Loading...
Searching...
No Matches
SharedQueue.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <boost/interprocess/ipc/message_queue.hpp>
8#include <iostream>
9
10using namespace boost::interprocess;
11
12
13namespace AthenaInterprocess {
14
15//- construction/destruction -------------------------------------------------
17{
18/* empty */
19}
20
21 SharedQueue::SharedQueue( const std::string& name, int max_msg, std::size_t max_size, bool do_unlink ) :
22 m_queue( 0 ), m_name( 0 ), m_count( 0 )
23{
24 m_queue = new message_queue( open_or_create, name.c_str(), max_msg, max_size );
25 if ( do_unlink )
26 message_queue::remove( name.c_str() );
27
28 m_count = new int( 1 );
29 m_name = new std::string( name );
30}
31
33{
34 copy( other );
35}
36
38{
39 if ( &other != this ) {
40 destroy();
41 copy( other );
42 }
43
44 return *this;
45}
46
51
52
53void SharedQueue::copy( const SharedQueue& other )
54{
55 if ( other.m_count ) {
56 *other.m_count += 1;
57 m_count = other.m_count;
58 m_queue = other.m_queue;
59 m_name = other.m_name;
60 } else {
61 m_count = 0;
62 m_queue = 0;
63 m_name = 0;
64 }
65}
66
68{
69 if ( m_count && --*m_count <= 0 ) {
70 delete m_queue; m_queue = 0;
71 // message_queue::remove( m_name->c_str() );
72 delete m_name; m_name = 0;
73 delete m_count; m_count = 0;
74 }
75}
76
77//- public member functions --------------------------------------------------
78std::string SharedQueue::name() const
79{
80 if ( m_name )
81 return *m_name;
82 return "";
83}
84
85
86static inline bool do_send( message_queue* mq, const std::string& buf, bool block )
87{
88 bool send_ok = true;
89
90 try {
91 if ( block )
92 mq->send( buf.data(), buf.size(), 0 );
93 else
94 send_ok = mq->try_send( buf.data(), buf.size(), 0 );
95 } catch ( interprocess_exception& e ) {
96 send_ok = false;
97 }
98
99 return send_ok;
100}
101
102bool SharedQueue::try_send( const std::string& buf )
103{
104 return do_send( m_queue, buf, false );
105}
106
107bool SharedQueue::send( const std::string& buf )
108{
109 return do_send( m_queue, buf, true );
110}
111
112static inline std::string do_receive( message_queue* mq, bool block )
113{
114 char buf[ MAX_MSG_SIZE ];
115
116 try {
117 std::size_t recvd_size = 0;
118 unsigned int priority = 0;
119
120 bool result = true;
121 if ( block )
122 mq->receive( buf, MAX_MSG_SIZE, recvd_size, priority );
123 else
124 result = mq->try_receive( buf, MAX_MSG_SIZE, recvd_size, priority );
125
126 if ( result ) {
127 if ( recvd_size > MAX_MSG_SIZE )
128 recvd_size = MAX_MSG_SIZE; // this is debatable, but send
129 // should have failed already, so
130 // "can not happen" applies
131 return std::string( buf, recvd_size );
132 }
133 } catch ( interprocess_exception& e ) {
134 /* silent for now: should return a callable error function */
135 }
136
137 return "";
138}
139
141{
142 return do_receive( m_queue, false );
143}
144
146{
147 return do_receive( m_queue, true );
148}
149
150} // namespace AthenaInterprocess
virtual std::string try_receive()
void copy(const SharedQueue &other)
virtual bool try_send(const std::string &)
SharedQueue & operator=(const SharedQueue &other)
virtual std::string receive()
virtual bool send(const std::string &)
boost::interprocess::message_queue * m_queue
Definition SharedQueue.h:66
static std::string do_receive(message_queue *mq, bool block)
static const std::size_t MAX_MSG_SIZE
Definition SharedQueue.h:19
static bool do_send(message_queue *mq, const std::string &buf, bool block)