ATLAS Offline Software
process_handling.py
Go to the documentation of this file.
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration

from AthenaCommon import Logging
from .non_blocking_stream_reader import NonBlockingStreamReader
import subprocess


# Module-level logger shared by everything in this file; messages are emitted
# under the "PowhegControl" logger name.
logger = Logging.logging.getLogger("PowhegControl")
9 
10 
class ProcessManager(object):
    """! Wrapper to handle multiple Powheg subprocesses.

    @author James Robinson <james.robinson@cern.ch>
    """

    def __init__(self, process_list):
        """! Constructor.

        @param process_list List of processes to manage.
        """
        # Keep a private copy: monitor() removes finished processes from this
        # list, and that must not empty the list object owned by the caller.
        self.__process_list = list(process_list)
        self.__n_initial = len(self.__process_list)

    def monitor(self):
        """! Monitor each of the managed processes and log when they are finished.

        Every process is given a 1-based id_number, then the remaining
        processes are polled repeatedly until none are left. A process is
        treated as finished as soon as its has_output() returns a false value;
        it is then dropped from the managed list and its return code is
        reported (info for 0, warning for anything else).
        """
        for idx, process in enumerate(self.__process_list, start=1):
            process.id_number = idx
        while self.__process_list:
            # Iterate over a snapshot so that finished processes can be removed safely
            for process in list(self.__process_list):
                if process.has_output():
                    continue
                return_code = process.return_code
                self.__process_list.remove(process)
                n_running = len(self.__process_list)
                if return_code == 0:
                    logger.info("Finished process #{}: there are now {}/{} running".format(process.id_number, n_running, self.__n_initial))
                else:
                    logger.warning("Process #{} terminated unexpectedly (return code {}): there are now {}/{} running".format(process.id_number, return_code, n_running, self.__n_initial))
