runJobs.py
#! /usr/bin/env python

# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
# Front-end script to run JobRunner jobs



__authors__ = ['Juerg Beringer', 'Carl Suster']
__version__ = 'runJobs.py atlas/athena'
__usage__ = """%prog [options] JOBOPTIONTEMPLATE DATASET TASK

Templates: - InDetBeamSpotExample/VertexTemplate.py
           - InDetBeamSpotExample/*Template.py
           - your own template file

See the comment field of jobs (e.g. MON.DB_BEAMSPOT jobs) in the beam spot
summary webpage for real usage examples.
"""

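# Illustrative invocation (a sketch only; the directory, dataset and task names
# below are hypothetical placeholders):
#
#   runJobs.py --directory /path/to/input/files \
#       InDetBeamSpotExample/VertexTemplate.py MyDataset MyTask
#
# With the defaults this submits one HTCondor job per input file (--submit condor,
# --files-per-job 1), using VertexTemplate.py as the job option template.
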
import sys
import glob
import os
import re
import subprocess
import InDetBeamSpotExample
from InDetBeamSpotExample import TaskManager
from InDetBeamSpotExample import DiskUtils

def extract_file_list_legacy(inputdata, options):
    ''' Reads several legacy options to work out what input data to use. '''
    if options.legacy_griduser:
        fs = [inputdata]
    elif options.legacy_fromcastor:
        # INPUTDATA specifies a directory with files
        pattern = options.legacy_filter or (None if options.bytestream else '.*ESD.*')
        fs = DiskUtils.FileSet.from_directory(inputdata).matching(pattern)
    elif os.path.isfile(inputdata):
        # INPUTDATA is a text file with filenames
        fs = DiskUtils.FileSet.from_file_containing_list(inputdata)
    elif options.legacy_runoverdpd:
        # INPUTDATA is the DPD task name
        rundir = os.path.join(os.getcwd(), dsname)
        if not os.path.exists(rundir):
            raise Exception('Run ' + dsname + ' (directory ' + rundir + ') not found')
        dpddir = os.path.join(rundir, inputdata)
        if not os.path.exists(dpddir):
            raise Exception('Dataset with name ' + inputdata + ' (directory ' + dpddir + ') not found')
        fs = DiskUtils.FileSet.from_glob(os.path.join(dpddir, '*', '*-dpd.root*'))
    else:
        # INPUTDATA is a directory with files
        pattern = options.legacy_filter or '*.root*'
        fs = DiskUtils.FileSet.from_glob(os.path.join(inputdata, pattern))
    return list(sorted(fs))
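# Note: the legacy_runoverdpd branch above uses the module-level name 'dsname',
# which is only assigned in the __main__ block further down; this helper is
# therefore only usable after the command-line arguments have been parsed.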

def extract_file_list(options):
    ''' Use new flags to work out input file list. '''
    if options.in_directory:
        fs = DiskUtils.FileSet.from_directory(options.in_directory)
    elif options.in_list:
        fs = DiskUtils.FileSet.from_file_containing_list(options.in_list)
    return list(sorted(fs
        .matching(options.f_match)
        .excluding(options.f_exclude)
        ))
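# Note: this helper assumes that one of --directory or --file-list was given;
# if neither is set (for example when only --dsid is specified), 'fs' is never
# assigned and the return statement raises an UnboundLocalError.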

def process_flags(options, legacy=False):
    flags = {
        'inputds' : '',
        'bytestream' : options.bytestream,
        'DataSource' : 'geant4' if options.is_mc else 'data',
        'evtmax' : options.evtmax,
        'maxjobs' : options.maxjobs,
        'outputlevel' : options.outputlevel,
        'logmail' : options.users,
        'alignmentfile' : options.alignmentfile,
        'beamspotfile' : options.beamspotfile,
        'autoconfparams' : options.autoconfparams,
        'taskpostprocsteps' : options.postprocsteps,
        'filesperjob' : options.nfiles,
        'lbperjob' : options.lbperjob,
        'batchqueue' : options.batchqueue,
        'gridsite' : options.gridsite,
        'addinputtopoolcatalog' : not (options.bytestream or options.submit == 'grid' or options.legacy_griduser),
    }

    # List of desired output files. For grid jobs, it must specify exactly
    # the expected files (otherwise the grid jobs will fail or not return the
    # desired output). For all other jobs, it can be an inclusive list of
    # possible outputs and JobRunner will return only the actually present
    # output files when asked for output files.
    if options.outputfilelist:
        flags['outputfilelist'] = [ f.strip() for f in options.outputfilelist.split(',') ]
    else:
        flags['outputfilelist'] = ['dpd.root', 'nt.root', 'monitoring.root', 'beamspot.db']

    if legacy:
        flags['griduser'] = options.legacy_griduser
    else:
        flags['griduser'] = '.'.join(['user', options.grid_user or os.getenv('USER')])

    if options.legacy_runoverdpd and not options.lbperjob:
        flags['maxjobs'] = 1

    for s in options.params.split(', '):
        if s:
            try:
                p = s.split('=', 1)
                flags[p[0].strip()] = eval(p[1].strip())
            except:
                print ('\nERROR parsing user parameter', p, '- parameter will be ignored')

    return flags
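# Illustration of the -p/--params parsing above (the parameter names and values
# are hypothetical): -p 'DetDescrVersion="ATLAS-R2-2016-01-00-01", doVertex=True'
# is split on ', ', each entry is split at the first '=', and the right-hand
# side is passed through eval(), giving flags['DetDescrVersion'] ==
# 'ATLAS-R2-2016-01-00-01' and flags['doVertex'] == True.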

def make_runner(runner_type, flags):
    runner_class = InDetBeamSpotExample.loadClass(runner_type)
    return runner_class(**flags)
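# Note: loadClass is assumed to resolve the named runner class (e.g.
# 'HTCondorJobRunner') dynamically from the InDetBeamSpotExample package; the
# class is then instantiated with the accumulated flags as keyword arguments.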

if __name__ == '__main__':
    cmd = subprocess.list2cmdline(sys.argv)

    from optparse import OptionParser, OptionGroup
    parser = OptionParser(usage=__usage__, version=__version__)
    parser.add_option('', '--bytestream', dest='bytestream', action='store_true', default=False,
                      help='input files are bytestream instead of ROOT/POOL files')
    parser.add_option('-m', '--mc', dest='is_mc', action='store_true', default=False,
                      help='input data is from Monte Carlo instead of data (automatically chooses between COMP200 and OFLP200 / CONDBR2 conditions DBs)')
    parser.add_option('-j', '--maxjobs', dest='maxjobs', type='int', default=0,
                      help='max number of jobs (default: 0, i.e. no maximum)')
    parser.add_option('', '--files-per-job', dest='nfiles', type='int', default=1, metavar='N',
                      help='number of files per job (default: 1, set to 0 for a single job over all files)')
    parser.add_option('-e', '--maxevents', dest='evtmax', type='int', default=-1,
                      help='max number of events per job')
    parser.add_option('', '--lbperjob', dest='lbperjob', type='int', default=0, metavar='N',
                      help='number of luminosity blocks per job (default: 0 - no bunching)')
    parser.add_option('-o', '--outputfilelist', dest='outputfilelist', default='', metavar='FILES',
                      help='list of desired output files (default: "dpd.root,nt.root,monitoring.root,beamspot.db"; must be specified explicitly for grid)')
    parser.add_option('-k', '--taskdb', dest='taskdb', default='',
                      help='TaskManager database (default: from TASKDB or sqlite_file:taskdata.db; set to string None to avoid using a task database)')
    parser.add_option('-l', '--logmail', dest='users', default='', metavar='USERS',
                      help='send log mail to specified users (default: no mail)')
    parser.add_option('-z', '--postprocsteps', dest='postprocsteps', default='JobPostProcessing', metavar='STEPS',
                      help='task-level postprocessing steps (default: JobPostProcessing)')
    parser.add_option('-t', '--test', dest='testonly', action='store_true', default=False,
                      help='show only options and input files')
    parser.add_option('-v', '--verbosity', dest='outputlevel', type='int', default=4, metavar='LEVEL',
                      help='output level (default: 4, where 1=VERBOSE, 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL)')
    parser.add_option('-p', '--params', dest='params', default='',
                      help='job option parameters to pass to the job option template')
    parser.add_option('', '--autoconfparams', dest='autoconfparams', default='DetDescrVersion',
                      help='comma-separated list of automatically determined parameters (template must include AutoConfFragment.py, default: "DetDescrVersion")')

    # Additional optional files requiring special treatment (other parameters
    # should be passed to the job option template via "-p params")
    parser.add_option('-a', '--alignment-file', dest='alignmentfile', default='', metavar='FILE',
                      help='alignment file (default: none)')
    parser.add_option('-b', '--beamspot-file', dest='beamspotfile', default='', metavar='FILE',
                      help='beam spot SQLite file (default: none)')

    execopt = OptionGroup(parser, 'Execution Options')
    execopt.add_option('', '--submit', dest='submit', default='condor', metavar='TYPE',
                       choices=['grid', 'lsf', 'shell', 'bg', 'pdsf', 'simple', 'condor'],
                       help='submission type (default: condor, choices: grid,lsf,shell,bg,pdsf,simple,condor)')
    execopt.add_option('', '--grid-user', dest='grid_user', default=None, metavar='USER',
                       help='grid username (default: $USER)')
    execopt.add_option('', '--grid-site', dest='gridsite', default='AUTO', metavar='SITE',
                       help='site name where jobs are sent (default: AUTO)')
    execopt.add_option('-q', '--queue', dest='batchqueue', default='atlasb1',
                       help='batch queue (default: atlasb1)')
    parser.add_option_group(execopt)

    inopt = OptionGroup(parser, 'Input File Options',
                        "One of these must be specified.")
    inopt.add_option('', '--directory', dest='in_directory', metavar='DIR',
                     help='run over all matching files in the directory')
    inopt.add_option('', '--file-list', dest='in_list', metavar='FILE',
                     help='run over all matching files listed in the given text file')
    inopt.add_option('', '--dsid', dest='in_dsid', metavar='DSID',
                     help='run over a rucio DSID')
    parser.add_option_group(inopt)

    filtopt = OptionGroup(parser, 'Input Filtering Options',
                          "Optional filters to select input files.")
    filtopt.add_option('', '--match', dest='f_match', default=None, metavar='REGEX',
                       help='keep only files matching the pattern')
    filtopt.add_option('', '--exclude', dest='f_exclude', default=None, metavar='REGEX',
                       help='skip files matching the pattern')
    parser.add_option_group(filtopt)

    # TODO Check if these flags can be removed:
    deprecated = OptionGroup(parser, 'Deprecated Options')
    deprecated.add_option('-c', '--castor', dest='legacy_fromcastor', action='store_true', default=False,
                          help='INPUTDATA refers to CASTOR directory')
    deprecated.add_option('', '--prefix', dest='legacy_prefix', default='',
                          help='Prefix for reading files from mass storage (ignored)')
    deprecated.add_option('-d', '--dpd', dest='legacy_runoverdpd', action='store_true', default=False,
                          help='run over DPD (single job, INPUTDATA is DPD task name)')
    deprecated.add_option('-i', '--interactive', dest='legacy_interactive', action='store_true', default=False,
                          help='run interactively (same as -r JobRunner)')
    deprecated.add_option('-f', '--filter', dest='legacy_filter', default='',
                          help='use specified pattern to filter input files (default: *.root* for local files, .*ESD.* for castor)')
    deprecated.add_option('-g', '--grid', dest='legacy_griduser', default='',
                          help='run on grid (GRIDUSER is user prefix of grid job name, e.g. user09.JuergBeringer; INPUTDATA is grid dataset name)')
    deprecated.add_option('-s', '--gridsite', dest='gridsite', default='AUTO',
                          help='deprecated spelling of --grid-site')
    deprecated.add_option('-r', '--runner', dest='legacy_runner', default='LSFJobRunner',
                          help='type of JobRunner (default: LSFJobRunner or PandaJobRunner)')
    deprecated.add_option('-w', '--wait', dest='legacy_dowait', action='store_true', default=False,
                          help='wait for jobs to complete')
    deprecated.add_option('-n', '--nfiles', dest='nfiles', type='int',
                          help='deprecated spelling of --files-per-job')
    parser.add_option_group(deprecated)

    (opts, args) = parser.parse_args()
    if len(args) not in [3, 4]:
        parser.error('wrong number of command line arguments')

    joboptiontemplate = args[0]
    dsname = args[1]
    taskname = args[2]

    legacy_options = len(args) == 4
    if legacy_options:
        print ("WARNING: the four-argument invocation of runJobs is deprecated")
        print ("WARNING: enabling (imperfect) legacy compatibility mode")
        files = extract_file_list_legacy(args[3], opts)
        grid_mode = bool(opts.legacy_griduser)
        runner_type = opts.legacy_runner
        if grid_mode:
            runner_type = 'PandaJobRunner'
        if opts.legacy_interactive:
            runner_type = 'JobRunner'
    else:
        files = extract_file_list(opts)
        grid_mode = opts.submit == 'grid'
        runner_type = {
            'lsf': 'LSFJobRunner',
            'grid': 'PandaJobRunner',
            'shell': 'ShellJobRunner',
            'bg': 'BackgroundJobRunner',
            'pdsf': 'PDSFJobRunner',
            'simple': 'JobRunner',
            'condor': 'HTCondorJobRunner',
        }[opts.submit]
    if grid_mode:
        if not opts.in_dsid:
            sys.exit('ERROR: For grid submission, a DSID must be given')
    if not files:
        sys.exit('ERROR: No input files found')

    flags = process_flags(opts, legacy=legacy_options)
    flags['comment'] = cmd
    flags['inputfiles'] = files
    flags['joboptionpath'] = joboptiontemplate

    if grid_mode:
        flags['inputds'] = files[0]
        if opts.lbperjob:
            sys.exit('ERROR: Bunching per LB not supported for grid jobs')
        if not opts.outputfilelist:
            sys.exit('ERROR: For grid jobs, must specify output files explicitly using option -o (e.g. -o dpd.root)')

    if opts.nfiles < 1 or (opts.legacy_runoverdpd and opts.nfiles == 1):
        # run a single job over all files:
        flags['filesperjob'] = len(files)
        if grid_mode:
            sys.exit('ERROR: Must specify number of files per job explicitly when running on grid')

    workdir = os.path.join(os.getcwd(), dsname, taskname)
    flags['jobdir'] = os.path.join(workdir, '%(jobnr)03i')
    if os.path.exists(workdir):
        sys.exit("ERROR: Task %s already exists for dataset %s (directory %s)" % (taskname, dsname, workdir))

    if opts.lbperjob:
        flags['jobname'] = '-'.join([dsname, taskname, 'lb%(jobnr)03i'])
    else:
        flags['jobname'] = '-'.join([dsname, taskname, '%(jobnr)03i'])
        if grid_mode or opts.legacy_runoverdpd:
            flags['jobname'] = dsname + '-' + taskname
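    # Resulting layout: each job gets a working directory of the form
    # <cwd>/<DATASET>/<TASK>/NNN (NNN = zero-padded job number) and a job name
    # DATASET-TASK-NNN, DATASET-TASK-lbNNN when bunching by luminosity block,
    # or simply DATASET-TASK for grid and single-job DPD running.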

    runner = make_runner(runner_type, flags)

    if opts.alignmentfile:
        runner.addFilesToPoolFileCatalog([opts.alignmentfile])

    if grid_mode:
        runner.setParam('outputfileprefix', '%(jobname)s-')
        runner.setParam('addinputtopoolcatalog', False)   # input is a grid dataset
        runner.registerToBeCopied('alignmentfile')
        runner.registerToBeCopied('beamspotfile')

    print()
    runner.showParams(-1)
    print()

    # Temporary warning. TODO: still needed?
    if grid_mode and opts.autoconfparams:
        print ("WARNING: Automatic configuration of parameters such as DetDescrVersion doesn't work yet on the grid!")
        print ("         Please be sure the values of each of the following parameters are specified explicitly above,")
        print ("         unless the defaults in the job option template are correct:\n")
        print ("         ", opts.autoconfparams)
        print()

    print (len(files), "input file(s)/dataset found.")
    print()
    if not opts.testonly:
        runner.configure()
        if opts.taskdb != 'None':
            try:
                with TaskManager.TaskManager(opts.taskdb) as taskman:
                    taskman.addTask(dsname, taskname, joboptiontemplate, runner.getParam('release'),
                                    runner.getNJobs(), opts.postprocsteps, comment=cmd)
            except:
                print ('WARNING: Unable to add task to task manager database ' + opts.taskdb)
        runner.run()
        if opts.legacy_dowait and not grid_mode:
            if not opts.legacy_interactive:
                runner.wait()
            print()
            print ("Job directories in %s for this task:" % workdir)
            print()
            os.system('ls -l %s' % workdir)
            print()
            print ("The following output file(s) were produced:")
            print()
            print (runner.getOutputFiles())
            print()