ATLAS Offline Software
Loading...
Searching...
No Matches
python.SgDumpLib Namespace Reference

Functions

 _make_jobo (job)
 _gen_jobo (dct)
 _run_jobo (job, msg, options)
 run_sg_dump (files, output, nevts=-1, skip=0, dump_jobo=False, pyalg_cls='PyDumper.PyComps:PySgDumper', include='*', exclude='', do_clean_up=False, athena_opts=None, conditions_tag=None, full_log=False, msg=None)

Variables

str __doc__
str __author__ = "Sebastien Binet <binet@cern.ch>"
list __all__

Function Documentation

◆ _gen_jobo()

python.SgDumpLib._gen_jobo ( dct)
protected

Definition at line 29 of file SgDumpLib.py.

29def _gen_jobo(dct):
30 import textwrap
31 job = textwrap.dedent("""\
32 #!/usr/bin/env athena.py
33 # automatically generated joboptions file
34
35 from AthenaConfiguration.AllConfigFlags import initConfigFlags
36 from AthenaConfiguration.Enums import Format
37 from PyDumper.DumpConfig import DumpCfg
38
39 flags = initConfigFlags()
40 flags.GeoModel.Align.Dynamic = False
41 flags.Input.Files = %(input-files)s
42 flags.Exec.MaxEvents = %(evts)s
43 flags.Exec.SkipEvents = %(skip)s
44
45 if flags.Input.Format is Format.BS:
46 # BS files don't contain the conditions/geometry tags.
47 # Try to give some reasonable defaults here, depending on the run.
48 # These may still be overridden from the command line.
49 if flags.Input.DataYear < 2000:
50 pass
51 elif flags.Input.DataYear < 2015:
52 flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
53 flags.IOVDb.GlobalTag = 'COMCOND-BLKPA-RUN1-09'
54 flags.Trigger.doxAODConversion = False
55 elif flags.Input.DataYear < 2020:
56 flags.GeoModel.AtlasVersion = 'ATLAS-R2-2016-01-00-01'
57 flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-RUN2-09'
58 else:
59 flags.GeoModel.AtlasVersion = 'ATLAS-R3S-2021-03-02-00'
60 flags.IOVDb.GlobalTag = 'CONDBR2-BLKPA-2023-03'
61 else:
62 if flags.GeoModel.AtlasVersion:
63 if flags.GeoModel.AtlasVersion.find ('ATLAS-GEO-1') >= 0:
64 flags.GeoModel.AtlasVersion = 'ATLAS-R1-2012-03-00-00'
65
66 flags.fillFromArgs()
67 flags.lock()
68
69 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
70 cfg = MainServicesCfg(flags)
71
72 if flags.Input.Format is Format.BS:
73 from PyDumper.BSReadConfig import BSReadCfg
74 cfg.merge (BSReadCfg (flags))
75 else:
76 from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
77 cfg.merge(PoolReadCfg(flags))
78
79 cfg.merge (DumpCfg (flags,
80 ofile='%(ofile-name)s',
81 items='%(include)s',
82 exclude='%(exclude)s'))
83
84 sc = cfg.run (%(evts)s)
85 import sys
86 sys.exit (sc.isFailure())
87 """) % dct
88
89 return job
90

◆ _make_jobo()

python.SgDumpLib._make_jobo ( job)
protected

Definition at line 20 of file SgDumpLib.py.

20def _make_jobo(job):
21 import tempfile
22 jobo = tempfile.NamedTemporaryFile(suffix='-jobo.py', mode='w+')
23 import textwrap
24 job = textwrap.dedent (job)
25 jobo.writelines([l+os.linesep for l in job.splitlines()])
26 jobo.flush()
27 return jobo
28

◆ _run_jobo()

python.SgDumpLib._run_jobo ( job,
msg,
options )
protected

Definition at line 91 of file SgDumpLib.py.

