ATLAS Offline Software
SgDumpLib.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 # @file PyDumper.SgDumpLib
4 # @purpose API for the sg-dump script
5 # @author Sebastien Binet <binet@cern.ch>
6 # @date August 2009
7 
8 import os
9 
# Module-level metadata: documentation string, author and public API.
__doc__ = """\
API for the sg-dump script (which dumps an ASCII representation of events in
POOL or RAW files)
"""
__author__ = "Sebastien Binet <binet@cern.ch>"

# Only `run_sg_dump` is exported; the underscore-prefixed helpers are private.
__all__ = [
    'run_sg_dump',
    ]
19 
def _make_jobo(job):
    """Write the job-options text `job` into a named temporary file.

    `job` is dedented and written line by line, each line terminated by a
    newline, into a temporary file whose name ends in '-jobo.py'.  The data
    is flushed so that the file can be read by name right away.

    Returns the still-open temporary-file object.  With the default
    NamedTemporaryFile settings used here the file is removed when that
    object is closed, so the caller has to keep it open while it is needed.
    """
    import tempfile
    import textwrap

    jobo = tempfile.NamedTemporaryFile(suffix='-jobo.py', mode='w+')
    # The file is opened in text mode, which already translates '\n' into the
    # platform line ending on write; writing os.linesep would double it on
    # platforms where the two differ.
    jobo.writelines(line + '\n' for line in textwrap.dedent(job).splitlines())
    jobo.flush()
    return jobo
28 
def _gen_jobo(dct):
    """Render the text of the athena job-options script.

    `dct` is a mapping providing the substitutions used by the template:

      'input-files' : list of input file names (inserted as a Python list)
      'evts'        : number of events to process
      'skip'        : number of events to skip
      'ofile-name'  : name of the output (ASCII) file
      'include'     : selection of items to dump
      'exclude'     : selection of items to ignore

    Any other key present in `dct` is ignored by the %-formatting.

    Returns the script as a string.
    """
    import textwrap
    # The three string values are inserted with %r rather than between
    # hand-written quotes: for ordinary strings the generated text is the
    # same, but values containing quotes or backslashes still produce a
    # valid Python literal.
    job = textwrap.dedent("""\
    #!/usr/bin/env athena.py
    # automatically generated joboptions file

    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    from AthenaConfiguration.Enums import Format
    from PyDumper.DumpConfig import DumpCfg

    flags = initConfigFlags()
    flags.GeoModel.Align.Dynamic = False
    flags.Input.Files = %(input-files)s
    flags.Exec.MaxEvents = %(evts)s
    flags.Exec.SkipEvents = %(skip)s

    if flags.Input.Format is Format.BS:
        # BS files don't contain the conditions/geometry tags.
        # Try to give some reasonable defaults here, depending on the run.
        # These may still be overridden from the command line.
        if flags.Input.DataYear < 2000:
            pass
        elif flags.Input.DataYear < 2015:
            flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
            flags.IOVDb.GlobalTag = 'COMCOND-BLKPA-RUN1-09'
            flags.Trigger.doxAODConversion = False
        elif flags.Input.DataYear < 2020:
            flags.GeoModel.AtlasVersion = 'ATLAS-R2-2016-01-00-01'
            flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-RUN2-09'
        else:
            flags.GeoModel.AtlasVersion = 'ATLAS-R3S-2021-03-02-00'
            flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-2023-03'
    else:
        if flags.GeoModel.AtlasVersion != 0:
            if flags.GeoModel.AtlasVersion.find ('ATLAS-GEO-18') >= 0:
                flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'

    flags.fillFromArgs()
    flags.lock()

    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    if flags.Input.Format is Format.BS:
        from PyDumper.BSReadConfig import BSReadCfg
        cfg.merge (BSReadCfg (flags))
    else:
        from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
        cfg.merge(PoolReadCfg(flags))

    cfg.merge (DumpCfg (flags,
                        ofile=%(ofile-name)r,
                        items=%(include)r,
                        exclude=%(exclude)r))

    sc = cfg.run (%(evts)s)
    import sys
    sys.exit (sc.isFailure())
    """) % dct

    return job
