ATLAS Offline Software
runJobs.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 # Front-end script to run JobRunner jobs
5 
6 
7 from __future__ import print_function
8 
# Module metadata. __usage__ is passed to optparse.OptionParser in the
# __main__ block; optparse replaces %prog with the program name.
__authors__ = [
    'Juerg Beringer',
    'Carl Suster',
]
__version__ = 'runJobs.py atlas/athena'
__usage__ = (
    "%prog [options] JOBOPTIONTEMPLATE DATASET TASK\n"
    "\n"
    "Templates: - InDetBeamSpotExample/VertexTemplate.py\n"
    " - InDetBeamSpotExample/*Template.py\n"
    " - your own template file\n"
    "\n"
    "See the comment field of jobs (e.g. MON.DB_BEAMSPOT jobs) in the beam spot\n"
    "summary webpage for real usage examples.\n"
)
20 
21 import sys
22 import glob
23 import os
24 import re
25 import subprocess
26 import InDetBeamSpotExample
27 from InDetBeamSpotExample import TaskManager
28 from InDetBeamSpotExample import DiskUtils
29 
def extract_file_list_legacy(inputdata, options, dsname=None):
    ''' Reads several legacy options to work out what input data to use.

    Parameters:
        inputdata: the fourth positional command-line argument. Depending on
            the legacy options it is a grid dataset name (-g), a CASTOR
            directory (-c), a text file listing input files, a DPD task name
            (-d), or a local directory.
        options: parsed optparse options; legacy_griduser, legacy_fromcastor,
            legacy_runoverdpd, legacy_filter and bytestream are read.
        dsname: dataset (run) name used to locate the DPD directory
            <cwd>/<dsname>/<inputdata> when options.legacy_runoverdpd is set.
            If omitted, the module-level ``dsname`` assigned by the
            command-line entry point is used (historical behaviour).

    Returns:
        A sorted list of input file names (for -g: just [inputdata]).

    Raises:
        Exception: in DPD mode, if no dataset name is available or the run or
            DPD directory does not exist.
    '''
    if options.legacy_griduser:
        # INPUTDATA is a grid dataset name, passed through unchanged
        fs = [inputdata]
    elif options.legacy_fromcastor:
        # INPUTDATA specifies a directory with files; without an explicit
        # filter, select ESD files unless reading bytestream (then no filter)
        pattern = options.legacy_filter or (None if options.bytestream else '.*ESD.*')
        fs = DiskUtils.FileSet.from_directory(inputdata).matching(pattern)
    elif os.path.isfile(inputdata):
        # inputdata is a text file with filenames
        fs = DiskUtils.FileSet.from_file_containing_list(inputdata)
    elif options.legacy_runoverdpd:
        # INPUTDATA is the DPD task name inside the run directory <cwd>/<dsname>
        if dsname is None:
            # Backwards compatibility: the script used to read the global set
            # in __main__; look it up explicitly instead of risking a NameError.
            dsname = globals().get('dsname')
        if dsname is None:
            raise Exception('Dataset name is required to locate the DPD directory for ' + inputdata)
        rundir = os.path.join(os.getcwd(), dsname)
        if not os.path.exists(rundir):
            raise Exception('Run ' + dsname + ' (directory ' + rundir + ') not found')
        dpddir = os.path.join(rundir, inputdata)
        if not os.path.exists(dpddir):
            raise Exception('Dataset with name ' + inputdata + ' (directory ' + dpddir + ') not found')
        fs = DiskUtils.FileSet.from_glob(os.path.join(dpddir, '*', '*-dpd.root*'))
    else:
        # INPUTDATA is a directory with files
        pattern = options.legacy_filter or '*.root*'
        fs = DiskUtils.FileSet.from_glob(os.path.join(inputdata, pattern))
    return sorted(fs)
