ATLAS Offline Software
non_blocking_stream_reader.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2 
3 import threading
4 import queue as Queue
5 
6 
8  """! Read an output stream without blocking.
9 
10  see http://eyalarubas.com/python-subproc-nonblock.html
11  @author James Robinson <james.robinson@cern.ch>
12  """
13 
14  def __init__(self, input_stream):
15  """! Set up input stream, output queue and process to transfer between them."""
16  self._stream = input_stream
17  self._queue = Queue.Queue()
18  self._running = True
19 
20  self._output_thread = threading.Thread(target=self.populate_queue, args=(self._stream, self._queue))
21  self._output_thread.daemon = True
22  self._output_thread.start() # start collecting lines from the stream
23 
24  def populate_queue(self, stream, queue):
25  """! Collect lines from stream and put them in queue."""
26  while self._running:
27  line = stream.readline()
28  if line:
29  queue.put(line)
30 
31  def readline(self, timeout=None):
32  """! Return lines from queue."""
33  try:
34  output = self._queue.get(block=(timeout is not None), timeout=timeout).rstrip()
35  return (output, self._queue.qsize())
36  except Queue.Empty:
37  return (None, 0)
38 
39  def finalise(self):
40  """! Release thread resources on finalise."""
41  self._stream.flush()
42  self._running = False
43  self._output_thread.join()
python.utility.non_blocking_stream_reader.NonBlockingStreamReader._queue
_queue
Definition: non_blocking_stream_reader.py:17
python.utility.non_blocking_stream_reader.NonBlockingStreamReader.__init__
def __init__(self, input_stream)
Set up input stream, output queue and process to transfer between them.
Definition: non_blocking_stream_reader.py:14
FullCPAlgorithmsTest_eljob.flush
flush
Definition: FullCPAlgorithmsTest_eljob.py:182
python.utility.non_blocking_stream_reader.NonBlockingStreamReader.finalise
def finalise(self)
Release thread resources on finalise.
Definition: non_blocking_stream_reader.py:39
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:14
python.utility.non_blocking_stream_reader.NonBlockingStreamReader.readline
def readline(self, timeout=None)
Return lines from queue.
Definition: non_blocking_stream_reader.py:31
python.utility.non_blocking_stream_reader.NonBlockingStreamReader._stream
_stream
Definition: non_blocking_stream_reader.py:16
python.utility.non_blocking_stream_reader.NonBlockingStreamReader.populate_queue
def populate_queue(self, stream, queue)
Collect lines from stream and put them in queue.
Definition: non_blocking_stream_reader.py:24
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.utility.non_blocking_stream_reader.NonBlockingStreamReader
Read an output stream without blocking.
Definition: non_blocking_stream_reader.py:7
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
pickleTool.object
object
Definition: pickleTool.py:30
python.utility.non_blocking_stream_reader.NonBlockingStreamReader._output_thread
_output_thread
Definition: non_blocking_stream_reader.py:20
python.utility.non_blocking_stream_reader.NonBlockingStreamReader._running
_running
Definition: non_blocking_stream_reader.py:18