ATLAS Offline Software
Functions | Variables
python.SgDumpLib Namespace Reference

Functions

def _make_jobo (job)
 
def _gen_jobo (dct)
 
def _run_jobo (job, msg, options)
 
def run_sg_dump (files, output, nevts=-1, skip=0, dump_jobo=False, pyalg_cls='PyDumper.PyComps:PySgDumper', include='*', exclude='', do_clean_up=False, athena_opts=None, conditions_tag=None, full_log=False, msg=None)
 

Variables

string __doc__
 
string __author__ = "Sebastien Binet <binet@cern.ch>"
 
list __all__
 

Function Documentation

◆ _gen_jobo()

def python.SgDumpLib._gen_jobo (   dct)
private

Definition at line 29 of file SgDumpLib.py.

29 def _gen_jobo(dct):
30  import textwrap
31  job = textwrap.dedent("""\
32  #!/usr/bin/env athena.py
33  # automatically generated joboptions file
34 
35  from AthenaConfiguration.AllConfigFlags import initConfigFlags
36  from AthenaConfiguration.Enums import Format
37  from PyDumper.DumpConfig import DumpCfg
38 
39  flags = initConfigFlags()
40  flags.GeoModel.Align.Dynamic = False
41  flags.Input.Files = %(input-files)s
42  flags.Exec.MaxEvents = %(evts)s
43  flags.Exec.SkipEvents = %(skip)s
44 
45  if flags.Input.Format is Format.BS:
46  # BS files don't contain the conditions/geometry tags.
47  # Try to give some reasonable defaults here, depending on the run.
48  # These may still be overridden from the command line.
49  if flags.Input.DataYear < 2000:
50  pass
51  elif flags.Input.DataYear < 2015:
52  flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
53  flags.IOVDb.GlobalTag = 'COMCOND-BLKPA-RUN1-09'
54  flags.Trigger.doxAODConversion = False
55  elif flags.Input.DataYear < 2020:
56  flags.GeoModel.AtlasVersion = 'ATLAS-R2-2016-01-00-01'
57  flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-RUN2-09'
58  else:
59  flags.GeoModel.AtlasVersion = 'ATLAS-R3S-2021-03-02-00'
60  flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-2023-03'
61  else:
62  if flags.GeoModel.AtlasVersion != 0:
63  if flags.GeoModel.AtlasVersion.find ('ATLAS-GEO-18') >= 0:
64  flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
65 
66  flags.fillFromArgs()
67  flags.lock()
68 
69  from AthenaConfiguration.MainServicesConfig import MainServicesCfg
70  cfg = MainServicesCfg(flags)
71 
72  if flags.Input.Format is Format.BS:
73  from PyDumper.BSReadConfig import BSReadCfg
74  cfg.merge (BSReadCfg (flags))
75  else:
76  from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
77  cfg.merge(PoolReadCfg(flags))
78 
79  cfg.merge (DumpCfg (flags,
80  ofile='%(ofile-name)s',
81  items='%(include)s',
82  exclude='%(exclude)s'))
83 
84  sc = cfg.run (%(evts)s)
85  import sys
86  sys.exit (sc.isFailure())
87  """) % dct
88 
89  return job
90 

◆ _make_jobo()

def python.SgDumpLib._make_jobo (   job)
private

Definition at line 20 of file SgDumpLib.py.

20 def _make_jobo(job):
21  import tempfile
22  jobo = tempfile.NamedTemporaryFile(suffix='-jobo.py', mode='w+')
23  import textwrap
24  job = textwrap.dedent (job)
25  jobo.writelines([l+os.linesep for l in job.splitlines()])
26  jobo.flush()
27  return jobo
28 

◆ _run_jobo()

def python.SgDumpLib._run_jobo (   job,
  msg,
  options 
)
private

Definition at line 91 of file SgDumpLib.py.