91def _run_jobo(job, msg, options):
92 import os,atexit,tempfile,shutil
93 # capture current directory's content
94 keep_files = [os.path.abspath(item)
95 for item in os.listdir(os.getcwd())]
96 keep_files.append (os.path.abspath(options.oname))
97 keep_files.append (os.path.abspath("%s.log"%options.oname))
98
99 def _cleanup(keep_files):
100 errors = []
101 for item in os.listdir(os.getcwd()):
102 item = os.path.abspath(item)
103 if os.path.basename(item).startswith(('.__afs',
104 '.__nfs')):
105 # don't care about freakingly sticky metadata files
106 continue
107
108 if item in keep_files:
109 continue
110 try:
111 if os.path.isfile (item): os.remove (item)
112 elif os.path.islink (item): os.unlink (item)
113 elif os.path.isdir (item): shutil.rmtree (item)
114 else:
115 msg.warning ("don't know what kind of stuff this is: %s",
116 item)
117 except Exception as err:
118 errors.append ("%s"%err)
119 pass
120 if len(errors)>0:
121 msg.error ("problem during workdir clean-up")
122 map (msg.error, errors)
123 else:
124 msg.debug ("workdir clean-up [ok]")
125 return
126
127 if options.do_clean_up:
128 atexit.register (_cleanup, keep_files)
129
130 import subprocess
131 sc,out = subprocess.getstatusoutput ('which athena.py')
132 if sc != 0:
133 msg.error("could not locate 'athena.py':\n%s", out)
134 return sc, out
135 app = out
136 jobo = _make_jobo(job)
137
138 sc,out = subprocess.getstatusoutput ('which sh')
139 if sc != 0:
140 msg.error("could not locate 'sh':\n%s",out)
141 return sc, out
142 sh = out
143
144 logfile = tempfile.NamedTemporaryFile(prefix='sg_dumper_job_',
145 suffix='.logfile.txt',
146 dir=os.getcwd(),
147 mode = 'w+')
148
149 # do not require $HOME to be available for ROOT
150 # see bug #82096
151 # https://savannah.cern.ch/bugs/index.php?82096
152 env = dict(os.environ)
153 env['ROOTENV_NO_HOME'] = os.getenv('ROOTENV_NO_HOME', '1')
154
155 out = []
156 athena_opts = []
157 if options.athena_opts:
158 import shlex
159 athena_opts = shlex.split(options.athena_opts)
160 cmd = [sh, app,] + athena_opts + ['--CA'] + [jobo.name,]
161 import subprocess as sub
162 app_handle = sub.Popen (args=cmd,
163 stdout=logfile,
164 stderr=logfile,
165 env=env)
166 pos = 0
167 import re
168 if options.full_log:
169 pat = re.compile ('.*')
170 else:
171 pat = re.compile (r'^Py:pyalg .*')
172 evt_pat = re.compile (
173 r'^Py:pyalg .*? ==> processing event \[(?P<evtnbr>\d*?)\].*'
174 )
175 def _monitor(pos):
176 logfile.flush()
177 _watcher = open (logfile.name, 'r'); _watcher.seek (0, 2) # end of file
178 end = _watcher.tell(); _watcher.seek (pos)
179 mon = [l for l in _watcher
180 if pat.match(l)]
181 _watcher.seek (end)
182 pos = _watcher.tell()
183
184 for l in mon:
185 if l.count ('==> initialize...'):
186 msg.info ('athena initialized')
187 if evt_pat.match(l):
188 ievt = evt_pat.match(l).group('evtnbr')
189 out.append(ievt)
190 msg.info ('processed event [%s]', ievt)
191 if l.count ('==> finalize...'):
192 msg.info ('athena finalized')
193
194 return pos
195
196 import time
197 while app_handle.poll() is None:
198 pos = _monitor(pos)
199 time.sleep (5)
200 pass
201 _monitor(pos)
202
203 jobo.close()
204 sc = app_handle.returncode
205 if sc != 0:
206 logfile.seek(0)
207 msg.error ('='*80)
208 from io import StringIO
209 err = StringIO()
210 for l in logfile:
211 print (l, end='')
212 print (l, end='', file=err)
213 msg.error ('='*80)
214 msg.error ('problem running jobo')
215 return sc, err.getvalue()
216
217 logfile.seek (0)
218 from io import StringIO
219 out = StringIO()
220 for l in logfile:
221 if pat.match(l):
222 print (l, end='', file=out)
223 return sc, out.getvalue()
224

◆ run_sg_dump()

python.SgDumpLib.run_sg_dump ( files,
output,
nevts = -1,
skip = 0,
dump_jobo = False,
pyalg_cls = 'PyDumper.PyComps:PySgDumper',
include = '*',
exclude = '',
do_clean_up = False,
athena_opts = None,
conditions_tag = None,
full_log = False,
msg = None )
API for the sg-dump script.
 `files` a list of input filenames to be dumped by SgDump
 `output` the name of the output (ASCII) file
 `nevts`  the number of events to dump (default: -1 ie all)
 `skip`   the number of events to skip at the start (default: 0)
 `dump_jobo` whether to keep the automatically generated jobo: to keep it,
             pass the name of the file the jobo should be written to
             (default: False, ie the jobo is not kept)
 `pyalg_cls` the fully qualified name of the PyAthena.Alg class to process the file(s) content (PySgDumper or DataProxyLoader)
 `include`: comma-separated list of type#key container names to dump.
 `exclude`: comma-separated list of glob patterns for keys/types to ignore.
 `do_clean_up` flag to enable the attempt at removing all the files sg-dump
               produces during the course of its execution
 `athena_opts` a space-separated list of athena command-line options (e.g '--perfmon --stdcmalloc --nprocs=-1')
 `conditions_tag` force a specific global conditions tag
 `msg`    a logging.Logger instance

 returns the exit code of the sub-athena process

Definition at line 225 of file SgDumpLib.py.

