ATLAS Offline Software
SharedQueue.h
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #ifndef ATHENAINTERPROCESS_SHAREDQUEUE_H
6 #define ATHENAINTERPROCESS_SHAREDQUEUE_H
7 
9 
10 // full include needed, as message_queue has changed into a
11 // template for more recent versions of boost
12 #include <boost/interprocess/ipc/message_queue.hpp>
13 
14 #include <string>
15 
16 namespace AthenaInterprocess {
17 
/// Default value of the max_msg argument of the SharedQueue constructor.
static const int SHAREDQUEUE_MAX_MSG = 1000;

/// Default value of the max_size argument of the SharedQueue constructor.
static const std::size_t MAX_MSG_SIZE = 256;
20 
21 class SharedQueue {
22 public:
23 
24 public:
25  SharedQueue();
26  SharedQueue( const std::string& name,
27  int max_msg = SHAREDQUEUE_MAX_MSG,
28  std::size_t max_size = MAX_MSG_SIZE,
29  bool do_unlink = true );
30  SharedQueue( const SharedQueue& other );
32  virtual ~SharedQueue();
33 
34 public:
35  std::string name() const;
36 
37  virtual bool try_send( const std::string& ); // non-blocking
38  virtual bool send( const std::string& );
39 
40  virtual std::string try_receive(); // non-blocking
41  virtual std::string receive();
42 
43  template<typename T> bool try_send_basic(T);
44  template<typename T> bool send_basic(T);
45 
46  template<typename T> bool try_receive_basic(T&);
47  template<typename T> bool receive_basic(T&);
48 
49  operator bool() const {
50  return (bool)m_queue;
51  }
52 
53 protected:
54  boost::interprocess::message_queue* operator->() {
55  return m_queue;
56  }
57 
58 private:
59  void copy( const SharedQueue& other );
60  void destroy();
61 
62  template<typename T> bool do_send_basic(T,bool);
63  template<typename T> bool do_receive_basic(T&,bool);
64 
65 private:
66  boost::interprocess::message_queue* m_queue;
67  std::string* m_name;
68  int* m_count;
69 };
70 
71 template<typename T> bool SharedQueue::do_send_basic(T data,bool block)
72 {
73  bool send_ok = true;
74 
75  try {
76  if(block)
77  m_queue->send(&data, sizeof(data), 0);
78  else
79  send_ok = m_queue->try_send(&data, sizeof(data), 0);
80  }
81  catch(boost::interprocess::interprocess_exception&) {
82  send_ok = false;
83  }
84 
85  return send_ok;
86 }
87 
88 template<typename T> bool SharedQueue::try_send_basic(T data)
89 {
90  return do_send_basic<T>(data,false);
91 }
92 
93 template<typename T> bool SharedQueue::send_basic(T data)
94 {
95  return do_send_basic<T>(data,true);
96 }
97 
98 template<typename T> bool SharedQueue::do_receive_basic(T& data,bool block)
99 {
100  unsigned int priority = 0;
101  std::size_t recvd_size = 0;
102  bool receive_ok = true;
103 
104  try {
105  if(block) {
106  m_queue->receive(&data,sizeof(T),recvd_size,priority);
107  }
108  else {
109  receive_ok = m_queue->try_receive(&data,sizeof(T),recvd_size,priority);
110  }
111  }
112  catch(boost::interprocess::interprocess_exception&) {
113  return false; // TODO: perhaps add a printout or something
114  }
115 
116  return receive_ok;
117 }
118 
119 template<typename T> bool SharedQueue::try_receive_basic(T& data)
120 {
121  return do_receive_basic(data,false);
122 }
123 
124 template<typename T> bool SharedQueue::receive_basic(T& data)
125 {
126  return do_receive_basic(data,true);
127 }
128 
129 } // namespace AthenaInterprocess
130 
132 
133 #endif // !ATHENAINTERPROCESS_SHAREDQUEUE_H
AthenaInterprocess
Definition: FdsRegistry.h:11
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
AthenaInterprocess::SharedQueue::m_name
std::string * m_name
Definition: SharedQueue.h:67
AthenaInterprocess::SharedQueue::m_queue
boost::interprocess::message_queue * m_queue
Definition: SharedQueue.h:66
AthenaInterprocess::SharedQueue::m_count
int * m_count
Definition: SharedQueue.h:68
AthenaInterprocess::SharedQueue::name
std::string name() const
Definition: SharedQueue.cxx:78
AthenaInterprocess::SharedQueue::destroy
void destroy()
Definition: SharedQueue.cxx:67
AthenaInterprocess::SharedQueue::try_receive
virtual std::string try_receive()
Definition: SharedQueue.cxx:140
python.selector.AtlRunQuerySelectorLhcOlc.priority
priority
Definition: AtlRunQuerySelectorLhcOlc.py:611
AthenaInterprocess::SharedQueue::try_receive_basic
bool try_receive_basic(T &)
Definition: SharedQueue.h:119
AthenaInterprocess::SharedQueue::operator=
SharedQueue & operator=(const SharedQueue &other)
Definition: SharedQueue.cxx:37
AthenaInterprocess::SharedQueue::send
virtual bool send(const std::string &)
Definition: SharedQueue.cxx:107
AthenaInterprocess::SharedQueue
Definition: SharedQueue.h:21
AthenaInterprocess::SharedQueue::do_send_basic
bool do_send_basic(T, bool)
Definition: SharedQueue.h:71
AthenaInterprocess::SharedQueue::operator->
boost::interprocess::message_queue * operator->()
Definition: SharedQueue.h:54
AthenaInterprocess::SharedQueue::receive_basic
bool receive_basic(T &)
Definition: SharedQueue.h:124
AthenaInterprocess::SharedQueue::do_receive_basic
bool do_receive_basic(T &, bool)
Definition: SharedQueue.h:98
AthenaInterprocess::SharedQueue::try_send_basic
bool try_send_basic(T)
Definition: SharedQueue.h:88
AthenaInterprocess::SharedQueue::copy
void copy(const SharedQueue &other)
Definition: SharedQueue.cxx:53
AthenaInterprocess::SharedQueue::SharedQueue
SharedQueue()
Definition: SharedQueue.cxx:16
InDetDD::other
@ other
Definition: InDetDD_Defs.h:16
CLASS_DEF
#define CLASS_DEF(NAME, CID, VERSION)
associate a clid and a version to a type eg
Definition: Control/AthenaKernel/AthenaKernel/CLASS_DEF.h:64
AthenaInterprocess::SharedQueue::~SharedQueue
virtual ~SharedQueue()
Definition: SharedQueue.cxx:47
AthenaInterprocess::SharedQueue::try_send
virtual bool try_send(const std::string &)
Definition: SharedQueue.cxx:102
AthenaInterprocess::SharedQueue::send_basic
bool send_basic(T)
Definition: SharedQueue.h:93
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60
CLASS_DEF.h
macros to associate a CLID to a type
AthenaInterprocess::SharedQueue::receive
virtual std::string receive()
Definition: SharedQueue.cxx:145