ATLAS Offline Software
SharedQueue.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 #include <boost/interprocess/ipc/message_queue.hpp>
8 #include <iostream>
9 
10 using namespace boost::interprocess;
11 
12 
13 namespace AthenaInterprocess {
14 
15 //- construction/destruction -------------------------------------------------
16 SharedQueue::SharedQueue() : m_queue( 0 ), m_name( 0 ), m_count( 0 )
17 {
18 /* empty */
19 }
20 
21  SharedQueue::SharedQueue( const std::string& name, int max_msg, std::size_t max_size, bool do_unlink ) :
22  m_queue( 0 ), m_name( 0 ), m_count( 0 )
23 {
24  m_queue = new message_queue( open_or_create, name.c_str(), max_msg, max_size );
25  if ( do_unlink )
26  message_queue::remove( name.c_str() );
27 
28  m_count = new int( 1 );
29  m_name = new std::string( name );
30 }
31 
33 {
34  copy( other );
35 }
36 
38 {
39  if ( &other != this ) {
40  destroy();
41  copy( other );
42  }
43 
44  return *this;
45 }
46 
48 {
49  destroy();
50 }
51 
52 
54 {
55  if ( other.m_count ) {
56  *other.m_count += 1;
57  m_count = other.m_count;
58  m_queue = other.m_queue;
59  m_name = other.m_name;
60  } else {
61  m_count = 0;
62  m_queue = 0;
63  m_name = 0;
64  }
65 }
66 
68 {
69  if ( m_count && --*m_count <= 0 ) {
70  delete m_queue; m_queue = 0;
71  // message_queue::remove( m_name->c_str() );
72  delete m_name; m_name = 0;
73  delete m_count; m_count = 0;
74  }
75 }
76 
77 //- public member functions --------------------------------------------------
78 std::string SharedQueue::name() const
79 {
80  if ( m_name )
81  return *m_name;
82  return "";
83 }
84 
85 
86 static inline bool do_send( message_queue* mq, const std::string& buf, bool block )
87 {
88  bool send_ok = true;
89 
90  try {
91  if ( block )
92  mq->send( buf.data(), buf.size(), 0 );
93  else
94  send_ok = mq->try_send( buf.data(), buf.size(), 0 );
95  } catch ( interprocess_exception& e ) {
96  send_ok = false;
97  }
98 
99  return send_ok;
100 }
101 
102 bool SharedQueue::try_send( const std::string& buf )
103 {
104  return do_send( m_queue, buf, false );
105 }
106 
107 bool SharedQueue::send( const std::string& buf )
108 {
109  return do_send( m_queue, buf, true );
110 }
111 
112 static inline std::string do_receive( message_queue* mq, bool block )
113 {
114  char buf[ MAX_MSG_SIZE ];
115 
116  try {
117  std::size_t recvd_size = 0;
118  unsigned int priority = 0;
119 
120  bool result = true;
121  if ( block )
122  mq->receive( buf, MAX_MSG_SIZE, recvd_size, priority );
123  else
124  result = mq->try_receive( buf, MAX_MSG_SIZE, recvd_size, priority );
125 
126  if ( result ) {
127  if ( recvd_size > MAX_MSG_SIZE )
128  recvd_size = MAX_MSG_SIZE; // this is debatable, but send
129  // should have failed already, so
130  // "can not happen" applies
131  return std::string( buf, recvd_size );
132  }
133  } catch ( interprocess_exception& e ) {
134  /* silent for now: should return a callable error function */
135  }
136 
137  return "";
138 }
139 
141 {
142  return do_receive( m_queue, false );
143 }
144 
145 std::string SharedQueue::receive()
146 {
147  return do_receive( m_queue, true );
148 }
149 
150 } // namespace AthenaInterprocess
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
AthenaInterprocess
Definition: FdsRegistry.h:11
AthenaInterprocess::SharedQueue::m_name
std::string * m_name
Definition: SharedQueue.h:67
get_generator_info.result
result
Definition: get_generator_info.py:21
AthenaInterprocess::SharedQueue::m_queue
boost::interprocess::message_queue * m_queue
Definition: SharedQueue.h:66
AthenaInterprocess::SharedQueue::m_count
int * m_count
Definition: SharedQueue.h:68
AthenaInterprocess::SharedQueue::name
std::string name() const
Definition: SharedQueue.cxx:78
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
AthenaInterprocess::SharedQueue::destroy
void destroy()
Definition: SharedQueue.cxx:67
AthenaInterprocess::SharedQueue::try_receive
virtual std::string try_receive()
Definition: SharedQueue.cxx:140
python.selector.AtlRunQuerySelectorLhcOlc.priority
priority
Definition: AtlRunQuerySelectorLhcOlc.py:611
AthenaInterprocess::SharedQueue::operator=
SharedQueue & operator=(const SharedQueue &other)
Definition: SharedQueue.cxx:37
PixelModuleFeMask_create_db.remove
string remove
Definition: PixelModuleFeMask_create_db.py:83
AthenaInterprocess::SharedQueue::send
virtual bool send(const std::string &)
Definition: SharedQueue.cxx:107
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaInterprocess::SharedQueue::copy
void copy(const SharedQueue &other)
Definition: SharedQueue.cxx:53
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
AthenaInterprocess::SharedQueue::SharedQueue
SharedQueue()
Definition: SharedQueue.cxx:16
InDetDD::other
@ other
Definition: InDetDD_Defs.h:16
SharedQueue.h
AthenaInterprocess::SharedQueue::~SharedQueue
virtual ~SharedQueue()
Definition: SharedQueue.cxx:47
AthenaInterprocess::SharedQueue::try_send
virtual bool try_send(const std::string &)
Definition: SharedQueue.cxx:102
boost::interprocess
Definition: AthenaSharedMemoryTool.h:24
AthenaInterprocess::SharedQueue::receive
virtual std::string receive()
Definition: SharedQueue.cxx:145