ATLAS Offline Software
messaging_listen.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 import stomp
4 import socket
5 import logging
6 
# Host name and TCP port that are resolved and connected to when listening.
MSGSERVER = 'atlas-mb.cern.ch'
MSGPORT = 61013

# Configure the root logger so that records logged via `logging` are emitted.
logging.basicConfig()
# Uncomment the next line for debug-level output from the 'stomp.py' logger:
#logging.getLogger('stomp.py').setLevel(logging.DEBUG)
12 
def __init__(self, listener, dest='/topic/atlas.dqm.progress',
             selector=None):
    """Remember the subscription settings; nothing is connected here.

    listener -- object later registered as the listener on each connection
    dest     -- destination passed to subscribe()
    selector -- value for the 'selector' subscription header, or None to
                subscribe without that header
    """
    self.listener, self.dest, self.selector = listener, dest, selector
19 
def __enter__(self):
    """Enter the context: connect and subscribe via __enter61__().

    Raises ValueError (after logging a critical message) when
    stomp.__version__ compares lower than (6, 1, 0).
    """
    # Guard clause: refuse to continue with a stomp.py older than 6.1.0.
    if stomp.__version__ < (6, 1, 0):
        logging.critical("Unable to find stomp.py >= 6.1.0, can't proceed")
        raise ValueError("Version of stomp.py is too old")
    return self.__enter61__()
26 
def __enter61__(self):
    """Connect to every resolved address of MSGSERVER and subscribe.

    One stomp connection is created per IPv4 address returned by
    socket.getaddrinfo() for MSGSERVER:MSGPORT.  self.listener is
    registered on each connection, which then subscribes to self.dest
    (with a 'selector' header when self.selector is not None).  The
    acknowledgement mode is taken from listener.ack_mode when the
    listener has that attribute, otherwise 'auto' is used.  If the
    listener has a 'conn' attribute it is reset to a list and every
    connection is appended to it.

    Returns self, with the subscribed connections in self.conns.

    If anything fails part-way through, every connection created so far
    is disconnected (best effort) and the original exception is
    re-raised.  This matters because the `with` statement does not call
    __exit__() when __enter__() raises, so nothing else would close them.
    """
    serverlist = [addrinfo[4] for addrinfo in
                  socket.getaddrinfo(MSGSERVER, MSGPORT,
                                     socket.AF_INET,
                                     socket.SOCK_STREAM)]

    from . import stompconfig
    self.conns = []
    if hasattr(self.listener, 'conn'):
        self.listener.conn = []

    # Every connection object created below, whether or not it got as far
    # as being subscribed; used only for cleanup on failure.
    created = []
    try:
        for svr in serverlist:
            auth = stompconfig.config()
            conn = stomp.Connection([svr],
                                    heartbeats=(0, 0),
                                    reconnect_attempts_max=3)
            created.append(conn)
            conn.set_listener('somename', self.listener)
            if hasattr(self.listener, 'conn'):
                self.listener.conn.append(conn)
            conn.connect(wait=True, **auth)

            hdr = {}
            if self.selector is not None:
                hdr['selector'] = self.selector
            ack_mode = getattr(self.listener, 'ack_mode', 'auto')
            # The subscription id is the index of this connection.
            conn.subscribe(destination=self.dest, ack=ack_mode,
                           headers=hdr, id=len(self.conns))
            self.conns.append(conn)
    except BaseException:
        # BaseException so that an interrupt during a blocking connect
        # also triggers cleanup; the exception is always re-raised.
        for conn in created:
            try:
                conn.disconnect()
            except Exception as e:
                # Expected for connections that never finished connecting.
                logging.warning('Exception closing connection after '
                                'failed setup: %r', e)
        self.conns = []
        raise
    return self
55 
def __exit__(self, eType, eValue, eTrace):
    """Leave the context: disconnect every connection in self.conns.

    Disconnecting is best effort: an exception from one connection is
    printed and the remaining connections are still processed.  Always
    returns False, so an exception raised inside the `with` body is not
    suppressed.
    """
    for connection in self.conns:
        try:
            connection.disconnect()
        except Exception as err:
            message = 'Exception closing connections:' + repr(err)
            print(message)
    return False
python.messaging_listen.ATLASDQMListener.selector
selector
Definition: messaging_listen.py:17
python.messaging_listen.ATLASDQMListener.__enter__
def __enter__(self)
Definition: messaging_listen.py:20
python.messaging_listen.ATLASDQMListener.__enter61__
def __enter61__(self)
Definition: messaging_listen.py:27
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.messaging_listen.ATLASDQMListener.dest
dest
Definition: messaging_listen.py:16
python.messaging_listen.ATLASDQMListener.listener
listener
Definition: messaging_listen.py:15
PyAthena::repr
std::string repr(PyObject *o)
Returns the string representation of a Python object; equivalent to calling repr(o) in Python.
Definition: PyAthenaUtils.cxx:106
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
pickleTool.object
object
Definition: pickleTool.py:30
python.messaging_listen.ATLASDQMListener
Definition: messaging_listen.py:13
python.messaging_listen.ATLASDQMListener.__exit__
def __exit__(self, eType, eValue, eTrace)
Definition: messaging_listen.py:56
python.messaging_listen.ATLASDQMListener.__init__
def __init__(self, listener, dest='/topic/atlas.dqm.progress', selector=None)
Definition: messaging_listen.py:14
python.messaging_listen.ATLASDQMListener.conns
conns
Definition: messaging_listen.py:33