ATLAS Offline Software
runJobs.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 # Front-end script to run JobRunner jobs
5 
6 
7 
8 __authors__ = ['Juerg Beringer', 'Carl Suster']
9 __version__ = 'runJobs.py atlas/athena'
10 __usage__ = """%prog [options] JOBOPTIONTEMPLATE DATASET TASK
11 
12 Templates: - InDetBeamSpotExample/VertexTemplate.py
13  - InDetBeamSpotExample/*Template.py
14  - your own template file
15 
16 See the comment field of jobs (e.g. MON.DB_BEAMSPOT jobs) in the beam spot
17 summary webpage for real usage examples.
18 """
19 
20 import sys
21 import glob
22 import os
23 import re
24 import subprocess
25 import InDetBeamSpotExample
26 from InDetBeamSpotExample import TaskManager
27 from InDetBeamSpotExample import DiskUtils
28 
def extract_file_list_legacy(inputdata, options, dsname=None):
    ''' Reads several legacy options to work out what input data to use.

    Parameters:
        inputdata: meaning depends on the legacy flags -- a grid dataset
            name, a directory of files, a text file listing input files,
            or a DPD task name.
        options: parsed optparse options (the legacy_* flags are consulted).
        dsname: dataset name used to locate the DPD run directory.  Defaults
            to the module-level ``dsname`` set by the command-line driver,
            preserving the historical implicit-global behaviour.

    Returns a sorted list of input files.
    '''
    if dsname is None:
        # Backward compatibility: the original code read the global set in
        # the __main__ section below.  Fall back to it when not passed.
        dsname = globals().get('dsname')
    if options.legacy_griduser:
        # INPUTDATA is a grid dataset name; pass it through unchanged.
        fs = [inputdata]
    elif options.legacy_fromcastor:
        # INPUTDATA specifies a directory with files
        pattern = options.legacy_filter or (None if options.bytestream else '.*ESD.*')
        fs = DiskUtils.FileSet.from_directory(inputdata).matching(pattern)
    elif os.path.isfile(inputdata):
        # inputdata is a text file with filenames
        fs = DiskUtils.FileSet.from_file_containing_list(inputdata)
    elif options.legacy_runoverdpd:
        # INPUTDATA is filename
        rundir = os.path.join(os.getcwd(), dsname)
        if not os.path.exists(rundir):
            raise Exception('Run ' + dsname + ' (directory ' + rundir + ') not found')
        dpddir = os.path.join(rundir, inputdata)
        if not os.path.exists(dpddir):
            raise Exception('Dataset with name ' + inputdata + ' (directory ' + dpddir + ') not found')
        fs = DiskUtils.FileSet.from_glob(os.path.join(dpddir, '*', '*-dpd.root*'))
    else:
        # INPUTDATA is a directory with files
        pattern = options.legacy_filter or '*.root*'
        fs = DiskUtils.FileSet.from_glob(os.path.join(inputdata, pattern))
    return list(sorted(fs))
54 
def extract_file_list(options):
    ''' Use new flags to work out input file list.

    Exactly one of ``--directory`` / ``--file-list`` must have been given;
    the resulting set is then filtered through the optional ``--match`` /
    ``--exclude`` patterns.  Returns a sorted list of input files.
    '''
    if options.in_directory:
        fs = DiskUtils.FileSet.from_directory(options.in_directory)
    elif options.in_list:
        fs = DiskUtils.FileSet.from_file_containing_list(options.in_list)
    else:
        # Previously this fell through with 'fs' unbound and died with an
        # opaque NameError; fail with an explicit message instead.
        sys.exit('ERROR: No input files specified (use --directory or --file-list)')
    return list(sorted(fs
        .matching(options.f_match)
        .excluding(options.f_exclude)
        ))