def _run_jobo(job, msg, options):
    """Run the job-options text `job` through athena.py and collect its log.

    `job`     text of the job-options script (written to a temporary file)
    `msg`     logger-like object used for progress and error reporting
    `options` object providing the attributes `oname`, `do_clean_up`,
              `athena_opts` and `full_log`

    Returns a tuple `(sc, out)`:
     - if 'athena.py' or 'sh' cannot be located: the status and output of
       the failed `which` command;
     - if athena exits with a non-zero code: that code and the complete log
       (which is also echoed on stdout);
     - otherwise: 0 and the log lines selected by the log filter (all lines
       when `options.full_log` is set, the 'Py:pyalg' lines otherwise).
    """
    import atexit
    import os
    import re
    import shlex
    import shutil
    import subprocess
    import tempfile
    import time
    from io import StringIO

    # capture current directory's content, plus the files we are expected
    # to produce, so that the clean-up only removes what appeared meanwhile
    keep_files = {os.path.abspath(item) for item in os.listdir(os.getcwd())}
    keep_files.add(os.path.abspath(options.oname))
    keep_files.add(os.path.abspath("%s.log" % options.oname))

    def _cleanup(keep_files):
        errors = []
        for item in os.listdir(os.getcwd()):
            item = os.path.abspath(item)
            if os.path.basename(item).startswith(('.__afs', '.__nfs')):
                # don't care about freakingly sticky metadata files
                continue

            if item in keep_files:
                continue
            try:
                if os.path.isfile(item):
                    os.remove(item)
                elif os.path.islink(item):
                    os.unlink(item)
                elif os.path.isdir(item):
                    shutil.rmtree(item)
                else:
                    msg.warning("don't know what kind of stuff this is: %s",
                                item)
            except Exception as err:
                # best effort: remember the problem and carry on
                errors.append("%s" % err)
        if errors:
            msg.error("problem during workdir clean-up")
            # an explicit loop: map() is lazy in python3 and would never
            # actually call msg.error
            for err in errors:
                msg.error(err)
        else:
            msg.debug("workdir clean-up [ok]")

    if options.do_clean_up:
        atexit.register(_cleanup, keep_files)

    # locate both executables before creating any temporary file, so that
    # the early returns below leave nothing behind
    sc, out = subprocess.getstatusoutput('which athena.py')
    if sc != 0:
        msg.error("could not locate 'athena.py':\n%s", out)
        return sc, out
    app = out

    sc, out = subprocess.getstatusoutput('which sh')
    if sc != 0:
        msg.error("could not locate 'sh':\n%s", out)
        return sc, out
    sh = out

    # do not require $HOME to be available for ROOT
    # see bug #82096
    # https://savannah.cern.ch/bugs/index.php?82096
    env = dict(os.environ)
    env['ROOTENV_NO_HOME'] = os.getenv('ROOTENV_NO_HOME', '1')

    athena_opts = []
    if options.athena_opts:
        athena_opts = shlex.split(options.athena_opts)

    # which log lines are reported back to the caller
    if options.full_log:
        pat = re.compile('.*')
    else:
        pat = re.compile(r'^Py:pyalg .*')
    evt_pat = re.compile(
        r'^Py:pyalg .*? ==> processing event \[(?P<evtnbr>\d*?)\].*'
    )

    # both temporary files are closed (hence removed) on every exit path
    with _make_jobo(job) as jobo, \
         tempfile.NamedTemporaryFile(prefix='sg_dumper_job_',
                                     suffix='.logfile.txt',
                                     dir=os.getcwd(),
                                     mode='w+') as logfile:

        cmd = [sh, app] + athena_opts + ['--CA'] + [jobo.name]
        app_handle = subprocess.Popen(args=cmd,
                                      stdout=logfile,
                                      stderr=logfile,
                                      env=env)

        def _monitor(pos):
            """Report on log lines written past offset `pos`.

            Returns the offset (end of file at the time of the call) from
            which the next call should resume.
            """
            logfile.flush()
            with open(logfile.name, 'r') as watcher:
                watcher.seek(0, 2)  # end of file
                end = watcher.tell()
                watcher.seek(pos)
                lines = [l for l in watcher if pat.match(l)]

            for l in lines:
                if '==> initialize...' in l:
                    msg.info('athena initialized')
                evt = evt_pat.match(l)
                if evt:
                    msg.info('processed event [%s]', evt.group('evtnbr'))
                if '==> finalize...' in l:
                    msg.info('athena finalized')

            return end

        pos = 0
        while app_handle.poll() is None:
            pos = _monitor(pos)
            time.sleep(5)
        # pick up whatever was written since the last poll
        _monitor(pos)

        sc = app_handle.returncode
        logfile.seek(0)
        if sc != 0:
            # failure: echo the complete log and hand it back as well
            msg.error('=' * 80)
            err = StringIO()
            for l in logfile:
                print(l, end='')
                print(l, end='', file=err)
            msg.error('=' * 80)
            msg.error('problem running jobo')
            return sc, err.getvalue()

        # success: only hand back the lines selected by the filter
        return sc, "".join(l for l in logfile if pat.match(l))
