ATLAS Offline Software
trfDecorators.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 
7 
8 import functools
9 import os
10 import sys
11 import time
12 
13 import PyJobTransforms.trfUtils as trfUtils
14 from PyJobTransforms.trfExitCodes import trfExit
15 
16 from PyJobTransforms.trfLogger import logging
17 msg = logging.getLogger(__name__)
18 
19 
20 
def silent(func):
    """Decorator that runs ``func`` with stdout and stderr sent to /dev/null.

    The redirection is done with ``os.dup2`` on the file descriptors behind
    ``sys.stdout`` and ``sys.stderr``, so anything written to those
    descriptors while ``func`` runs is discarded, not only Python-level
    ``print`` output.

    The original descriptors are restored, and every temporary descriptor is
    closed, even when ``func`` raises; the exception then propagates to the
    caller unchanged.

    @param func Callable to be silenced.
    @return Wrapper with the same call signature and return value as ``func``.
    """
    @functools.wraps(func)
    def silent_running(*args, **kwargs):
        # Push out anything already buffered by Python *before* redirecting,
        # otherwise earlier output would end up in /dev/null as well.
        sys.stderr.flush()
        sys.stdout.flush()

        err_fd = sys.stderr.fileno()
        out_fd = sys.stdout.fileno()

        # Keep duplicates of the live descriptors so they can be restored
        save_err = os.dup(err_fd)
        save_out = os.dup(out_fd)

        # One 'quiet' descriptor is enough to back both streams
        quiet = os.open('/dev/null', os.O_WRONLY)
        try:
            os.dup2(quiet, err_fd)
            os.dup2(quiet, out_fd)

            # Execute function
            return func(*args, **kwargs)
        finally:
            try:
                # Drain what the function buffered while still silenced
                sys.stderr.flush()
                sys.stdout.flush()
            finally:
                # Restore fds and release the temporaries
                os.dup2(save_err, err_fd)
                os.dup2(save_out, out_fd)
                for fd in (save_err, save_out, quiet):
                    os.close(fd)

    return silent_running
57 
58 
59 
def stdTrfExceptionHandler(func):
    """Decorator to wrap a transform in the outermost try/except handling.

    The wrapped callable is executed and its return value passed through.
    Exceptions that reach this level are logged at critical level, child
    processes are cleaned up via ``trfUtils.infanticide`` and the process is
    terminated with ``sys.exit``:

    - ``KeyboardInterrupt``: exit code ``128 + SIGINT``.
    - ``TransformSetupException``: exit code taken from the exception's
      ``errCode`` (a transform definition error; no job report is produced).
    - any other ``TransformException``: ``TRF_UNEXPECTED_TRF_EXCEPTION``.
    - any other ``Exception``: ``TRF_UNEXPECTED_OTHER_EXCEPTION``.

    @param func Transform entry point to protect.
    @return Wrapper that calls ``func`` with the same arguments.
    """
    @functools.wraps(func)
    def exception_wrapper(*args, **kwargs):
        # Setup imports which the wrapper needs
        import signal
        import traceback

        import PyJobTransforms.trfExceptions as trfExceptions

        try:
            return func(*args, **kwargs)

        except KeyboardInterrupt:
            msg.critical('Caught a keyboard interrupt - exiting at your request.')
            trfUtils.infanticide(message=True)
            sys.exit(128 + signal.SIGINT)

        # This subclass is treated as a 'normal' exit condition
        # but it should never happen in production as it's a transform definition error.
        # It has to be tested before the more general TransformException below.
        except trfExceptions.TransformSetupException as e:
            msg.critical('Transform setup failed: {0}'.format(e.errMsg))
            msg.critical('To help you debug here is the stack trace:')
            msg.critical(traceback.format_exc())
            msg.critical('(Early exit - no job report is produced)')
            trfUtils.infanticide(message=True)
            sys.exit(e.errCode)

        except trfExceptions.TransformException as e:
            msg.critical('Got a transform exception in the outer exception handler: {0!s}'.format(e))
            msg.critical('Stack trace is...')
            msg.critical(traceback.format_exc())
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            trfUtils.infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_UNEXPECTED_TRF_EXCEPTION'))

        except Exception as e:
            msg.critical('Got a general exception in the outer exception handler: {0!s}'.format(e))
            msg.critical('Stack trace is...')
            msg.critical(traceback.format_exc())
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            trfUtils.infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_UNEXPECTED_OTHER_EXCEPTION'))

    return exception_wrapper
106 
107 
108 
def sigUsrStackTrace(func):
    """Decorator to dump a stack trace when the process receives SIGUSR1.

    Each call of the wrapper installs a SIGUSR1 handler that raises a private
    exception. If that exception surfaces while ``func`` is running, the
    stack trace is logged at error level, child processes are cleaned up via
    ``trfUtils.infanticide`` and the process exits with ``128 + SIGUSR1``.
    The handler is left installed after the call returns.

    @param func Callable to run with the SIGUSR1 handler in place.
    @return Wrapper that calls ``func`` with the same arguments and passes
            its return value through.
    """
    import signal
    import traceback

    class SigUsr1(Exception):
        """Private marker raised from the signal handler."""
        pass

    def sigHandler(signum, frame):
        msg.info('Handling signal %d in sigHandler', signum)
        raise SigUsr1

    @functools.wraps(func)
    def signal_wrapper(*args, **kwargs):
        signal.signal(signal.SIGUSR1, sigHandler)

        try:
            return func(*args, **kwargs)

        except SigUsr1:
            msg.error('Transform received SIGUSR1. Exiting now with stack trace...')
            msg.error('(The important frame is usually the one before this trfDecorators module.)')
            msg.error(traceback.format_exc())
            trfUtils.infanticide(message=True)
            sys.exit(128 + signal.SIGUSR1)

    return signal_wrapper