65 
def process_flags(options, legacy=False):
    ''' Assemble the JobRunner flag dictionary from the parsed options.

    Parameters:
        options: parsed optparse options.
        legacy: True when running in the deprecated four-argument
            compatibility mode (changes how the grid user name is derived).

    Returns a dict of flags suitable for constructing a JobRunner.
    '''
    flags = {
        'inputds' : '',
        'bytestream' : options.bytestream,
        # Conditions DB auto-configuration keys off the data source.
        'DataSource' : 'geant4' if options.is_mc else 'data',
        'evtmax' : options.evtmax,
        'maxjobs' : options.maxjobs,
        'outputlevel' : options.outputlevel,
        'logmail' : options.users,
        'alignmentfile' : options.alignmentfile,
        'beamspotfile' : options.beamspotfile,
        'autoconfparams' : options.autoconfparams,
        'taskpostprocsteps' : options.postprocsteps,
        'filesperjob' : options.nfiles,
        'lbperjob' : options.lbperjob,
        'batchqueue' : options.batchqueue,
        'gridsite' : options.gridsite,
        # Grid jobs and bytestream input manage their own file access, so the
        # POOL file catalog is only populated for local ROOT/POOL runs.
        'addinputtopoolcatalog' : not (options.bytestream or options.submit == 'grid' or options.legacy_griduser),
    }

    # List of desired output files. For grid jobs, it must specify exactly
    # the expected files (otherwise the grid jobs will fail or not return the
    # desired output). For all other jobs, it can be an inclusive list of
    # possible outputs and JobRunner will return only the actually present
    # output files when asked for output files.
    if options.outputfilelist:
        flags['outputfilelist'] = [ f.strip() for f in options.outputfilelist.split(',') ]
    else:
        flags['outputfilelist'] = ['dpd.root', 'nt.root', 'monitoring.root', 'beamspot.db']

    if legacy:
        flags['griduser'] = options.legacy_griduser
    else:
        flags['griduser'] = '.'.join(['user', options.grid_user or os.getenv('USER')])

    # Running over a DPD historically means a single job, unless the user
    # asked for luminosity-block bunching.
    if options.legacy_runoverdpd and not options.lbperjob:
        flags['maxjobs'] = 1

    # User parameters are 'name=value' pairs separated by ', '.
    # NOTE(review): eval() of command-line values is long-standing behaviour
    # here and only acceptable because this is a trusted operator tool.
    for s in options.params.split(', '):
        if s:
            try:
                p = s.split('=', 1)
                flags[p[0].strip()] = eval(p[1].strip())
            except Exception:
                # Was a bare 'except:'; narrowed so KeyboardInterrupt and
                # SystemExit are not swallowed.  Bad parameter is skipped.
                print ('\nERROR parsing user parameter', p, '- parameter will be ignored')

    return flags
113 
def make_runner(runner_type, flags):
    '''Look up the JobRunner class named *runner_type* and instantiate it
    with the given flag dictionary.'''
    runner_factory = InDetBeamSpotExample.loadClass(runner_type)
    runner = runner_factory(**flags)
    return runner