38 
39 
class SingleProcessThread(object):
    """! Single executable running in a subprocess (usually PowhegBox).

    @author James Robinson <james.robinson@cern.ch>
    """

    # Default logger method name used for lines read from each stream
    log_level = {"stdout": "info", "stderr": "error"}
    # Prefix put in front of every logged message (extended per instance when a seed index is given)
    __output_prefix = " | "
    # Class-level default: nothing is filtered unless the constructor is given ignore_output
    __ignore_output = []

    def __init__(self, command_list, seed_index=None, stdin=None, ignore_output=None, warning_output=[], info_output=[], error_output=[]):
        """! Constructor.

        Setup underlying process together with non-blocking readers for stdout and stderr.

        @param command_list   Command that will be run (possibly with options).
        @param seed_index     Which seed from pwgseeds.dat to use.
        @param stdin          An open file handle providing input.
        @param ignore_output  List of strings to filter out from messages.
        @param warning_output List of strings which would always trigger a warning only, even if produced in stderr.
        @param info_output    List of strings which would always trigger an info only, even if produced in stderr.
        @param error_output   List of strings which would always trigger an error, even if produced in stdout.
        """
        if not isinstance(command_list, list):
            command_list = [command_list]
        command_list = [str(x) for x in command_list]
        # Set up messages to ignore
        if ignore_output is not None:
            self.__ignore_output = list(ignore_output)
        # Set up messages with special treatment. Copies are stored so that the
        # shared default lists in the signature can never be modified through an instance.
        self.__warning_output = list(warning_output)
        self.__info_output = list(info_output)
        self.__error_output = list(error_output)
        # Usual case, where no open file handle is provided
        if stdin is None:
            self.__process = subprocess.Popen(command_list, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            # Write seed to stdin
            if seed_index is not None:
                self.__output_prefix += "Process #{}: ".format(seed_index)
                self.__process.stdin.write(str(seed_index))
                self.__process.stdin.close()
                with open("pwgseeds.dat", "r") as seed_file:
                    random_seed_list = seed_file.read().splitlines()
                # seed_index is 1-based with respect to the lines of pwgseeds.dat
                self.log("Providing random seed: {}".format(random_seed_list[seed_index - 1]))
        # Using an open file handle to provide input to stdin: remember to close this later
        else:
            self.__process = subprocess.Popen(command_list, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE, text=True)
        # Setup non-blocking stream readers for stdout and stderr
        # NOTE(review): assumes NonBlockingStreamReader takes the stream to read as its only argument - confirm
        self.__stdout = NonBlockingStreamReader(self.__process.stdout)
        self.__stderr = NonBlockingStreamReader(self.__process.stderr)

    def has_output(self):
        """! Write queued output and return process status.

        The status is sampled before the queue is flushed, so output that was
        still queued when the process ended is written before False is returned.

        @return True while the underlying process is running, False once it has ended.
        """
        status = self.is_running()
        self.write_queued_output()
        return status

    def is_running(self):
        """! Check if the underlying process is running and finalise stream readers if not.

        @return True if the process has not ended yet, False otherwise.
        """
        if self.__process.poll() is None:  # process is still running
            return True
        for nbsr in ("stdout", "stderr"):
            getattr(self, nbsr).finalise()
        return False

    def log(self, message, log_level="info"):
        """! Write to the logger with appropriate log-level.

        Ignored strings are removed from the message, which is then stripped
        of surrounding whitespace and prefixed before being logged.

        @param message   The message to pass to the logger.
        @param log_level Which level to log at.
        """
        message = self._strip_ignored(message, self.__ignore_output)
        getattr(logger, log_level)("{}{}".format(self.__output_prefix, message.strip()))

    @staticmethod
    def _strip_ignored(message, ignore_list):
        """! Remove every occurrence of each ignored string from a message.

        @param message     The text to filter.
        @param ignore_list Strings to remove from the text.

        @return The filtered text.
        """
        for word in ignore_list:
            # An empty string is contained in every string and removing it changes
            # nothing, so it would make the loop below spin forever
            if not word:
                continue
            # Repeat until stable: removing one occurrence can create a new one
            while word in message:
                message = message.replace(word, "")
        return message

    def write_queued_output(self):
        """! Pass queued output to the logger.

        Each line is logged at the first matching level in the order error,
        warning, info; lines matching no special pattern use the default level
        of the stream they came from, and empty lines are dropped.
        """
        for stream in ("stdout", "stderr"):
            while True:
                output, queue_size = getattr(self, stream).readline(timeout=0.1)
                if output is not None:
                    if any(pattern in output for pattern in self.__error_output):
                        self.log(output, "error")
                    elif any(pattern in output for pattern in self.__warning_output):
                        self.log(output, "warning")
                    elif any(pattern in output for pattern in self.__info_output):
                        self.log(output, "info")
                    elif len(output) > 0:
                        self.log(output, self.log_level[stream])
                if queue_size == 0:
                    break

    @property
    def return_code(self):
        """! Return code of underlying process."""
        return self.__process.returncode

    @property
    def stdout(self):
        """! stdout stream from underlying process."""
        return self.__stdout

    @property
    def stderr(self):
        """! stderr stream from underlying process."""
        return self.__stderr
vtune_athena.format
format
Definition: vtune_athena.py:14
python.utility.process_handling.SingleProcessThread.stdout
def stdout(self)
stdout stream from underlying process.
Definition: process_handling.py:140
python.utility.process_handling.ProcessManager.monitor
def monitor(self)
Monitor each of the managed processes and log when they are finished.
Definition: process_handling.py:25
python.utility.process_handling.SingleProcessThread.__stderr
__stderr
Definition: process_handling.py:89
python.utility.process_handling.SingleProcessThread.log_level
dictionary log_level
Definition: process_handling.py:46
PixelModuleFeMask_create_db.remove
string remove
Definition: PixelModuleFeMask_create_db.py:83
python.utility.process_handling.SingleProcessThread.has_output
def has_output(self)
Write queued output and return process status.
Definition: process_handling.py:93
python.utility.process_handling.SingleProcessThread.is_running
def is_running(self)
Check if the underlying process is running and finalise stream readers if not.
Definition: process_handling.py:99
python.utility.process_handling.ProcessManager.__process_list
__process_list
Definition: process_handling.py:22
python.utility.process_handling.SingleProcessThread.__info_output
__info_output
Definition: process_handling.py:71
python.utility.process_handling.ProcessManager.__init__
def __init__(self, process_list)
Constructor.
Definition: process_handling.py:17
python.utility.process_handling.SingleProcessThread.write_queued_output
def write_queued_output(self)
Pass queued output to the logger.
Definition: process_handling.py:118
python.utility.process_handling.SingleProcessThread
Single executable running in a subprocess (usually PowhegBox).
Definition: process_handling.py:40
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.utility.process_handling.ProcessManager.__n_initial
__n_initial
Definition: process_handling.py:23
python.utility.process_handling.ProcessManager
Wrapper to handle multiple Powheg subprocesses.
Definition: process_handling.py:11
python.utility.process_handling.SingleProcessThread.__output_prefix
string __output_prefix
Definition: process_handling.py:47
python.utility.non_blocking_stream_reader.NonBlockingStreamReader
Read an output stream without blocking.
Definition: non_blocking_stream_reader.py:7
python.utility.process_handling.SingleProcessThread.__ignore_output
list __ignore_output
Definition: process_handling.py:48
python.utility.process_handling.SingleProcessThread.__warning_output
__warning_output
Definition: process_handling.py:70
python.utility.process_handling.SingleProcessThread.__stdout
__stdout
Definition: process_handling.py:88
Trk::open
@ open
Definition: BinningType.h:40
python.utility.process_handling.SingleProcessThread.__error_output
__error_output
Definition: process_handling.py:72
python.utility.process_handling.SingleProcessThread.__init__
def __init__(self, command_list, seed_index=None, stdin=None, ignore_output=None, warning_output=[], info_output=[], error_output=[])
Constructor.
Definition: process_handling.py:50
python.utility.process_handling.SingleProcessThread.return_code
def return_code(self)
Return code of underlying process.
Definition: process_handling.py:135
python.utility.process_handling.SingleProcessThread.__process
__process
Definition: process_handling.py:75
pickleTool.object
object
Definition: pickleTool.py:30
str
Definition: BTagTrackIpAccessor.cxx:11
python.utility.process_handling.SingleProcessThread.log
def log(self, message, log_level="info")
Write to the logger with appropriate log-level.
Definition: process_handling.py:107
python.utility.process_handling.SingleProcessThread.stderr
def stderr(self)
stderr stream from underlying process.
Definition: process_handling.py:145