ATLAS Offline Software
Loading...
Searching...
No Matches
trfDecorators.py
Go to the documentation of this file.
1# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2
3
7
8import functools
9import os
10import sys
11import time
12
13import PyJobTransforms.trfUtils as trfUtils
14from PyJobTransforms.trfExitCodes import trfExit
15
16from PyJobTransforms.trfLogger import logging
17msg = logging.getLogger(__name__)
18
19
20
def silent(func):
    """Decorator running ``func`` with stdout/stderr redirected to the null device.

    The redirection is done on the underlying file descriptors with
    ``os.dup2``, so it also affects anything else writing to those
    descriptors while ``func`` runs. The original descriptors are put back
    when ``func`` returns *and* when it raises, so a failing call can no
    longer leave the process permanently muted or leak descriptors.

    @param func Callable to wrap.
    @return Wrapper that forwards all arguments to ``func`` and returns its result.
    """
    @functools.wraps(func)
    def silent_running(*args, **kwargs):
        # Resolve the target descriptors once, so that the restore step hits
        # the same fds even if func rebinds sys.stdout / sys.stderr.
        err_fd = sys.stderr.fileno()
        out_fd = sys.stdout.fileno()

        # Push out anything already buffered; otherwise it would be flushed
        # into the null device later on and silently lost.
        sys.stderr.flush()
        sys.stdout.flush()

        # Keep duplicates of the real stdout/err so they can be restored
        save_err = os.dup(err_fd)
        save_out = os.dup(out_fd)
        # A single 'quiet' descriptor is enough to serve both streams
        quiet = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(quiet, err_fd)
            os.dup2(quiet, out_fd)

            # Execute function
            return func(*args, **kwargs)
        finally:
            try:
                # Discard whatever func left in the Python-level buffers
                # while the descriptors still point at the null device.
                sys.stderr.flush()
                sys.stdout.flush()
            finally:
                # Restore fds and release the helpers, whatever happened above
                os.dup2(save_err, err_fd)
                os.dup2(save_out, out_fd)
                os.close(save_err)
                os.close(save_out)
                os.close(quiet)

    return silent_running
57
58
59
def stdTrfExceptionHandler(func):
    """Decorator to wrap a transform in an outer try: ... except: ... block.

    The wrapper returns whatever ``func`` returns. If ``func`` raises, the
    problem is logged at critical level, ``trfUtils.infanticide(message=True)``
    is called and the process is terminated through ``sys.exit``:

    - ``KeyboardInterrupt``: exit code ``128 + SIGINT``
    - transform setup exception: the exception's own ``errCode``
    - any other transform exception: ``TRF_UNEXPECTED_TRF_EXCEPTION``
    - any other ``Exception``: ``TRF_UNEXPECTED_OTHER_EXCEPTION``

    @param func Callable to protect.
    @return Wrapper forwarding all arguments to ``func``.
    """
    def exception_wrapper(*args, **kwargs):
        # Setup imports which the wrapper needs
        import signal
        import traceback

        import PyJobTransforms.trfExceptions as trfExceptions

        try:
            return func(*args, **kwargs)

        except KeyboardInterrupt:
            msg.critical('Caught a keyboard interrupt - exiting at your request.')
            trfUtils.infanticide(message=True)
            sys.exit(128 + signal.SIGINT)

        # This subclass is treated as a 'normal' exit condition
        # but it should never happen in production as it's a transform definition error
        # NOTE(review): the exception class names in the next two clauses were
        # restored from the handler messages - confirm against trfExceptions.
        except trfExceptions.TransformSetupException as e:
            msg.critical('Transform setup failed: {0}'.format(e.errMsg))
            msg.critical('To help you debug here is the stack trace:')
            msg.critical(traceback.format_exc(None))
            msg.critical('(Early exit - no job report is produced)')
            trfUtils.infanticide(message=True)
            sys.exit(e.errCode)

        except trfExceptions.TransformException as e:
            msg.critical('Got a transform exception in the outer exception handler: {0!s}'.format(e))
            msg.critical('Stack trace is...')
            msg.critical(traceback.format_exc(None))
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            trfUtils.infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_UNEXPECTED_TRF_EXCEPTION'))

        except Exception as e:
            msg.critical('Got a general exception in the outer exception handler: {0!s}'.format(e))
            msg.critical('Stack trace is...')
            msg.critical(traceback.format_exc(None))
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            trfUtils.infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_UNEXPECTED_OTHER_EXCEPTION'))

    # Make the wrapper look like the wrapped function
    functools.update_wrapper(exception_wrapper, func)
    return exception_wrapper
106
107
108
def sigUsrStackTrace(func):
    """Decorator to dump a stack trace when the process is hit by SIGUSR1.

    Every call of the wrapper installs a SIGUSR1 handler that raises a
    private exception. If that exception surfaces while ``func`` runs, the
    traceback is logged at error level, ``trfUtils.infanticide(message=True)``
    is called and the process exits with code ``128 + SIGUSR1``.

    The handler is left installed after ``func`` returns.

    @param func Callable to wrap.
    @return Wrapper forwarding all arguments to ``func`` and returning its result.
    """
    import signal
    import traceback

    # Private exception type, so that only our own handler can trigger the
    # except clause below.
    class SigUsr1(Exception):
        pass

    def sigHandler(signum, frame):
        msg.info('Handling signal %d in sigHandler', signum)
        raise SigUsr1

    def signal_wrapper(*args, **kwargs):
        signal.signal(signal.SIGUSR1, sigHandler)

        try:
            return func(*args, **kwargs)

        except SigUsr1:
            msg.error('Transform received SIGUSR1. Exiting now with stack trace...')
            msg.error('(The important frame is usually the one before this trfDecorators module.)')
            msg.error(traceback.format_exc(None))
            trfUtils.infanticide(message=True)
            sys.exit(128 + signal.SIGUSR1)

    # Make the wrapper look like the wrapped function
    functools.update_wrapper(signal_wrapper, func)
    return signal_wrapper