55 
def extract_file_list(options):
    ''' Use new flags to work out input file list.

    The input sources are checked in the order --directory, --file-list,
    --dsid.

    Returns:
        For --directory / --file-list: the sorted file names, narrowed by the
        optional --match / --exclude filters.
        For --dsid: a one-element list holding the dataset ID (the caller
        uses files[0] as the grid input dataset); filters are not applied.
        If no input source was given: an empty list, so the caller reports
        "No input files found" instead of crashing on an unbound variable.
    '''
    if options.in_directory:
        fs = DiskUtils.FileSet.from_directory(options.in_directory)
    elif options.in_list:
        fs = DiskUtils.FileSet.from_file_containing_list(options.in_list)
    elif getattr(options, 'in_dsid', None):
        # A rucio dataset is not expanded locally; the DSID is the input.
        return [options.in_dsid]
    else:
        return []
    return sorted(fs
                  .matching(options.f_match)
                  .excluding(options.f_exclude))
66 
def process_flags(options, legacy=False):
    ''' Build the JobRunner parameter dictionary from parsed command-line options.

    Parameters:
        options: parsed optparse options.
        legacy: True in four-argument legacy mode; the grid user prefix is then
            taken verbatim from the deprecated -g option.

    Returns:
        dict of JobRunner parameters. User parameters given with -p are
        evaluated and merged last, so they override any value set here.
    '''
    flags = {
        'inputds': '',
        'bytestream': options.bytestream,
        'DataSource': 'geant4' if options.is_mc else 'data',
        'evtmax': options.evtmax,
        'maxjobs': options.maxjobs,
        'outputlevel': options.outputlevel,
        'logmail': options.users,
        'alignmentfile': options.alignmentfile,
        'beamspotfile': options.beamspotfile,
        'autoconfparams': options.autoconfparams,
        'taskpostprocsteps': options.postprocsteps,
        'filesperjob': options.nfiles,
        'lbperjob': options.lbperjob,
        'batchqueue': options.batchqueue,
        'gridsite': options.gridsite,
        # Input files are not added to the POOL file catalog for bytestream
        # input or grid jobs.
        'addinputtopoolcatalog': not (options.bytestream or options.submit == 'grid' or options.legacy_griduser),
    }

    # List of desired output files. For grid jobs, it must specify exactly
    # the expected files (otherwise the grid jobs will fail or not return the
    # desired output). For all other jobs, it can be an inclusive list of
    # possible outputs and JobRunner will return only the actually present
    # output files when asked for output files.
    if options.outputfilelist:
        flags['outputfilelist'] = [f.strip() for f in options.outputfilelist.split(',')]
    else:
        flags['outputfilelist'] = ['dpd.root', 'nt.root', 'monitoring.root', 'beamspot.db']

    if legacy:
        flags['griduser'] = options.legacy_griduser
    else:
        # Tolerate an unset $USER: str.join would otherwise raise TypeError
        # on None, even for jobs that never go to the grid.
        flags['griduser'] = '.'.join(['user', options.grid_user or os.getenv('USER') or ''])

    if options.legacy_runoverdpd and not options.lbperjob:
        flags['maxjobs'] = 1

    # -p takes "name=value" pairs separated by ", " (comma AND space), so a
    # value may itself contain bare commas, e.g. "x=[1,2]".
    for s in options.params.split(', '):
        if not s:
            continue
        p = s.split('=', 1)
        try:
            # NOTE(review): eval() runs arbitrary code; acceptable only because
            # the string comes from the invoking user's own command line.
            flags[p[0].strip()] = eval(p[1].strip())
        except Exception:
            # Missing '=' (IndexError) or an invalid expression: skip it, but
            # let KeyboardInterrupt/SystemExit propagate.
            print('\nERROR parsing user parameter', p, '- parameter will be ignored')

    return flags
114 
def make_runner(runner_type, flags):
    """Create and return a job runner.

    runner_type is the name of a JobRunner class resolved through
    InDetBeamSpotExample.loadClass; flags is the parameter dict, passed to
    that class's constructor as keyword arguments.
    """
    cls = InDetBeamSpotExample.loadClass(runner_type)
    runner = cls(**flags)
    return runner