90 
def _run_jobo(job, msg, options):
    """Run the job-options text `job` in an athena sub-process.

    `job`     the text of the job-options script to run
    `msg`     a logger-like object (`debug`, `info`, `warning` and `error`
              are called on it)
    `options` an object with the attributes `oname` (output file name),
              `do_clean_up`, `athena_opts` (command-line options as one
              string, or a false value) and `full_log`

    The script is written to a temporary file and run as
    `sh athena.py <athena_opts> --CA <jobo>`, with stdout and stderr sent
    to a temporary log file in the current directory.  While the process
    runs, the log is polled every 5 seconds and progress is reported
    through `msg`.

    Returns a `(sc, text)` tuple:
      - if `athena.py` or `sh` cannot be located: the status and output of
        the failing `which` command;
      - if the sub-process exits with a non-zero code: that code and the
        whole log (which is also printed to stdout);
      - otherwise: 0 and the log lines selected by the monitoring pattern
        (every line when `options.full_log` is set, else the lines that
        start with 'Py:pyalg ').
    """
    import os
    import atexit
    import tempfile
    import shutil
    import re
    import shlex
    import subprocess
    import time

    # capture current directory's content, plus the files we are expected
    # to produce: these must survive the optional clean-up
    keep_files = {os.path.abspath(item) for item in os.listdir(os.getcwd())}
    keep_files.add(os.path.abspath(options.oname))
    keep_files.add(os.path.abspath("%s.log" % options.oname))

    def _cleanup(keep_files):
        """Remove from the current directory whatever is not in `keep_files`."""
        errors = []
        for item in os.listdir(os.getcwd()):
            item = os.path.abspath(item)
            if os.path.basename(item).startswith(('.__afs',
                                                  '.__nfs')):
                # don't care about freakingly sticky metadata files
                continue

            if item in keep_files:
                continue
            try:
                if os.path.isfile(item):
                    os.remove(item)
                elif os.path.islink(item):
                    os.unlink(item)
                elif os.path.isdir(item):
                    shutil.rmtree(item)
                else:
                    msg.warning("don't know what kind of stuff this is: %s",
                                item)
            except Exception as err:
                # best effort: remember the problem and carry on
                errors.append("%s" % err)
        if errors:
            msg.error("problem during workdir clean-up")
            # an explicit loop: map() is lazy in Python 3, so wrapping the
            # calls in map() would never log anything
            for error in errors:
                msg.error(error)
        else:
            msg.debug("workdir clean-up [ok]")

    if options.do_clean_up:
        atexit.register(_cleanup, keep_files)

    sc, out = subprocess.getstatusoutput('which athena.py')
    if sc != 0:
        msg.error("could not locate 'athena.py':\n%s", out)
        return sc, out
    app = out

    sc, out = subprocess.getstatusoutput('which sh')
    if sc != 0:
        msg.error("could not locate 'sh':\n%s", out)
        return sc, out
    sh = out

    # do not require $HOME to be available for ROOT
    # see bug #82096
    # https://savannah.cern.ch/bugs/index.php?82096
    env = dict(os.environ)
    env['ROOTENV_NO_HOME'] = os.getenv('ROOTENV_NO_HOME', '1')

    athena_opts = []
    if options.athena_opts:
        athena_opts = shlex.split(options.athena_opts)

    # lines of the log worth reporting / returning
    if options.full_log:
        pat = re.compile('.*')
    else:
        pat = re.compile(r'^Py:pyalg .*')
    evt_pat = re.compile(
        r'^Py:pyalg .*? ==> processing event \[(?P<evtnbr>\d*?)\].*'
        )

    # both temporary files are closed (and thereby removed) on every exit
    # path, including exceptions
    with _make_jobo(job) as jobo, \
         tempfile.NamedTemporaryFile(prefix='sg_dumper_job_',
                                     suffix='.logfile.txt',
                                     dir=os.getcwd(),
                                     mode='w+') as logfile:
        cmd = [sh, app] + athena_opts + ['--CA', jobo.name]
        app_handle = subprocess.Popen(args=cmd,
                                      stdout=logfile,
                                      stderr=logfile,
                                      env=env)

        def _monitor(pos):
            """Report on the log lines past `pos`; return the new position."""
            logfile.flush()
            # a separate read handle, so that the position of `logfile`
            # itself is left alone; closed again after each poll
            with open(logfile.name, 'r') as watcher:
                watcher.seek(0, 2)  # end of file
                end = watcher.tell()
                watcher.seek(pos)
                mon = [l for l in watcher if pat.match(l)]

            for l in mon:
                if l.count('==> initialize...'):
                    msg.info('athena initialized')
                evt = evt_pat.match(l)
                if evt:
                    msg.info('processed event [%s]', evt.group('evtnbr'))
                if l.count('==> finalize...'):
                    msg.info('athena finalized')

            return end

        pos = 0
        while app_handle.poll() is None:
            pos = _monitor(pos)
            time.sleep(5)
        # pick up whatever was written after the last poll
        _monitor(pos)

        sc = app_handle.returncode
        logfile.seek(0)
        if sc != 0:
            # on failure show, and hand back, the complete log
            msg.error('='*80)
            lines = []
            for l in logfile:
                print(l, end='')
                lines.append(l)
            msg.error('='*80)
            msg.error('problem running jobo')
            return sc, ''.join(lines)

        return sc, ''.join(l for l in logfile if pat.match(l))
