ATLAS Offline Software
Loading...
Searching...
No Matches
SharedQueue.h
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3*/
4
5#ifndef ATHENAINTERPROCESS_SHAREDQUEUE_H
6#define ATHENAINTERPROCESS_SHAREDQUEUE_H
7
9
10// full include needed, as message_queue has changed into a
11// template for more recent versions of boost
12#include <boost/interprocess/ipc/message_queue.hpp>
13
14#include <string>
15
16namespace AthenaInterprocess {
17
18static const int SHAREDQUEUE_MAX_MSG = 1000;
19static const std::size_t MAX_MSG_SIZE = 256;
20
22public:
23
24public:
26 SharedQueue( const std::string& name,
27 int max_msg = SHAREDQUEUE_MAX_MSG,
28 std::size_t max_size = MAX_MSG_SIZE,
29 bool do_unlink = true );
30 SharedQueue( const SharedQueue& other );
31 SharedQueue& operator=( const SharedQueue& other );
32 virtual ~SharedQueue();
33
34public:
35 std::string name() const;
36
37 virtual bool try_send( const std::string& ); // non-blocking
38 virtual bool send( const std::string& );
39
40 virtual std::string try_receive(); // non-blocking
41 virtual std::string receive();
42
43 template<typename T> bool try_send_basic(T);
44 template<typename T> bool send_basic(T);
45
46 template<typename T> bool try_receive_basic(T&);
47 template<typename T> bool receive_basic(T&);
48
49 operator bool() const {
50 return (bool)m_queue;
51 }
52
53protected:
54 boost::interprocess::message_queue* operator->() {
55 return m_queue;
56 }
57
58private:
59 void copy( const SharedQueue& other );
60 void destroy();
61
62 template<typename T> bool do_send_basic(T,bool);
63 template<typename T> bool do_receive_basic(T&,bool);
64
65private:
66 boost::interprocess::message_queue* m_queue;
67 std::string* m_name;
68 int* m_count;
69};
70
71template<typename T> bool SharedQueue::do_send_basic(T data,bool block)
72{
73 bool send_ok = true;
74
75 try {
76 if(block)
77 m_queue->send(&data, sizeof(data), 0);
78 else
79 send_ok = m_queue->try_send(&data, sizeof(data), 0);
80 }
81 catch(boost::interprocess::interprocess_exception&) {
82 send_ok = false;
83 }
84
85 return send_ok;
86}
87
88template<typename T> bool SharedQueue::try_send_basic(T data)
89{
90 return do_send_basic<T>(data,false);
91}
92
93template<typename T> bool SharedQueue::send_basic(T data)
94{
95 return do_send_basic<T>(data,true);
96}
97
98template<typename T> bool SharedQueue::do_receive_basic(T& data,bool block)
99{
100 unsigned int priority = 0;
101 std::size_t recvd_size = 0;
102 bool receive_ok = true;
103
104 try {
105 if(block) {
106 m_queue->receive(&data,sizeof(T),recvd_size,priority);
107 }
108 else {
109 receive_ok = m_queue->try_receive(&data,sizeof(T),recvd_size,priority);
110 }
111 }
112 catch(boost::interprocess::interprocess_exception&) {
113 return false; // TODO: perhaps add a printout or something
114 }
115
116 return receive_ok;
117}
118
119template<typename T> bool SharedQueue::try_receive_basic(T& data)
120{
121 return do_receive_basic(data,false);
122}
123
124template<typename T> bool SharedQueue::receive_basic(T& data)
125{
126 return do_receive_basic(data,true);
127}
128
129} // namespace AthenaInterprocess
130
132
133#endif // !ATHENAINTERPROCESS_SHAREDQUEUE_H
macros to associate a CLID to a type
#define CLASS_DEF(NAME, CID, VERSION)
associate a clid and a version to a type eg
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
virtual std::string try_receive()
void copy(const SharedQueue &other)
virtual bool try_send(const std::string &)
SharedQueue & operator=(const SharedQueue &other)
virtual std::string receive()
boost::interprocess::message_queue * operator->()
Definition SharedQueue.h:54
virtual bool send(const std::string &)
boost::interprocess::message_queue * m_queue
Definition SharedQueue.h:66
static const std::size_t MAX_MSG_SIZE
Definition SharedQueue.h:19
static const int SHAREDQUEUE_MAX_MSG
Definition SharedQueue.h:18