ATLAS Offline Software
Loading...
Searching...
No Matches
process_handling.py
Go to the documentation of this file.
1# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2
3from AthenaCommon import Logging
4from .non_blocking_stream_reader import NonBlockingStreamReader
5import subprocess
6
7
8logger = Logging.logging.getLogger("PowhegControl")
9
10
12 """! Wrapper to handle multiple Powheg subprocesses.
13
14 @author James Robinson <james.robinson@cern.ch>
15 """
16
def __init__(self, process_list):
    """! Remember which processes are being managed and how many there were initially.

    The list is stored by reference, not copied.

    @param process_list List of processes to manage.
    """
    # Processes that are still being managed.
    self.__process_list = process_list
    # Size of the list at construction time.
    self.__n_initial = len(process_list)
24
def monitor(self):
    """! Poll the managed processes until all have finished, logging each completion.

    Every process is first given a 1-based @c id_number. A process is removed
    from the managed list as soon as its has_output() returns a false value:
    a return code of zero is reported at info level, anything else as a warning.
    """
    for number, process in enumerate(self.__process_list, start=1):
        process.id_number = number
    while self.__process_list:
        # Walk over a snapshot so that finished processes can be removed from the real list.
        for process in tuple(self.__process_list):
            if process.has_output():
                continue
            exit_code = process.return_code
            self.__process_list.remove(process)
            n_running = len(self.__process_list)
            if exit_code == 0:
                logger.info("Finished process #{}: there are now {}/{} running".format(process.id_number, n_running, self.__n_initial))
            else:
                logger.warning("Process #{} terminated unexpectedly (return code {}): there are now {}/{} running".format(process.id_number, exit_code, n_running, self.__n_initial))
38
39
41 """! Single executable running in a subprocess (usually PowhegBox).
42
43 @author James Robinson <james.robinson@cern.ch>
44 """
45
46 log_level = {"stdout": "info", "stderr": "error"}
47 __output_prefix = " | "
48 __ignore_output = []
49
50 def __init__(self, command_list, seed_index=None, stdin=None, ignore_output=None, warning_output=[], info_output=[], error_output=[]):
51 """! Constructor.
52
53 Setup underlying process together with non-blocking readers for stdout and stderr.
54
55 @param command_list Command that will be run (possibly with options).
56 @param seed_index Which seed from pwgseeds.dat to use.
57 @param stdin An open file handle providing input.
58 @param ignore_output List of strings to filter out from messages.
59 @param warning_output List of strings which would always trigger a warning only, even if produced in stderr.
60 @param info_output List of strings which would always trigger an info only, even if produced in stderr.
61 @param error_output List of strings which would always trigger an error, even if produced in stdout.
62 """
63 if not isinstance(command_list, list):
64 command_list = [command_list]
65 command_list = [str(x) for x in command_list]
66 # Set up messages to ignore
67 if ignore_output is not None:
68 self.__ignore_output = ignore_output
69 # Set up messages with special treatment
70 self.__warning_output = warning_output
71 self.__info_output = info_output
72 self.__error_output = error_output
73 # Usual case, where no open file handle is provided
74 if stdin is None:
75 self.__process = subprocess.Popen(command_list, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
76 # Write seed to stdin
77 if seed_index is not None:
78 self.__output_prefix += "Process #{}: ".format(seed_index)
79 self.__process.stdin.write(str(seed_index))
80 self.__process.stdin.close()
81 with open("pwgseeds.dat", "r") as seed_file:
82 random_seed_list = seed_file.read().splitlines()
83 self.log("Providing random seed: {}".format(random_seed_list[seed_index - 1]))
84 # Using an open file handle to provide input to stdin: remember to close this later
85 else:
86 self.__process = subprocess.Popen(command_list, stdout=subprocess.PIPE, stdin=stdin, stderr=subprocess.PIPE, text=True)
87 # Setup non-blocking stream readers for stdout and stderr
90
91
92
93 def has_output(self):
94 """! Write queued output and return process status."""
95 status = self.is_running()
97 return status
98
def is_running(self):
    """! Report whether the underlying process is still alive.

    Once poll() reports an exit status, the stdout and stderr stream readers
    are finalised (in that order) before returning.

    @return True while the process has not ended, False otherwise.
    """
    if self.__process.poll() is None:  # no exit status yet, so still running
        return True
    self.stdout.finalise()
    self.stderr.finalise()
    return False
106
def log(self, message, log_level="info"):
    """! Write to the logger with appropriate log-level.

    Every string in the ignore list is removed from the message first. The
    result is stripped of surrounding whitespace and prefixed with this
    object's output prefix before being handed to the logger.

    @param message   The message to pass to the logger.
    @param log_level Name of the logger method to call (e.g. "info", "warning", "error").
    """
    for word in self.__ignore_output:
        # An empty string is a substring of every string and replacing it changes
        # nothing, so without this guard the removal loop would never terminate.
        if word == "":
            continue
        # Repeat until stable: deleting one occurrence can join text into a new one.
        while word in message:
            message = message.replace(word, "")
    getattr(logger, log_level)("{}{}".format(self.__output_prefix, message.strip()))
117
119 """! Pass queued output to the logger."""
120 for stream in ["stdout", "stderr"]:
121 while True:
122 output, queue_size = getattr(self, stream).readline(timeout=0.1)
123 if output is not None and any([(pattern in output) for pattern in self.__error_output]):
124 self.log(output, "error")
125 elif output is not None and any([(pattern in output) for pattern in self.__warning_output]):
126 self.log(output, "warning")
127 elif output is not None and any([(pattern in output) for pattern in self.__info_output]):
128 self.log(output, "info")
129 elif not (output is None or len(output) == 0):
130 self.log(output, self.log_level[stream])
131 if queue_size == 0:
132 break
133
@property
def return_code(self):
    """! Exit status of the underlying process.

    @return The @c returncode attribute of the wrapped process object.
    """
    return self.__process.returncode
138
@property
def stdout(self):
    """! Reader attached to the underlying process's stdout.

    @return The object stored in the private stdout attribute.
    """
    return self.__stdout
143
@property
def stderr(self):
    """! Reader attached to the underlying process's stderr.

    @return The object stored in the private stderr attribute.
    """
    return self.__stderr
Wrapper to handle multiple Powheg subprocesses.
monitor(self)
Monitor each of the managed processes and log when they are finished.
Single executable running in a subprocess (usually PowhegBox).
__init__(self, command_list, seed_index=None, stdin=None, ignore_output=None, warning_output=[], info_output=[], error_output=[])
Constructor.
log(self, message, log_level="info")
Write to the logger with appropriate log-level.
stdout(self)
stdout stream from underlying process.
has_output(self)
Write queued output and return process status.
return_code(self)
Return code of underlying process.
write_queued_output(self)
Pass queued output to the logger.
stderr(self)
stderr stream from underlying process.
is_running(self)
Check if the underlying process is running and finalise stream readers if not.