236 msg=None):
237 """API for the sg-dump script.
238 `files` a list of input filenames to be dumped by SgDump
239 `output` the name of the output (ASCII) file
240 `nevts` the number of events to dump (default: -1 ie all)
241 `skip` the number of events to skip at the start (default: 0)
242 `dump_jobo` switch to store or not the automatically generated jobo (put
243 the name of the jobo output name in there if you want to keep
244 it)
245 `pyalg_cls` the fully qualified name of the PyAthena.Alg class to process the file(s) content (PySgDumper or DataProxyLoader)
246 `include`: comma-separates list of type#key container names to dump.
247 `exclude`: comma-separated list of glob patterns for keys/types to ignore.
248 `do_clean_up` flag to enable the attempt at removing all the files sg-dump
249 produces during the course of its execution
250 `athena_opts` a space-separated list of athena command-line options (e.g '--perfmon --stdcmalloc --nprocs=-1')
251 `conditions_tag` force a specific global conditions tag
252 `msg` a logging.Logger instance
253
254 returns the exit code of the sub-athena process
255 """
256
257 if msg is None:
258 import PyUtils.Logging as L
259 msg = L.logging.getLogger('sg-dumper')
260 msg.setLevel(L.logging.INFO)
261
262 if isinstance(files, str):
263 files = files.split()
264
265 if not isinstance(files, (list,tuple)):
266 err = "'files' needs to be a list (or tuple) of file names"
267 msg.error(err)
268 raise TypeError(err)
269
270 if not isinstance(output, str):
271 err = "'output' needs to be a filename"
272 msg.error(err)
273 raise TypeError(err)
274
275 _allowed_values = ('PyDumper.PyComps:PySgDumper',
276 'PyDumper.PyComps:DataProxyLoader')
277 if not (pyalg_cls in _allowed_values):
278 err = "'pyalg_cls' allowed values are: %s. got: [%s]" % (
279 _allowed_values,
280 pyalg_cls)
281 msg.error(err)
282 raise ValueError(err)
283 pyalg_pkg,pyalg_cls = pyalg_cls.split(':')
284
285 conditions_tag_frag = ''
286 if conditions_tag:
287 conditions_tag_frag = "conddb.setGlobalTag('%s')" % conditions_tag
288 jobo = _gen_jobo({
289 'ofile-name' : output,
290 'input-files': files,
291 'evts' : nevts,
292 'skip' : skip,
293 'include' : include,
294 'exclude' : exclude,
295 'pyalg_pkg': pyalg_pkg,
296 'pyalg_cls': pyalg_cls,
297 'conditions_tag_frag' : conditions_tag_frag,
298 })
299
300 msg.info(':'*40)
301 msg.info('input files: %s', files)
302 msg.info('events: %s', nevts)
303 msg.info('skip: %s', skip)
304 msg.info('out (ascii): %s', output)
305 msg.info('pyalg-class: %s:%s', pyalg_pkg, pyalg_cls)
306 msg.info('include: %s', include)
307 msg.info('exclude: %s', exclude)
308 msg.info('conditions_tag: %s', conditions_tag)
309
310 if dump_jobo and isinstance(dump_jobo, str):
311 try:
312 with open(dump_jobo, 'w') as f:
313 f.write(jobo)
314 except Exception as err:
315 msg.warning('problem while dumping joboption file to [%s]:\n%s',
316 dump_jobo, err)
317
318 from collections import namedtuple
319 Options = namedtuple('Options',
320 'oname do_clean_up athena_opts full_log')
321 opts = Options(oname=output,
322 do_clean_up=do_clean_up,
323 full_log=full_log,
324 athena_opts=athena_opts)
325
326 sc, out = 1, "<N/A>"
327 msg.info('running dumper...')
328 sc,out = _run_jobo(jobo, msg, opts)
329 msg.info('dumper done')
330 if output != os.devnull:
331 msg.info('writing logfile: %s.log', output)
332 try:
333 with open('%s.log'%output, 'w') as f:
334 for l in out.splitlines():
335 print (l, file=f)
336 print ("### EOF ###", file=f)
337
338 except Exception as err:
339 msg.warning('problem writing out logfile [%s.log]:\n%s',
340 output, err)
341
342 msg.info('bye.')
343 msg.info(':'*40)
344 return sc, out

Variable Documentation

◆ __all__

list python.SgDumpLib.__all__
private
Initial value:
1= [
2 'run_sg_dump',
3 ]

Definition at line 16 of file SgDumpLib.py.

◆ __author__

str python.SgDumpLib.__author__ = "Sebastien Binet <binet@cern.ch>"
private

Definition at line 14 of file SgDumpLib.py.

◆ __doc__

str python.SgDumpLib.__doc__
private
Initial value:
1= """\
2API for the sg-dump script (which dumps an ASCII representation of events in
3POOL or RAW files
4"""

Definition at line 10 of file SgDumpLib.py.