11 API for the sg-dump script (which dumps an ASCII representation of events in
14 __author__ =
"Sebastien Binet <binet@cern.ch>"
22 jobo = tempfile.NamedTemporaryFile(suffix=
'-jobo.py', mode=
'w+')
24 job = textwrap.dedent (job)
25 jobo.writelines([l+os.linesep
for l
in job.splitlines()])
31 job = textwrap.dedent(
"""\
32 #!/usr/bin/env athena.py
33 # automatically generated joboptions file
35 from AthenaConfiguration.AllConfigFlags import initConfigFlags
36 from AthenaConfiguration.Enums import Format
37 from PyDumper.DumpConfig import DumpCfg
39 flags = initConfigFlags()
40 flags.GeoModel.Align.Dynamic = False
41 flags.Input.Files = %(input-files)s
42 flags.Exec.MaxEvents = %(evts)s
43 flags.Exec.SkipEvents = %(skip)s
45 if flags.Input.Format is Format.BS:
46 # BS files don't contain the conditions/geometry tags.
47 # Try to give some reasonable defaults here, depending on the run.
48 # These may still be overridden from the command line.
49 if flags.Input.DataYear < 2000:
51 elif flags.Input.DataYear < 2015:
52 flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
53 flags.IOVDb.GlobalTag = 'COMCOND-BLKPA-RUN1-09'
54 flags.Trigger.doxAODConversion = False
55 elif flags.Input.DataYear < 2020:
56 flags.GeoModel.AtlasVersion = 'ATLAS-R2-2016-01-00-01'
57 flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-RUN2-09'
59 flags.GeoModel.AtlasVersion = 'ATLAS-R3S-2021-03-02-00'
60 flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-2023-03'
62 if flags.GeoModel.AtlasVersion != 0:
63 if flags.GeoModel.AtlasVersion.find ('ATLAS-GEO-18') >= 0:
64 flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
69 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
70 cfg = MainServicesCfg(flags)
72 if flags.Input.Format is Format.BS:
73 from PyDumper.BSReadConfig import BSReadCfg
74 cfg.merge (BSReadCfg (flags))
76 from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
77 cfg.merge(PoolReadCfg(flags))
79 cfg.merge (DumpCfg (flags,
80 ofile='%(ofile-name)s',
82 exclude='%(exclude)s'))
84 sc = cfg.run (%(evts)s)
86 sys.exit (sc.isFailure())
92 import os,atexit,tempfile,shutil
94 keep_files = [os.path.abspath(item)
95 for item
in os.listdir(os.getcwd())]
96 keep_files.append (os.path.abspath(options.oname))
97 keep_files.append (os.path.abspath(
"%s.log"%options.oname))
99 def _cleanup(keep_files):
101 for item
in os.listdir(os.getcwd()):
102 item = os.path.abspath(item)
103 if os.path.basename(item).startswith((
'.__afs',
108 if item
in keep_files:
111 if os.path.isfile (item): os.remove (item)
112 elif os.path.islink (item): os.unlink (item)
113 elif os.path.isdir (item): shutil.rmtree (item)
115 msg.warning (
"don't know what kind of stuff this is: %s",
117 except Exception
as err:
118 errors.append (
"%s"%err)
121 msg.error (
"problem during workdir clean-up")
122 map (msg.error, errors)
124 msg.debug (
"workdir clean-up [ok]")
127 if options.do_clean_up:
128 atexit.register (_cleanup, keep_files)
131 sc,out = subprocess.getstatusoutput (
'which athena.py')
133 msg.error(
"could not locate 'athena.py':\n%s", out)
138 sc,out = subprocess.getstatusoutput (
'which sh')
140 msg.error(
"could not locate 'sh':\n%s",out)
144 logfile = tempfile.NamedTemporaryFile(prefix=
'sg_dumper_job_',
145 suffix=
'.logfile.txt',
152 env = dict(os.environ)
153 env[
'ROOTENV_NO_HOME'] = os.getenv(
'ROOTENV_NO_HOME',
'1')
157 if options.athena_opts:
159 athena_opts = shlex.split(options.athena_opts)
160 cmd = [sh, app,] + athena_opts + [
'--CA'] + [jobo.name,]
161 import subprocess
as sub
162 app_handle = sub.Popen (args=cmd,
169 pat = re.compile (
'.*')
171 pat = re.compile (
r'^Py:pyalg .*')
172 evt_pat = re.compile (
173 r'^Py:pyalg .*? ==> processing event \[(?P<evtnbr>\d*?)\].*'
177 _watcher = open (logfile.name,
'r'); _watcher.seek (0, 2)
178 end = _watcher.tell(); _watcher.seek (pos)
179 mon = [l
for l
in _watcher
182 pos = _watcher.tell()
185 if l.count (
'==> initialize...'):
186 msg.info (
'athena initialized')
188 ievt = evt_pat.match(l).
group(
'evtnbr')
190 msg.info (
'processed event [%s]', ievt)
191 if l.count (
'==> finalize...'):
192 msg.info (
'athena finalized')
197 while app_handle.poll()
is None:
204 sc = app_handle.returncode
208 from io
import StringIO
212 print (l, end=
'', file=err)
214 msg.error (
'problem running jobo')
215 return sc, err.getvalue()
218 from io
import StringIO
222 print (l, end=
'', file=out)
223 return sc, out.getvalue()
229 pyalg_cls='PyDumper.PyComps:PySgDumper
',
237 """API for the sg-dump script.
238 `files` a list of input filenames to be dumped by SgDump
239 `output` the name of the output (ASCII) file
240 `nevts` the number of events to dump (default: -1 ie all)
241 `skip` the number of events to skip at the start (default: 0)
242 `dump_jobo` switch to store or not the automatically generated jobo (put
243 the name of the jobo output name in there if you want to keep
245 `pyalg_cls` the fully qualified name of the PyAthena.Alg class to process the file(s) content (PySgDumper or DataProxyLoader)
246 `include`: comma-separated list of type#key container names to dump.
247 `exclude`: comma-separated list of glob patterns for keys/types to ignore.
248 `do_clean_up` flag to enable the attempt at removing all the files sg-dump
249 produces during the course of its execution
250 `athena_opts` a space-separated list of athena command-line options (e.g. '--perfmon --stdcmalloc --nprocs=-1')
251 `conditions_tag` force a specific global conditions tag
252 `msg` a logging.Logger instance
254 returns the exit code of the sub-athena process
258 import PyUtils.Logging
as L
259 msg = L.logging.getLogger(
'sg-dumper')
260 msg.setLevel(L.logging.INFO)
262 if isinstance(files, str):
263 files = files.split()
265 if not isinstance(files, (list,tuple)):
266 err =
"'files' needs to be a list (or tuple) of file names"
270 if not isinstance(output, str):
271 err =
"'output' needs to be a filename"
275 _allowed_values = (
'PyDumper.PyComps:PySgDumper',
276 'PyDumper.PyComps:DataProxyLoader')
277 if not (pyalg_cls
in _allowed_values):
278 err =
"'pyalg_cls' allowed values are: %s. got: [%s]" % (
282 raise ValueError(err)
283 pyalg_pkg,pyalg_cls = pyalg_cls.split(
':')
285 conditions_tag_frag =
''
287 conditions_tag_frag =
"conddb.setGlobalTag('%s')" % conditions_tag
289 'ofile-name' : output,
290 'input-files': files,
295 'pyalg_pkg': pyalg_pkg,
296 'pyalg_cls': pyalg_cls,
297 'conditions_tag_frag' : conditions_tag_frag,
301 msg.info(
'input files: %s', files)
302 msg.info(
'events: %s', nevts)
303 msg.info(
'skip: %s', skip)
304 msg.info(
'out (ascii): %s', output)
305 msg.info(
'pyalg-class: %s:%s', pyalg_pkg, pyalg_cls)
306 msg.info(
'include: %s', include)
307 msg.info(
'exclude: %s', exclude)
308 msg.info(
'conditions_tag: %s', conditions_tag)
310 if dump_jobo
and isinstance(dump_jobo, str):
312 with open(dump_jobo,
'w')
as f:
314 except Exception
as err:
315 msg.warning(
'problem while dumping joboption file to [%s]:\n%s',
318 from collections
import namedtuple
319 Options = namedtuple(
'Options',
320 'oname do_clean_up athena_opts full_log')
322 do_clean_up=do_clean_up,
324 athena_opts=athena_opts)
327 msg.info(
'running dumper...')
329 msg.info(
'dumper done')
330 if output != os.devnull:
331 msg.info(
'writing logfile: %s.log', output)
333 with open(
'%s.log'%output,
'w')
as f:
334 for l
in out.splitlines():
336 print (
"### EOF ###", file=f)
338 except Exception
as err:
339 msg.warning(
'problem writing out logfile [%s.log]:\n%s',