118 
if __name__ == '__main__':
    # Full command line; recorded as the job comment and in the task database.
    cmd = subprocess.list2cmdline(sys.argv)

    from optparse import OptionParser, OptionGroup
    parser = OptionParser(usage=__usage__, version=__version__)
    parser.add_option('', '--bytestream', dest='bytestream', action='store_true', default=False,
                      help='input files are bytestream instead of ROOT/POOL files')
    parser.add_option('-m', '--mc', dest='is_mc', action='store_true', default=False,
                      help='input data is from Monte-Carlo instead of data (automatically chooses between COMP200 and OFLP200 / CONDBR2 conditions DBs)')
    parser.add_option('-j', '--maxjobs', dest='maxjobs', type='int', default=0,
                      help='max number of jobs (default: 0 ie no maximum)')
    parser.add_option('', '--files-per-job', dest='nfiles', type='int', default=1, metavar='N',
                      help='number of files per job (default: 1, set to 0 for single job over all files)')
    parser.add_option('-e', '--maxevents', dest='evtmax', type='int', default=-1,
                      help='max number of events per job')
    parser.add_option('', '--lbperjob', dest='lbperjob', type='int', default=0, metavar='N',
                      help='number of luminosity blocks per job (default: 0 - no bunching)')
    parser.add_option('-o', '--outputfilelist', dest='outputfilelist', default='', metavar='FILES',
                      help='list of desired output files (default: "dpd.root,nt.root,monitoring.root,beamspot.db"; must be specified explicitly for grid)')
    parser.add_option('-k', '--taskdb', dest='taskdb', default='',
                      help='TaskManager database (default: from TASKDB or sqlite_file:taskdata.db; set to string None to avoid using a task database)')
    parser.add_option('-l', '--logmail', dest='users', default='', metavar='USERS',
                      help='send log mail to specified users (default: no mail)')
    parser.add_option('-z', '--postprocsteps', dest='postprocsteps', default='JobPostProcessing', metavar='STEPS',
                      help='Task-level postprocessing steps (Default: JobPostProcessing)')
    parser.add_option('-t', '--test', dest='testonly', action='store_true', default=False,
                      help='show only options and input files')
    parser.add_option('-v', '--verbosity', dest='outputlevel', type='int', default=4, metavar='LEVEL',
                      help='output level (default:4, where 1=VERBOSE, 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL)')
    parser.add_option('-p', '--params', dest='params', default='',
                      help='job option parameters to pass to job option template')
    parser.add_option('', '--autoconfparams', dest='autoconfparams', default='DetDescrVersion',
                      help='comma-separated list of automatically determined parameters (template must include AutoConfFragment.py, default: "DetDescrVersion")')

    # Additional optional files requiring special treatment (other parameters
    # should be passed to the job option template via "-p params")
    parser.add_option('-a', '--alignment-file', dest='alignmentfile', default='', metavar='FILE',
                      help='alignment file (default: none)')
    parser.add_option('-b', '--beamspot-file', dest='beamspotfile', default='', metavar='FILE',
                      help='beam spot SQLite file (default: none)')

    execopt = OptionGroup(parser, 'Execution Options')
    execopt.add_option('', '--submit', dest='submit', default='condor', metavar='TYPE',
                       choices=['grid', 'lsf', 'shell', 'bg', 'pdsf', 'simple', 'condor'],
                       help='submission type (default: condor, choices: grid,lsf,shell,bg,pdsf,simple,condor)')
    execopt.add_option('', '--grid-user', dest='grid_user', default=None, metavar='USER',
                       help='grid username (default: $USER)')
    execopt.add_option('', '--grid-site', dest='gridsite', default='AUTO', metavar='SITE',
                       help='site name where jobs are sent (default: AUTO)')
    execopt.add_option('-q', '--queue', dest='batchqueue', default='atlasb1',
                       help='batch queue (default: atlasb1)')
    parser.add_option_group(execopt)

    inopt = OptionGroup(parser, 'Input File Options',
                        "One of these must be specified.")
    inopt.add_option('', '--directory', dest='in_directory', metavar='DIR',
                     help='run over all matching files in the directory')
    inopt.add_option('', '--file-list', dest='in_list', metavar='FILE',
                     help='run over all matching files listed in the file')
    inopt.add_option('', '--dsid', dest='in_dsid', metavar='DSID',
                     help='run over a rucio DSID')
    parser.add_option_group(inopt)

    # The filters are attached to filtopt (not inopt) so that --help lists
    # them under their own group.
    filtopt = OptionGroup(parser, 'Input Filtering Options',
                          "Optional filters to select input files.")
    filtopt.add_option('', '--match', dest='f_match', default=None, metavar='REGEX',
                       help='keep only files matching the pattern')
    filtopt.add_option('', '--exclude', dest='f_exclude', default=None, metavar='REGEX',
                       help='skip files matching the pattern')
    parser.add_option_group(filtopt)

    # TODO Check if these flags can be removed:
    deprecated = OptionGroup(parser, 'Deprecated Options')
    deprecated.add_option('-c', '--castor', dest='legacy_fromcastor', action='store_true', default=False,
                          help='INPUTDATA refers to CASTOR directory')
    deprecated.add_option('', '--prefix', dest='legacy_prefix', default='',
                          help='Prefix for reading files from mass storage (ignored)')
    deprecated.add_option('-d', '--dpd', dest='legacy_runoverdpd', action='store_true', default=False,
                          help='run over DPD (single job, INPUTDATA is DPD task name)')
    deprecated.add_option('-i', '--interactive', dest='legacy_interactive', action='store_true', default=False,
                          help='run interactively (same as -r JobRunner)')
    deprecated.add_option('-f', '--filter', dest='legacy_filter', default='',
                          help='use specified pattern to filter input files (default: *.root* for local files, .*ESD.* for castor)')
    deprecated.add_option('-g', '--grid', dest='legacy_griduser', default='',
                          help='run on grid (GRIDUSER is user prefix of grid job name, e.g. user09.JuergBeringer; INPUTDATA is grid dataset name)')
    deprecated.add_option('-s', '--gridsite', dest='gridsite', default='AUTO',
                          help='deprecated spelling of --grid-site')
    deprecated.add_option('-r', '--runner', dest='legacy_runner', default='LSFJobRunner',
                          help='type of JobRunner (default: LSFJobRunner or PandaJobRunner)')
    deprecated.add_option('-w', '--wait', dest='legacy_dowait', action='store_true', default=False,
                          help='wait for jobs to complete')
    # No default: keeps the default of --files-per-job, which shares the dest.
    deprecated.add_option('-n', '--nfiles', dest='nfiles', type='int',
                          help='deprecated spelling of --files-per-job')
    parser.add_option_group(deprecated)

    (opts, args) = parser.parse_args()
    if len(args) not in [3, 4]:
        parser.error('wrong number of command line arguments')

    joboptiontemplate = args[0]
    dsname = args[1]
    taskname = args[2]

    legacy_options = len(args) == 4
    if legacy_options:
        print("WARNING: the four-argument invocation of runJobs is deprecated")
        print("WARNING: enabling (imperfect) legacy compatibility mode")
        files = extract_file_list_legacy(args[3], opts)
        grid_mode = bool(opts.legacy_griduser)
        runner_type = opts.legacy_runner
        if grid_mode:
            runner_type = 'PandaJobRunner'
        if opts.legacy_interactive:
            runner_type = 'JobRunner'
    else:
        files = extract_file_list(opts)
        grid_mode = opts.submit == 'grid'
        runner_type = {
            'lsf': 'LSFJobRunner',
            'grid': 'PandaJobRunner',
            'shell': 'ShellJobRunner',
            'bg': 'BackgroundJobRunner',
            'pdsf': 'PDSFJobRunner',
            'simple': 'JobRunner',
            'condor': 'HTCondorJobRunner',
        }[opts.submit]
        if grid_mode:
            if not opts.in_dsid:
                sys.exit('ERROR: For grid submission, a DSID must be given')
    if not files:
        sys.exit('ERROR: No input files found')

    flags = process_flags(opts, legacy=legacy_options)
    flags['comment'] = cmd
    flags['inputfiles'] = files
    flags['joboptionpath'] = joboptiontemplate

    if grid_mode:
        flags['inputds'] = files[0]
        if opts.lbperjob:
            sys.exit('ERROR: Bunching per LB not supported for grid jobs')
        if not opts.outputfilelist:
            sys.exit('ERROR: For grid jobs, must specify output files explicitly using option -o (e.g. -o dpd.root)')

    if opts.nfiles < 1 or (opts.legacy_runoverdpd and opts.nfiles == 1):
        # run single job over all files:
        flags['filesperjob'] = len(files)
        if grid_mode:
            sys.exit('ERROR: Must specify number of files per job explicitly when running on grid')

    workdir = os.path.join(os.getcwd(), dsname, taskname)
    flags['jobdir'] = os.path.join(workdir, '%(jobnr)03i')
    if os.path.exists(workdir):
        sys.exit("ERROR: Task %s exists already for dataset %s (directory %s)" % (taskname, dsname, workdir))

    if opts.lbperjob:
        flags['jobname'] = '-'.join([dsname, taskname, 'lb%(jobnr)03i'])
    else:
        flags['jobname'] = '-'.join([dsname, taskname, '%(jobnr)03i'])
        # Grid and DPD runs without LB bunching use one name for the whole task.
        if grid_mode or opts.legacy_runoverdpd:
            flags['jobname'] = dsname + '-' + taskname

    runner = make_runner(runner_type, flags)

    if opts.alignmentfile:
        runner.addFilesToPoolFileCatalog([opts.alignmentfile])

    if grid_mode:
        runner.setParam('outputfileprefix', '%(jobname)s-')
        runner.setParam('addinputtopoolcatalog', False)  # input is a grid dataset
        runner.registerToBeCopied('alignmentfile')
        runner.registerToBeCopied('beamspotfile')

    print()
    runner.showParams(-1)
    print()

    # Temporary warning. TODO: still needed?
    if grid_mode and opts.autoconfparams:
        print("WARNING: Automatic configuration of parameters such as DetDescrVersion doesn't work yet on the grid!")
        print(" Please be sure the values of each of the following parameters are specified explicitly above,")
        print(" unless the defaults in the job option template are correct:\n")
        print(" ", opts.autoconfparams)
        print()

    print(len(files), "input file(s)/dataset found.")
    print()
    if not opts.testonly:
        runner.configure()
        if opts.taskdb != 'None':
            try:
                with TaskManager.TaskManager(opts.taskdb) as taskman:
                    taskman.addTask(dsname, taskname, joboptiontemplate, runner.getParam('release'),
                                    runner.getNJobs(), opts.postprocsteps, comment=cmd)
            except Exception as e:
                # Best effort: failing to record the task must not block
                # submission, but say why it failed.
                print('WARNING: Unable to add task to task manager database ' + opts.taskdb)
                print('WARNING: ' + str(e))
        runner.run()
        if opts.legacy_dowait and not grid_mode:
            if not opts.legacy_interactive:
                runner.wait()
            print()
            print("Job directories in %s for this task:" % workdir)
            print()
            # Flush so the listing appears after the header, and pass workdir
            # as an argument instead of interpolating it into a shell command.
            sys.stdout.flush()
            subprocess.call(['ls', '-l', workdir])
            print()
            print("The following output file(s) were produced:")
            print()
            print(runner.getOutputFiles())
            print()
325  print()
runJobs.extract_file_list_legacy
def extract_file_list_legacy(inputdata, options)
Definition: runJobs.py:30
runJobs.make_runner
def make_runner(runner_type, flags)
Definition: runJobs.py:115
runJobs.extract_file_list
def extract_file_list(options)
Definition: runJobs.py:56
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
runJobs.process_flags
def process_flags(options, legacy=False)
Definition: runJobs.py:67
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60