# Module-level logger named after this module; the wrappers below report through it.
17 msg = logging.getLogger(__name__)
# Wrapper that calls `func` while stdout and stderr are pointed at /dev/null,
# then points both back at their previous targets.
# NOTE(review): this listing has lost its indentation and some numbered lines
# (e.g. 26, 40-44, 47-54); the enclosing definition that supplies `func` is
# not visible here.
25 def silent_running(*args, **kwargs):
# Open two placeholder descriptors, then overwrite them with duplicates of the
# current stderr/stdout so the original targets can be restored afterwards.
27 save_err = os.open(
'/dev/null', os.O_WRONLY)
28 save_out = os.open(
'/dev/null', os.O_WRONLY)
29 os.dup2(sys.stderr.fileno(), save_err)
30 os.dup2(sys.stdout.fileno(), save_out)
# Open fresh /dev/null descriptors and duplicate them onto stderr/stdout:
# anything written to those descriptors from here on is discarded.
33 quiet_err = os.open(
'/dev/null', os.O_WRONLY)
34 quiet_out = os.open(
'/dev/null', os.O_WRONLY)
35 os.dup2(quiet_err, sys.stderr.fileno())
36 os.dup2(quiet_out, sys.stdout.fileno())
# Run the wrapped callable while output is silenced.
# NOTE(review): what happens to `rc` afterwards is not visible in this excerpt.
39 rc = func(*args, **kwargs)
# Point stderr/stdout back at the copies saved above.
# NOTE(review): no os.close() of the four descriptors and no try/finally around
# the call are visible here; if the full file has neither, each call leaks
# descriptors and an exception in `func` leaves output silenced - confirm.
45 os.dup2(save_err, sys.stderr.fileno())
46 os.dup2(save_out, sys.stdout.fileno())
# Copy func's metadata (name, docstring, ...) onto the wrapper.
55 functools.update_wrapper(silent_running, func)
# Wrapper that calls `func` and returns its result. In the handlers visible
# below it logs at critical level, calls trfUtils.infanticide(message=True)
# and, where shown, leaves the process through sys.exit().
# NOTE(review): this listing has lost its indentation and some numbered lines:
# the `try:` line and the `except` clauses that bind `e` for the handlers
# starting at lines 79 and 87 are not visible, nor are the imports of `signal`
# and `traceback`. trfUtils.infanticide is defined outside this excerpt;
# presumably it terminates child processes - confirm.
61 def exception_wrapper(*args, **kwargs):
# Imported inside the wrapper rather than at module level.
# NOTE(review): presumably the missing except clauses name classes from this
# module - confirm.
66 import PyJobTransforms.trfExceptions
as trfExceptions
69 return func(*args, **kwargs)
# Keyboard interrupt: log, call infanticide, exit with status 128 + SIGINT.
71 except KeyboardInterrupt:
72 msg.critical(
'Caught a keyboard interrupt - exiting at your request.')
73 trfUtils.infanticide(message=
True)
74 sys.exit(128 + signal.SIGINT)
# Handler body whose `except` clause is missing from this listing: reports a
# transform setup failure using e.errMsg plus the formatted traceback
# (limit=None, i.e. no cap on the number of stack entries).
# NOTE(review): no exit or re-raise is visible after infanticide for this
# branch (numbered lines 84-86 are missing) - confirm against the full file.
79 msg.critical(
'Transform setup failed: {0}'.
format(e.errMsg))
80 msg.critical(
'To help you debug here is the stack trace:')
81 msg.critical(traceback.format_exc(
None))
82 msg.critical(
'(Early exit - no job report is produced)')
83 trfUtils.infanticide(message=
True)
# Handler body for another `except` clause missing from this listing: logs the
# exception and traceback, then exits with the code that trfExit.nameToCode
# returns for 'TRF_UNEXPECTED_TRF_EXCEPTION'.
87 msg.critical(
'Got a transform exception in the outer exception handler: {0!s}'.
format(e))
88 msg.critical(
'Stack trace is...')
89 msg.critical(traceback.format_exc(
None))
90 msg.critical(
'Job reports are likely to be missing or incomplete - sorry')
91 msg.critical(
'Please report this as a transforms bug!')
92 trfUtils.infanticide(message=
True)
93 sys.exit(trfExit.nameToCode(
'TRF_UNEXPECTED_TRF_EXCEPTION'))
# Catch-all for any other Exception: same reporting as above, but exits with
# the code for 'TRF_UNEXPECTED_OTHER_EXCEPTION'.
95 except Exception
as e:
96 msg.critical(
'Got a general exception in the outer exception handler: {0!s}'.
format(e))
97 msg.critical(
'Stack trace is...')
98 msg.critical(traceback.format_exc(
None))
99 msg.critical(
'Job reports are likely to be missing or incomplete - sorry')
100 msg.critical(
'Please report this as a transforms bug!')
101 trfUtils.infanticide(message=
True)
102 sys.exit(trfExit.nameToCode(
'TRF_UNEXPECTED_OTHER_EXCEPTION'))
# Copy func's metadata onto the wrapper and hand the wrapper back.
104 functools.update_wrapper(exception_wrapper, func)
105 return exception_wrapper
# Exception subclass named after the SIGUSR1 signal.
# NOTE(review): the class body is not visible in this listing.
115 class SigUsr1(Exception):
# Signal handler with the standard (signum, frame) signature; logs the number
# of the signal it was called for.
# NOTE(review): only the log call is visible here (numbered line 120 is
# missing); presumably the handler also raises an exception - confirm.
118 def sigHandler(signum, frame):
119 msg.info(
'Handling signal %d in sigHandler', signum)
# Wrapper that installs sigHandler for SIGUSR1 and then calls `func`,
# returning its result.
# NOTE(review): this listing has lost its indentation and some numbered lines;
# the `try:` line and the `except` clause guarding the error path below are
# not visible, nor is the definition that supplies `func`.
122 def signal_wrapper(*args, **kwargs):
123 signal.signal(signal.SIGUSR1, sigHandler)
126 return func(*args, **kwargs)
# Error path (its `except` clause is missing from this listing): log the
# formatted traceback at error level, call trfUtils.infanticide (defined
# elsewhere; presumably terminates child processes - confirm) and exit with
# status 128 + SIGUSR1.
129 msg.error(
'Transform received SIGUSR1. Exiting now with stack trace...')
130 msg.error(
'(The important frame is usually the one before this trfDecorators module.)')
131 msg.error(traceback.format_exc(
None))
132 trfUtils.infanticide(message=
True)
133 sys.exit(128 + signal.SIGUSR1)
# Copy func's metadata onto the wrapper and hand the wrapper back.
135 functools.update_wrapper(signal_wrapper, func)
136 return signal_wrapper
# Decorator factory: the wrapped function is run in a separate
# multiprocessing.Process and its result is awaited on a queue with a time
# limit; the visible code logs tries, sleeps between them and raises
# TransformTimeoutException after logging that all tries failed.
#
# Parameters (defaults for the wrapped function; each can be overridden per
# call by passing a keyword argument of the same name, which is popped from
# kwargs before `func` is called):
#   timeout    - time limit in seconds passed to the queue read.
#   retry      - NOTE(review): presumably the number of additional tries; the
#                loop that uses it is not visible - confirm.
#   timefactor - multiplier applied to the time limit and to the sleep time
#                after a try.
#   sleeptime  - seconds to sleep between tries.
#   defaultrc  - logged as the "default return code" when the wrapped function
#                raised; NOTE(review): the return itself is not visible.
#
# NOTE(review): this listing has lost its indentation and many numbered lines
# (the intermediate decorator `def`, the loop header, `try`/`except` lines,
# process start, several assignments), so the control flow between the
# statements below is only partly visible.
140 def timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None):
144 import multiprocessing
as mp
146 from sys
import exc_info
147 from PyJobTransforms.trfExceptions
import TransformTimeoutException, TransformInternalException
# Local rebinding of `msg` to the logger named after this module.
149 msg = logging.getLogger(__name__)
# Child-process entry point: runs `func` and reports through `queue` as a
# (success_flag, payload) tuple - (True, result) on success, or
# (False, (exc0, exc1, exc2)) when an exception was caught.
# NOTE(review): the `try`/`except` lines and the assignments of exc0/exc1 are
# missing from this listing; presumably they come from exc_info() - confirm.
158 def funcWithQueue(queue, *args, **kwargs):
160 result = func(*args, **kwargs)
161 queue.put((
True, result))
# exc2 is the formatted traceback text of the exception being handled.
165 exc2=traceback.format_exc()
166 msg.warning(
'In time limited function %s an exception occurred', func.__name__)
167 msg.warning(
'Original traceback:')
169 queue.put((
False,(exc0, exc1, exc2)))
# Replacement for `func` that applies the time limit.
171 def funcWithTimeout(*args, **kwargs):
# NOTE(review): the initial assignments of ltimeout, lretry, lsleeptime and
# ldefaultrc are not visible here; presumably they mirror this one - confirm.
174 ltimefactor=timefactor
# Per-call overrides: each recognised keyword is removed from kwargs so it is
# not forwarded to `func`.
178 if 'timeout' in kwargs:
179 ltimeout=kwargs.pop(
'timeout')
180 if 'retry' in kwargs:
181 lretry=kwargs.pop(
'retry')
182 if 'timefactor' in kwargs:
183 ltimefactor=kwargs.pop(
'timefactor')
184 if 'sleeptime' in kwargs:
185 lsleeptime=kwargs.pop(
'sleeptime')
186 if 'defaultrc' in kwargs:
187 ldefaultrc=kwargs.pop(
'defaultrc')
# Direct call in the current process, without any time limit.
# NOTE(review): the condition guarding this path is not visible; presumably it
# is taken when no timeout is set - confirm.
191 msg.debug(
'Running {0}: {1} {2} without timeout'.
format(func, args, kwargs))
192 return func(*args, **kwargs)
# NOTE(review): this message reports `retry+1` (the decorator argument) rather
# than `lretry`, so a per-call `retry=` override is not reflected in the
# logged total - confirm whether that is intended.
196 msg.info(
'Try %i out of %i (time limit %s s) to call %s.', n+1, retry+1, ltimeout, func.__name__)
197 starttime = time.time()
# Single-slot queue for the child's (flag, result) tuple.
198 q=mp.Queue(maxsize=1)
# NOTE(review): `nargs` is assigned on a line missing from this listing;
# presumably it prepends `q` to args to match funcWithQueue's signature. The
# call that starts `proc` is also not visible - confirm.
200 proc=mp.Process(target=funcWithQueue, args=nargs, kwargs=kwargs)
# Wait up to ltimeout seconds for the child's report.
204 flag,result = q.get(block=
True, timeout=ltimeout)
# '%d' formats the elapsed time as an integer number of seconds.
206 msg.info(
'Executed call within %d s.', time.time()-starttime)
# Path for a report whose payload is an exception rather than a result.
210 msg.warning(
'But an exception occurred in function %s.', func.__name__)
211 msg.warning(
'Returning default return code %s.', ldefaultrc)
# Timeout path: collect process ids (including those returned by
# trfUtils.listChildren) and pass them to trfUtils.infanticide.
# NOTE(review): the `except` clause for this path and the definitions of
# `pids` and `parent` are not visible in this listing.
215 msg.warning(
'Timeout limit of %d s reached. Kill subprocess and its children.', ltimeout)
218 pids.extend(trfUtils.listChildren(parent=parent, listOrphans =
False))
219 trfUtils.infanticide(pids)
# Pause before the next try, then scale both the time limit and the pause.
222 msg.info(
'Going to sleep for %d s.', lsleeptime)
223 time.sleep(lsleeptime)
225 ltimeout*=ltimefactor
226 lsleeptime*=ltimefactor
# Path for an I/O failure while talking to the child (its `except` clause is
# missing from this listing): raised as TransformInternalException with the
# exit code for "TRF_EXTERNAL".
228 errMsg =
"IOError while communicating with subprocess"
230 raise TransformInternalException(trfExit.nameToCode(
"TRF_EXTERNAL"), errMsg)
# Reached when no try produced a result: raise TransformTimeoutException with
# the exit code for 'TRF_EXEC_TIMEOUT'.
232 msg.warning(
'All %i tries failed!', n)
233 raise TransformTimeoutException(trfExit.nameToCode(
'TRF_EXEC_TIMEOUT'),
'Timeout in function %s' % (func.__name__))
235 return funcWithTimeout
# NOTE(review): this call comes after `return funcWithTimeout`; if both
# statements sit at the same indentation level in the full file, it is
# unreachable and func's metadata is never copied onto funcWithTimeout -
# confirm and, if so, move it above the return.
237 functools.update_wrapper(funcWithTimeout, func)