117 
if __name__ == '__main__':
    # Preserve the exact invocation; stored below as the task comment.
    cmd = subprocess.list2cmdline(sys.argv)

    from optparse import OptionParser, OptionGroup
    parser = OptionParser(usage=__usage__, version=__version__)
    parser.add_option('', '--bytestream', dest='bytestream', action='store_true', default=False,
                      help='input files are bytestream instead of ROOT/POOL files')
    parser.add_option('-m', '--mc', dest='is_mc', action='store_true', default=False,
                      help='input data is from Monte-Carlo instead of data (automatically chooses between COMP200 and OFLP200 / CONDBR2 conditions DBs)')
    parser.add_option('-j', '--maxjobs', dest='maxjobs', type='int', default=0,
                      help='max number of jobs (default: 0 ie no maximum)')
    parser.add_option('', '--files-per-job', dest='nfiles', type='int', default=1, metavar='N',
                      help='number of files per job (default: 1, set to 0 for single job over all files)')
    parser.add_option('-e', '--maxevents', dest='evtmax', type='int', default=-1,
                      help='max number of events per job')
    parser.add_option('', '--lbperjob', dest='lbperjob', type='int', default=0, metavar='N',
                      help='number of luminosity blocks per job (default: 0 - no bunching)')
    parser.add_option('-o', '--outputfilelist', dest='outputfilelist', default='', metavar='FILES',
                      help='list of desired output files (default: "dpd.root,nt.root,monitoring.root,beamspot.db"; must be specified explicitly for grid)')
    parser.add_option('-k', '--taskdb', dest='taskdb', default='',
                      help='TaskManager database (default: from TASKDB or sqlite_file:taskdata.db; set to string None to avoid using a task database)')
    parser.add_option('-l', '--logmail', dest='users', default='', metavar='USERS',
                      help='send log mail to specified users (default: no mail)')
    parser.add_option('-z', '--postprocsteps', dest='postprocsteps', default='JobPostProcessing', metavar='STEPS',
                      help='Task-level postprocessing steps (Default: JobPostProcessing)')
    parser.add_option('-t', '--test', dest='testonly', action='store_true', default=False,
                      help='show only options and input files')
    parser.add_option('-v', '--verbosity', dest='outputlevel', type='int', default=4, metavar='LEVEL',
                      help='output level (default:4, where 1=VERBOSE, 2=DEBUG, 3=INFO, 4=WARNING, 5=ERROR, 6=FATAL)')
    parser.add_option('-p', '--params', dest='params', default='',
                      help='job option parameters to pass to job option template')
    parser.add_option('', '--autoconfparams', dest='autoconfparams', default='DetDescrVersion',
                      help='comma-separated list of automatically determined parameters (template must include AutoConfFragment.py, default: "DetDescrVersion")')

    # Additional optional files requiring special treatment (other parameters
    # should be passed to the job option template via "-p params")
    parser.add_option('-a', '--alignment-file', dest='alignmentfile', default='', metavar='FILE',
                      help='alignment file (default: none)')
    parser.add_option('-b', '--beamspot-file', dest='beamspotfile', default='', metavar='FILE',
                      help='beam spot SQLite file (default: none)')

    # How and where the jobs are executed/submitted.
    execopt = OptionGroup(parser, 'Execution Options')
    execopt.add_option('', '--submit', dest='submit', default='condor', metavar='TYPE',
                       choices=['grid', 'lsf', 'shell', 'bg', 'pdsf', 'simple', 'condor'],
                       help='submission type (default: condor, choices: grid,lsf,shell,bg,pdsf,simple,condor)')
    execopt.add_option('', '--grid-user', dest='grid_user', default=None, metavar='USER',
                       help='grid username (default: $USER)')
    execopt.add_option('', '--grid-site', dest='gridsite', default='AUTO', metavar='SITE',
                       help='site name where jobs are sent (default: AUTO)')
    execopt.add_option('-q', '--queue', dest='batchqueue', default='atlasb1',
                       help='batch queue (default: atlasb1)')
    parser.add_option_group(execopt)

    # New-style input selection (used by extract_file_list above).
    inopt = OptionGroup(parser, 'Input File Options',
                        "One of these must be specified.")
    inopt.add_option('', '--directory', dest='in_directory', metavar='DIR',
                     help='run over all matching files in the directory')
    # NOTE(review): help text below is presumably a copy-paste of --directory's;
    # the option actually takes a text file listing input files.
    inopt.add_option('', '--file-list', dest='in_list', metavar='FILE',
                     help='run over all matching files in the directory')
    inopt.add_option('', '--dsid', dest='in_dsid', metavar='DSID',
                     help='run over a rucio DSID')
    parser.add_option_group(inopt)

    filtopt = OptionGroup(parser, 'Input Filtering Options',
                          "Optional filters to select input files.")
    # NOTE(review): the two options below are added to 'inopt', apparently
    # meant for 'filtopt' (which is added to the parser empty).  They still
    # parse correctly since groups share the parser's option tables; only the
    # --help grouping is affected.
    inopt.add_option('', '--match', dest='f_match', default=None, metavar='REGEX',
                     help='keep only files matching the pattern')
    inopt.add_option('', '--exclude', dest='f_exclude', default=None, metavar='REGEX',
                     help='skip files matching the pattern')
    parser.add_option_group(filtopt)

    # TODO Check if these flags can be removed:
    deprecated = OptionGroup(parser, 'Deprecated Options')
    deprecated.add_option('-c', '--castor', dest='legacy_fromcastor', action='store_true', default=False,
                          help='INPUTDATA refers to CASTOR directory')
    deprecated.add_option('', '--prefix', dest='legacy_prefix', default='',
                          help='Prefix for reading files from mass storage (ignored)')
    deprecated.add_option('-d', '--dpd', dest='legacy_runoverdpd', action='store_true', default=False,
                          help='run over DPD (single job, INPUTDATA is DPD task name)')
    deprecated.add_option('-i', '--interactive', dest='legacy_interactive', action='store_true', default=False,
                          help='run interatively (same as -r JobRunner)')
    deprecated.add_option('-f', '--filter', dest='legacy_filter', default='',
                          help='use specified pattern to filter input files (default: *.root* for local files, .*ESD.* for castor)')
    deprecated.add_option('-g', '--grid', dest='legacy_griduser', default='',
                          help='run on grid (GRIDUSER is user prefix of grid job name, e.g. user09.JuergBeringer; INPUTDATA is grid dataset name)')
    deprecated.add_option('-s', '--gridsite', dest='gridsite', default='AUTO',
                          help='deprecated spelling of --grid-site')
    deprecated.add_option('-r', '--runner', dest='legacy_runner', default='LSFJobRunner',
                          help='type of JobRunner (default: LSFJobRunner or PandaJobRunner)')
    deprecated.add_option('-w', '--wait', dest='legacy_dowait', action='store_true', default=False,
                          help='wait for jobs to complete')
    deprecated.add_option('-n', '--nfiles', dest='nfiles', type='int',
                          help='deprecated spelling of --files-per-job')
    parser.add_option_group(deprecated)

    (opts,args) = parser.parse_args()
    # 3 args = new style (TEMPLATE DATASET TASK); 4 args = legacy mode with
    # a trailing INPUTDATA argument.
    if len(args) not in [3, 4]:
        parser.error('wrong number of command line arguments')

    joboptiontemplate = args[0]
    # NOTE: dsname is also read as a module global by extract_file_list_legacy.
    dsname = args[1]
    taskname = args[2]

    legacy_options = len(args) == 4
    if legacy_options:
        print ("WARNING: the four-argument invocation of runJobs is deprecated")
        print ("WARNING: enabling (imperfect) legacy compatibility mode")
        files = extract_file_list_legacy(args[3], opts)
        grid_mode = bool(opts.legacy_griduser)
        runner_type = opts.legacy_runner
        # Grid and interactive flags override the requested runner class.
        if grid_mode:
            runner_type = 'PandaJobRunner'
        if opts.legacy_interactive:
            runner_type = 'JobRunner'
    else:
        files = extract_file_list(opts)
        grid_mode = opts.submit == 'grid'
        # Map the --submit choice onto a JobRunner class name.
        runner_type = {
            'lsf': 'LSFJobRunner',
            'grid': 'PandaJobRunner',
            'shell': 'ShellJobRunner',
            'bg': 'BackgroundJobRunner',
            'pdsf': 'PDSFJobRunner',
            'simple': 'JobRunner',
            'condor': 'HTCondorJobRunner',
            }[opts.submit]
        if grid_mode:
            if not opts.in_dsid:
                sys.exit('ERROR: For grid submission, a DSID must be given')
    if not files:
        sys.exit('ERROR: No input files found')

    flags = process_flags(opts, legacy=legacy_options)
    flags['comment'] = cmd
    flags['inputfiles'] = files
    flags['joboptionpath'] = joboptiontemplate

    if grid_mode:
        # For grid submission the first "file" is the input dataset name.
        flags['inputds'] = files[0]
        if opts.lbperjob:
            sys.exit('ERROR: Bunching per LB not supported for grid jobs')
        if not opts.outputfilelist:
            sys.exit('ERROR: For grid jobs, must specify output files expclitly using option -o (e.g. -o dpd.root)')

    if opts.nfiles < 1 or (opts.legacy_runoverdpd and opts.nfiles == 1):
        # run single job over all files:
        flags['filesperjob'] = len(files)
        if grid_mode:
            sys.exit('ERROR: Must specify number of files per job explicitly when running on grid')

    # Per-task working directory; '%(jobnr)03i' is expanded by JobRunner.
    workdir = os.path.join(os.getcwd(), dsname, taskname)
    flags['jobdir'] = os.path.join(workdir, '%(jobnr)03i')
    if os.path.exists(workdir):
        sys.exit("ERROR: Task %s exists already for dataset %s (directory %s)" % (taskname,dsname,workdir))

    # Job names carry either the LB number or the sequential job number;
    # grid and DPD runs use a single job and drop the number entirely.
    if opts.lbperjob:
        flags['jobname'] = '-'.join([dsname, taskname, 'lb%(jobnr)03i'])
    else:
        flags['jobname'] = '-'.join([dsname, taskname, '%(jobnr)03i'])
        if grid_mode or opts.legacy_runoverdpd:
            flags['jobname'] = dsname + '-' + taskname

    runner = make_runner(runner_type, flags)

    if opts.alignmentfile:
        runner.addFilesToPoolFileCatalog([opts.alignmentfile])

    if grid_mode:
        # Outputs are prefixed per-job; auxiliary files are shipped with the job.
        runner.setParam('outputfileprefix','%(jobname)s-')
        runner.setParam('addinputtopoolcatalog',False)   # input is a grid dataset
        runner.registerToBeCopied('alignmentfile')
        runner.registerToBeCopied('beamspotfile')

    print()
    runner.showParams(-1)
    print()

    # Temporary warning. TODO: still needed?
    if grid_mode and opts.autoconfparams:
        print ("WARNING: Automatic configuration of parameters such as DetDescrVersion doesn't work yet on the grid!")
        print (" Please be sure the values of each of the following parameters are specified explicitly above,")
        print (" unless the defaults in the job option template are correct:\n")
        print (" ", opts.autoconfparams)
        print()

    print (len(files), "input file(s)/dataset found.")
    print()
    if not opts.testonly:
        runner.configure()
        # Record the task in the TaskManager DB unless disabled with -k None.
        if opts.taskdb != 'None':
            try:
                with TaskManager.TaskManager(opts.taskdb) as taskman:
                    taskman.addTask(dsname,taskname,joboptiontemplate,runner.getParam('release'),runner.getNJobs(),opts.postprocsteps,comment=cmd)
            # NOTE(review): bare except keeps task-DB failures best-effort,
            # but it also hides e.g. KeyboardInterrupt.
            except:
                print ('WARNING: Unable to add task to task manager database ' + opts.taskdb)
        runner.run()
        # Legacy -w/--wait: block until jobs finish, then summarize outputs.
        if opts.legacy_dowait and not grid_mode:
            if not opts.legacy_interactive: runner.wait()
            print()
            print ("Job directories in %s for this task:" % workdir)
            print()
            os.system('ls -l %s' % workdir)
            print()
            print ("The following output file(s) were produced:")
            print()
            print (runner.getOutputFiles())
            print()
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename R::value_type > sorted(const R &r, PROJ proj={})
Helper function to create a sorted vector from an unsorted range.
runJobs.extract_file_list_legacy
def extract_file_list_legacy(inputdata, options)
Definition: runJobs.py:29
runJobs.make_runner
def make_runner(runner_type, flags)
Definition: runJobs.py:114
runJobs.extract_file_list
def extract_file_list(options)
Definition: runJobs.py:55
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
runJobs.process_flags
def process_flags(options, legacy=False)
Definition: runJobs.py:66
xAOD::bool
setBGCode setTAP setLVL2ErrorBits bool
Definition: TrigDecision_v1.cxx:60