137
138
139
def timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None):
    """Decorator factory running the decorated function under a time limit.

    The decorated function is executed in a ``multiprocessing.Process`` and
    its result is passed back through a queue. When no result arrives within
    the time limit, the subprocess and its children are killed and the call
    is retried with a longer time limit.

    Each setting can be overridden per call by passing a keyword argument of
    the same name to the decorated function; these keywords are removed
    before the remaining arguments are forwarded.

    @param timeout Time limit in seconds; ``None`` runs the function directly
                   in the current process with no limit.
    @param retry Number of additional tries after the first one timed out.
    @param timefactor Factor applied to both the time limit and the sleep
                      time after every timeout.
    @param sleeptime Seconds to sleep between two tries.
    @param defaultrc Value returned when the function raised an exception in
                     the subprocess.
    @return Decorator to apply to the function.
    @throws TransformTimeoutException when all tries timed out.
    @throws TransformInternalException on an IOError while waiting for the
            subprocess result.
    """
    import traceback
    import queue
    import multiprocessing as mp

    from sys import exc_info
    from PyJobTransforms.trfExceptions import TransformTimeoutException, TransformInternalException

    msg = logging.getLogger(__name__)

    def internal(func):

        # Runs in the subprocess: call func and report (success flag, payload).
        # The parameter is deliberately not called 'queue', which would shadow
        # the queue module and clash with a 'queue' keyword meant for func.
        def funcWithQueue(resultQueue, *args, **kwargs):
            try:
                result = func(*args, **kwargs)
                resultQueue.put((True, result))
            except Exception:
                exc0, exc1 = exc_info()[:2]
                exc2 = traceback.format_exc()
                msg.warning('In time limited function %s an exception occurred', func.__name__)
                msg.warning('Original traceback:')
                msg.warning(exc2)
                resultQueue.put((False, (exc0, exc1, exc2)))

        # functools.wraps copies name/docstring of func onto the wrapper
        @functools.wraps(func)
        def funcWithTimeout(*args, **kwargs):
            # Per-call overrides win over the values given to the decorator
            ltimeout = kwargs.pop('timeout', timeout)
            lretry = kwargs.pop('retry', retry)
            ltimefactor = kwargs.pop('timefactor', timefactor)
            lsleeptime = kwargs.pop('sleeptime', sleeptime)
            ldefaultrc = kwargs.pop('defaultrc', defaultrc)

            if ltimeout is None:
                # Run function normally with no timeout wrapper
                msg.debug('Running %s: %s %s without timeout', func, args, kwargs)
                return func(*args, **kwargs)

            n = 0
            while n <= lretry:
                # Report the effective number of tries (lretry), not the decorator default
                msg.info('Try %i out of %i (time limit %s s) to call %s.', n+1, lretry+1, ltimeout, func.__name__)
                starttime = time.time()
                q = mp.Queue(maxsize=1)
                nargs = (q,) + args
                proc = mp.Process(target=funcWithQueue, args=nargs, kwargs=kwargs)
                proc.start()
                try:
                    # Wait for function to run and return, but with a timeout
                    flag, result = q.get(block=True, timeout=ltimeout)
                    proc.join(60)
                    msg.info('Executed call within %d s.', time.time()-starttime)
                    if flag:
                        return result
                    msg.warning('But an exception occurred in function %s.', func.__name__)
                    msg.warning('Returning default return code %s.', ldefaultrc)
                    return ldefaultrc
                except queue.Empty:
                    # Our function did not run in time - kill it and increase the timeout
                    msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.', ltimeout)
                    parent = proc.pid
                    pids = [parent]
                    pids.extend(trfUtils.listChildren(parent=parent, listOrphans=False))
                    trfUtils.infanticide(pids)
                    proc.join(60)  # Ensure cleanup
                    if n != lretry:
                        msg.info('Going to sleep for %d s.', lsleeptime)
                        time.sleep(lsleeptime)
                    n += 1
                    ltimeout *= ltimefactor
                    lsleeptime *= ltimefactor
                except IOError:
                    errMsg = "IOError while communicating with subprocess"
                    msg.error(errMsg)
                    raise TransformInternalException(trfExit.nameToCode("TRF_EXTERNAL"), errMsg)

            msg.warning('All %i tries failed!', n)
            raise TransformTimeoutException(trfExit.nameToCode('TRF_EXEC_TIMEOUT'), 'Timeout in function %s' % (func.__name__))

        return funcWithTimeout

    return internal
Base class for transform exceptions.
Module for transform exit codes.
Logging configuration for ATLAS job transforms.
Transform utility functions.
sigUsrStackTrace(func)
Decorator to dump a stack trace when hit by SIGUSR1. Note that this decorator has to go inside the stdTrfExceptionHandler decorator.
silent(func)
Redirect stdout/err to /dev/null. Useful wrapper to get rid of ROOT verbosity.
timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None)
stdTrfExceptionHandler(func)
Decorator to wrap a transform in outer try: ... except: ...