224 
def run_sg_dump(files, output,
                nevts=-1,
                skip=0,
                dump_jobo=False,
                pyalg_cls='PyDumper.PyComps:PySgDumper',
                include='*',
                exclude='',
                do_clean_up=False,
                athena_opts=None,
                conditions_tag=None,
                full_log=False,
                msg=None):
    """API for the sg-dump script.
     `files`    a list of input filenames to be dumped by SgDump (a single
                string is split on whitespace)
     `output`   the name of the output (ASCII) file
     `nevts`    the number of events to dump (default: -1 ie all)
     `skip`     the number of events to skip at the start (default: 0)
     `dump_jobo` switch to store or not the automatically generated jobo (put
                 the name of the jobo output name in there if you want to keep
                 it)
     `pyalg_cls` the fully qualified name of the PyAthena.Alg class to process
                 the file(s) content (PySgDumper or DataProxyLoader)
     `include`: comma-separated list of type#key container names to dump.
     `exclude`: comma-separated list of glob patterns for keys/types to ignore.
     `do_clean_up` flag to enable the attempt at removing all the files sg-dump
                   produces during the course of its execution
     `athena_opts` a space-separated list of athena command-line options
                   (e.g '--perfmon --stdcmalloc --nprocs=-1')
     `conditions_tag` force a specific global conditions tag.
                   NOTE: a fragment is built from this value and handed to
                   `_gen_jobo`, but the job-options template has no
                   placeholder for it, so the tag currently has no effect
                   (a warning is logged when it is given).
     `full_log` keep every line of the athena log instead of only the
                'Py:pyalg' ones
     `msg`      a logging.Logger instance

     raises TypeError if `files` or `output` has the wrong type, and
     ValueError if `pyalg_cls` is not one of the allowed values.

     returns a tuple `(sc, out)`: the exit code of the sub-athena process
     and the text handed back by `_run_jobo` (also written to
     '<output>.log').
    """

    if msg is None:
        import PyUtils.Logging as L
        msg = L.logging.getLogger('sg-dumper')
        msg.setLevel(L.logging.INFO)

    if isinstance(files, str):
        files = files.split()

    if not isinstance(files, (list,tuple)):
        err = "'files' needs to be a list (or tuple) of file names"
        msg.error(err)
        raise TypeError(err)

    if not isinstance(output, str):
        err = "'output' needs to be a filename"
        msg.error(err)
        raise TypeError(err)

    _allowed_values = ('PyDumper.PyComps:PySgDumper',
                       'PyDumper.PyComps:DataProxyLoader')
    if pyalg_cls not in _allowed_values:
        err = "'pyalg_cls' allowed values are: %s. got: [%s]" % (
            _allowed_values,
            pyalg_cls)
        msg.error(err)
        raise ValueError(err)
    pyalg_pkg,pyalg_cls = pyalg_cls.split(':')

    conditions_tag_frag = ''
    if conditions_tag:
        conditions_tag_frag = "conddb.setGlobalTag('%s')" % conditions_tag
        # be explicit rather than silently dropping the request: the
        # template rendered by _gen_jobo does not use this fragment
        msg.warning("'conditions_tag' [%s] is not used by the generated "
                    "job options and will have no effect", conditions_tag)
    jobo = _gen_jobo({
        'ofile-name' : output,
        'input-files': files,
        'evts' :        nevts,
        'skip' :        skip,
        'include' :     include,
        'exclude' :     exclude,
        'pyalg_pkg':    pyalg_pkg,
        'pyalg_cls':    pyalg_cls,
        'conditions_tag_frag' : conditions_tag_frag,
        })

    msg.info(':'*40)
    msg.info('input files:     %s', files)
    msg.info('events:          %s', nevts)
    msg.info('skip:            %s', skip)
    msg.info('out (ascii):     %s', output)
    msg.info('pyalg-class:     %s:%s', pyalg_pkg, pyalg_cls)
    msg.info('include:         %s', include)
    msg.info('exclude:         %s', exclude)
    msg.info('conditions_tag:  %s', conditions_tag)

    # keep a copy of the generated job options only if a file name was given
    if dump_jobo and isinstance(dump_jobo, str):
        try:
            with open(dump_jobo, 'w') as f:
                f.write(jobo)
        except Exception as err:
            # best effort: failing to save the copy must not stop the dump
            msg.warning('problem while dumping joboption file to [%s]:\n%s',
                        dump_jobo, err)

    from collections import namedtuple
    Options = namedtuple('Options',
                         'oname do_clean_up athena_opts full_log')
    opts = Options(oname=output,
                   do_clean_up=do_clean_up,
                   full_log=full_log,
                   athena_opts=athena_opts)

    msg.info('running dumper...')
    sc,out = _run_jobo(jobo, msg, opts)
    msg.info('dumper done')
    if output != os.devnull:
        msg.info('writing logfile: %s.log', output)
        try:
            with open('%s.log'%output, 'w') as f:
                for l in out.splitlines():
                    print (l, file=f)
                print ("### EOF ###", file=f)

        except Exception as err:
            # best effort: the dump itself has already been produced
            msg.warning('problem writing out logfile [%s.log]:\n%s',
                        output, err)

    msg.info('bye.')
    msg.info(':'*40)
    return sc, out
python.SgDumpLib._make_jobo
def _make_jobo(job)
Definition: SgDumpLib.py:20
python.SgDumpLib._run_jobo
def _run_jobo(job, msg, options)
Definition: SgDumpLib.py:91
python.SgDumpLib._gen_jobo
def _gen_jobo(dct)
Definition: SgDumpLib.py:29
checkNSWValTree.Options
Options
Definition: checkNSWValTree.py:15
Trk::open
@ open
Definition: BinningType.h:40
CaloLCW_tf.group
group
Definition: CaloLCW_tf.py:28
python.SgDumpLib.run_sg_dump
def run_sg_dump(files, output, nevts=-1, skip=0, dump_jobo=False, pyalg_cls='PyDumper.PyComps:PySgDumper', include='*', exclude='', do_clean_up=False, athena_opts=None, conditions_tag=None, full_log=False, msg=None)
Definition: SgDumpLib.py:225