224 

◆ run_sg_dump()

def python.SgDumpLib.run_sg_dump (   files,
  output,
  nevts = -1,
  skip = 0,
  dump_jobo = False,
  pyalg_cls = 'PyDumper.PyComps:PySgDumper',
  include = '*',
  exclude = '',
  do_clean_up = False,
  athena_opts = None,
  conditions_tag = None,
  full_log = False,
  msg = None 
)
API for the sg-dump script.
 `files` a list of input filenames to be dumped by SgDump
 `output` the name of the output (ASCII) file
 `nevts`  the number of events to dump (default: -1 ie all)
 `skip`   the number of events to skip at the start (default: 0)
 `dump_jobo` switch to store or not the automatically generated jobo (put
             the name of the jobo output name in there if you want to keep
             it)
 `pyalg_cls` the fully qualified name of the PyAthena.Alg class to process the file(s) content (PySgDumper or DataProxyLoader)
 `include`: comma-separated list of type#key container names to dump.
 `exclude`: comma-separated list of glob patterns for keys/types to ignore.
 `do_clean_up` flag to enable the attempt at removing all the files sg-dump
               produces during the course of its execution
 `athena_opts` a space-separated list of athena command-line options (e.g '--perfmon --stdcmalloc --nprocs=-1')
 `conditions_tag` force a specific global conditions tag
 `msg`    a logging.Logger instance

 returns a tuple (sc, out): the exit code of the sub-athena process and the captured log text

Definition at line 225 of file SgDumpLib.py.