137 
138 
139 
def timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None):
    """Decorator factory that runs a function in a subprocess with a time limit.

    The decorated function is executed in a ``multiprocessing.Process`` and
    its result is passed back through a queue. If no result arrives within
    the time limit, the subprocess and its children are killed and the call
    is retried with the timeout and the sleep time both multiplied by
    ``timefactor``.

    Every setting can be overridden per call by passing a keyword argument of
    the same name to the decorated function; such keywords are consumed by
    the wrapper and are not forwarded to the function.

    @param timeout    Time limit in seconds for the first try; ``None`` runs
                      the function directly, with no subprocess and no limit.
    @param retry      Number of additional tries after the first one.
    @param timefactor Multiplier applied to timeout and sleep time after
                      each timed-out try.
    @param sleeptime  Seconds to sleep between tries.
    @param defaultrc  Value returned when the function itself raised an
                      exception inside the subprocess.
    @return Decorator to apply to the target function.
    @throws TransformTimeoutException  when all tries timed out.
    @throws TransformInternalException on IOError while talking to the
                                       subprocess.
    """

    import traceback
    import queue
    import multiprocessing as mp

    from sys import exc_info
    from PyJobTransforms.trfExceptions import TransformTimeoutException, TransformInternalException

    msg = logging.getLogger(__name__)

    def internal(func):

        def funcWithQueue(resultQueue, funcArgs, funcKwargs):
            # Runs in the subprocess: report (True, result) on success or
            # (False, exception details) on failure. The call arguments are
            # handed over as a tuple and a dict so that no user keyword can
            # collide with this helper's own parameter names.
            try:
                result = func(*funcArgs, **funcKwargs)
                resultQueue.put((True, result))
            except Exception:
                excType, excValue = exc_info()[:2]
                excTrace = traceback.format_exc()
                msg.warning('In time limited function %s an exception occurred', func.__name__)
                msg.warning('Original traceback:')
                msg.warning(excTrace)
                resultQueue.put((False, (excType, excValue, excTrace)))

        # Applied here because a statement placed after the final 'return'
        # of this function would never be executed.
        @functools.wraps(func)
        def funcWithTimeout(*args, **kwargs):
            # Per-call overrides win over the decorator-level settings
            ltimeout = kwargs.pop('timeout', timeout)
            lretry = kwargs.pop('retry', retry)
            ltimefactor = kwargs.pop('timefactor', timefactor)
            lsleeptime = kwargs.pop('sleeptime', sleeptime)
            ldefaultrc = kwargs.pop('defaultrc', defaultrc)

            if ltimeout is None:
                # Run function normally with no timeout wrapper
                msg.debug('Running %s: %s %s without timeout', func, args, kwargs)
                return func(*args, **kwargs)

            n = 0
            while n <= lretry:
                # Report against the effective retry count for this call
                msg.info('Try %i out of %i (time limit %s s) to call %s.', n+1, lretry+1, ltimeout, func.__name__)
                starttime = time.time()
                q = mp.Queue(maxsize=1)
                proc = mp.Process(target=funcWithQueue, args=(q, args, kwargs))
                proc.start()
                try:
                    # Wait for function to run and return, but with a timeout
                    flag, result = q.get(block=True, timeout=ltimeout)
                    proc.join(60)
                    msg.info('Executed call within %d s.', time.time()-starttime)
                    if flag:
                        return result
                    msg.warning('But an exception occurred in function %s.', func.__name__)
                    msg.warning('Returning default return code %s.', ldefaultrc)
                    return ldefaultrc
                except queue.Empty:
                    # Our function did not run in time - kill, then increase timeout
                    msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.', ltimeout)
                    parent = proc.pid
                    pids = [parent]
                    pids.extend(trfUtils.listChildren(parent=parent, listOrphans = False))
                    trfUtils.infanticide(pids)
                    proc.join(60) # Ensure cleanup
                    if n != lretry:
                        # No point sleeping after the last try
                        msg.info('Going to sleep for %d s.', lsleeptime)
                        time.sleep(lsleeptime)
                    n += 1
                    ltimeout *= ltimefactor
                    lsleeptime *= ltimefactor
                except IOError:
                    errMsg = "IOError while communicating with subprocess"
                    msg.error(errMsg)
                    raise TransformInternalException(trfExit.nameToCode("TRF_EXTERNAL"), errMsg)

            msg.warning('All %i tries failed!', n)
            raise TransformTimeoutException(trfExit.nameToCode('TRF_EXEC_TIMEOUT'), 'Timeout in function %s' % (func.__name__))

        return funcWithTimeout

    return internal
python.trfDecorators.timelimited
def timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None)
Definition: trfDecorators.py:140
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfDecorators.sigUsrStackTrace
def sigUsrStackTrace(func)
Decorator to dump a stack trace when hit by SIGUSR1. Note that this decorator has to go inside the stdT...
Definition: trfDecorators.py:111
python.trfDecorators.stdTrfExceptionHandler
def stdTrfExceptionHandler(func)
Decorator to wrap a transform in outer try: ...
Definition: trfDecorators.py:60
PyJobTransforms.trfExitCodes
Module for transform exit codes.
Execution.exc_info
exc_info
Definition: Execution.py:14
python.trfExceptions.TransformException
Base class for transform exceptions.
Definition: trfExceptions.py:14
python.trfDecorators.silent
def silent(func)
Redirect stdout/err to /dev/null. Useful wrapper to get rid of ROOT verbosity...
Definition: trfDecorators.py:24
PyJobTransforms.trfUtils
Transform utility functions.
PyJobTransforms.trfLogger
Logging configuration for ATLAS job transforms.