def run_sg_dump(files, output,
                nevts=-1,
                skip=0,
                dump_jobo=False,
                pyalg_cls='PyDumper.PyComps:PySgDumper',
                include='*',
                exclude='',
                do_clean_up=False,
                athena_opts=None,
                conditions_tag=None,
                full_log=False,
                msg=None):
    """API for the sg-dump script.
     `files`    a list of input filenames to be dumped by SgDump (a single
                string is split on whitespace)
     `output`   the name of the output (ASCII) file
     `nevts`    the number of events to dump (default: -1 ie all)
     `skip`     the number of events to skip at the start (default: 0)
     `dump_jobo` switch to store or not the automatically generated jobo (put
                 the name of the jobo output name in there if you want to keep
                 it)
     `pyalg_cls` the fully qualified name of the PyAthena.Alg class to process
                 the file(s) content (PySgDumper or DataProxyLoader)
     `include`: comma-separated list of type#key container names to dump.
     `exclude`: comma-separated list of glob patterns for keys/types to ignore.
     `do_clean_up` flag to enable the attempt at removing all the files sg-dump
                   produces during the course of its execution
     `athena_opts` a space-separated list of athena command-line options
                   (e.g '--perfmon --stdcmalloc --nprocs=-1')
     `conditions_tag` force a specific global conditions tag (see the note in
                   the body: it is currently not applied)
     `full_log` if true, keep every line of the athena log instead of only
                the 'Py:pyalg' ones
     `msg`      a logging.Logger instance

     returns a tuple (sc, out): the exit code of the sub-athena process and
     the captured log text.

     raises TypeError if `files` or `output` have the wrong type, and
     ValueError if `pyalg_cls` is not one of the allowed values.
    """
    import os
    from collections import namedtuple

    if msg is None:
        import PyUtils.Logging as L
        msg = L.logging.getLogger('sg-dumper')
        msg.setLevel(L.logging.INFO)

    if isinstance(files, str):
        files = files.split()

    if not isinstance(files, (list, tuple)):
        err = "'files' needs to be a list (or tuple) of file names"
        msg.error(err)
        raise TypeError(err)

    if not isinstance(output, str):
        err = "'output' needs to be a filename"
        msg.error(err)
        raise TypeError(err)

    _allowed_values = ('PyDumper.PyComps:PySgDumper',
                       'PyDumper.PyComps:DataProxyLoader')
    if pyalg_cls not in _allowed_values:
        err = "'pyalg_cls' allowed values are: %s. got: [%s]" % (
            _allowed_values,
            pyalg_cls)
        msg.error(err)
        raise ValueError(err)
    pyalg_pkg, pyalg_cls = pyalg_cls.split(':')

    conditions_tag_frag = ''
    if conditions_tag:
        conditions_tag_frag = "conddb.setGlobalTag('%s')" % conditions_tag
        # NOTE(review): the job-options template in _gen_jobo does not
        # reference 'conditions_tag_frag', so the requested tag never reaches
        # the job.  Tell the user rather than ignoring the request silently.
        msg.warning("'conditions_tag' [%s] is not used by the generated "
                    "joboptions and will be ignored", conditions_tag)
    jobo = _gen_jobo({
        'ofile-name': output,
        'input-files': files,
        'evts': nevts,
        'skip': skip,
        'include': include,
        'exclude': exclude,
        'pyalg_pkg': pyalg_pkg,
        'pyalg_cls': pyalg_cls,
        'conditions_tag_frag': conditions_tag_frag,
    })

    msg.info(':'*40)
    msg.info('input files: %s', files)
    msg.info('events: %s', nevts)
    msg.info('skip: %s', skip)
    msg.info('out (ascii): %s', output)
    msg.info('pyalg-class: %s:%s', pyalg_pkg, pyalg_cls)
    msg.info('include: %s', include)
    msg.info('exclude: %s', exclude)
    msg.info('conditions_tag: %s', conditions_tag)

    # keep a copy of the generated jobo only when a file name was given
    if dump_jobo and isinstance(dump_jobo, str):
        try:
            with open(dump_jobo, 'w') as f:
                f.write(jobo)
        except Exception as err:
            # best effort: failing to save the copy must not abort the dump
            msg.warning('problem while dumping joboption file to [%s]:\n%s',
                        dump_jobo, err)

    Options = namedtuple('Options',
                         'oname do_clean_up athena_opts full_log')
    opts = Options(oname=output,
                   do_clean_up=do_clean_up,
                   full_log=full_log,
                   athena_opts=athena_opts)

    msg.info('running dumper...')
    sc, out = _run_jobo(jobo, msg, opts)
    msg.info('dumper done')
    if output != os.devnull:
        msg.info('writing logfile: %s.log', output)
        try:
            with open('%s.log' % output, 'w') as f:
                for l in out.splitlines():
                    print(l, file=f)
                print("### EOF ###", file=f)

        except Exception as err:
            # best effort: the dump itself already ran
            msg.warning('problem writing out logfile [%s.log]:\n%s',
                        output, err)

    msg.info('bye.')
    msg.info(':'*40)
    return sc, out
344 

Variable Documentation

◆ __all__

list python.SgDumpLib.__all__
private
Initial value:
1 = [
2  'run_sg_dump',
3  ]

Definition at line 16 of file SgDumpLib.py.

◆ __author__

string python.SgDumpLib.__author__ = "Sebastien Binet <binet@cern.ch>"
private

Definition at line 14 of file SgDumpLib.py.

◆ __doc__

string python.SgDumpLib.__doc__
private
Initial value:
1 = """\
2 API for the sg-dump script (which dumps an ASCII representation of events in
3 POOL or RAW files)
4 """

Definition at line 10 of file SgDumpLib.py.

python.SgDumpLib._make_jobo
def _make_jobo(job)
Definition: SgDumpLib.py:20
python.SgDumpLib._run_jobo
def _run_jobo(job, msg, options)
Definition: SgDumpLib.py:91
python.SgDumpLib._gen_jobo
def _gen_jobo(dct)
Definition: SgDumpLib.py:29
checkNSWValTree.Options
Options
Definition: checkNSWValTree.py:15
Trk::open
@ open
Definition: BinningType.h:40
CaloLCW_tf.group
group
Definition: CaloLCW_tf.py:28
python.App._monitor
def _monitor(tag)
Definition: App.py:69
python.SgDumpLib.run_sg_dump
def run_sg_dump(files, output, nevts=-1, skip=0, dump_jobo=False, pyalg_cls='PyDumper.PyComps:PySgDumper', include='*', exclude='', do_clean_up=False, athena_opts=None, conditions_tag=None, full_log=False, msg=None)
Definition: SgDumpLib.py:225