ATLAS Offline Software
submissionTool.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 # Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 
4 """
5 This module is a wrapper to pathena, which does the following useful things:
6 - Retrieves the list of matrix-elememt weights stored in a given ATLAS MC samples, either
7 -- using the DSID_database
8 -- or by automatically downloading a test sample and checking manually which weights are
9 available when initialising the Rivet_i instances
10 - Produces Job options from templates stored in /data/RivetAnalysis*JO*py
11 - Automatic download of test samples if needed
12 
13 Author: Louie D. Corpe (UCL)
14 Email: l.corpe@cern.ch
15 """
16 
17 import os
18 import sys
19 import re
20 import optparse
21 import readDatabase as rDB
22 import rivet
23 
24 parser = optparse.OptionParser(usage="%prog [options]")
25 parser.add_option("-i", "--inputSamples", help="list of samples to submit. One per line", dest="inputSamples", default="example.txt")
26 parser.add_option("-l", "--label", help="label to keep track of what you submitted", dest="label", default="")
27 parser.add_option("--noSyst", help="process the systematics variations if there are any in ME", dest="noSyst", default=False, action="store_true")
28 parser.add_option("--testSampleDir", help="where to store the test samples. Must be away from the submission dir otherwise panda will try to send the test samples too...", dest="testSampleDir", default="../testSamples")
29 parser.add_option("-a", "--rivetAnalysis ", help="If running Rivet, which rivet analysis/analyses to run? The script will include it in the job. Supports comma-separeted list of analyses. If this is a custom plugin please make sure you have compiled it the submission directory using: rivet-buildplugin RivetAnalysis_<myanalysis>.so <myanalysis>.cc. eg ATLAS_2017_I1514251. If you are running AthAnalysis code, you should ignore this option", dest="analysis", default=None)
30 parser.add_option("--dryRun ", help="do everything except submit the jobs", dest="dryRun", default=False, action="store_true")
31 parser.add_option("--ds", "--downloadSample", help="if your sample DSID is not in the database, you can download one like this and get the weights from there. ", dest="downloadSample", default=False, action="store_true")
32 parser.add_option("--ef", "--extraFiles", help="comma separated list of any additional files your jobs need as inputs (eg config files, steering files...). Should be in your submission directory, and will get bundled up and submitted by panda.", dest="extraFiles", default="")
33 parser.add_option("-N", "--nJobs", help="[DEPRECATED] Number of jobs to prepare. This feeds into the pathena nJobs/split option. -1 tells pathena to work out a sensible value itself.", dest="nJobs", default=-1)
34 parser.add_option("--pathenaOptions", help="Pass any of the usual pathena options inside '', which will be propagated to job submission.", dest="pathenaOptions", default="")
35 parser.add_option("--nFilesPerJob", help="[DEPRECATED] Number of files to process in each job. This feeds into the pathena nFilesPerJob option. -1 tells pathena to work out a sensible value itself.", dest="nFilesPerJob", default=-1)
36 parser.add_option("-j", "--jo", "--templateJobOptions", help="Your template job options! Should contain a line 'systWeights=!SYSTWEIGHTS! which will get populated by this script. If running rivet analyses, you can leave this option blank and just fill --analysis with the list of Rivet analyses you want to run. For AthAnalysis JOs, see this as an example: source/ExampleDAODAnalysis/share/ExampleDAODAnalysisAlgJobOptions.py' ", dest="templateJobOptions", default=None)
37 (opts, args) = parser.parse_args()
38 
39 
40 submissionTemplatesDir = os.environ["SYSTTOOLSPATH"]
41 
42 
43 def findTestFile(testSampleDir, identifier):
44  """
45  `testSampleDir` String (look in this directory to try to find a matching test file)
46  `identifier` String (A string which uniquely defined the sample you are interested in
47  for example, the DSID of the sample of interest)
48  `result` String or None (String if a matching file name was found, None otherwise)
49 
50  check the specified directory for a test EVNT file for a specified DISD
51  """
52  for dirName in os.listdir(testSampleDir):
53  if identifier in dirName:
54  testSamplePath = testSampleDir + "/" + dirName
55  for fileName in os.listdir(testSamplePath):
56  if 'root' not in fileName: continue
57  if 'EVNT' not in fileName: continue
58  testSamplePath = testSamplePath + "/" + fileName
59  # if so, break: we don't need to download it.
60  return testSamplePath
61  break
62 
63 
64 def getTotalNEventsAndNFiles(thisSampleName):
65  """
66  `thisSampleName` String (name of sample to retrieve info for)
67  `result` int, int (nFilesInSample, nEventsInSample)
68 
69  download a tst file for the specified dataset name into the specified dir name
70  """
71  commandLine = "rucio list-files %s | tail -n3 > out.tmp" % (thisSampleName)
72  os.system(commandLine)
73  ftmp = open("out.tmp", 'r')
74  nTotalFiles = -1
75  nTotalEvents = -1
76  for line in ftmp.readlines():
77  line = line.strip().split(":")
78  if "Total files" in line[0]:
79  nTotalFiles = int(line[1])
80  if "Total events" in line[0]:
81  nTotalEvents = int(line[1])
82  print("[INFO] sample has nFiles=%d, with nEvents=%d" % (nTotalFiles, nTotalEvents))
83  return nTotalFiles, nTotalEvents
84 
85 
86 def downloadTestFile(testSampleDir, thisSampleName):
87  """
88  `testSampleDir` String (look in this directory to try to find a matching test file)
89  `thisSampleName` String (name of sample to download)
90  `result` String or None (String if a matching file name was found, None otherwise)
91 
92  download a tst file for the specified dataset name into the specified dir name
93  """
94  print("[INFO] do not currently have a test sample for ", thisSampleName)
95  commandLine = "rucio download --nrandom 1 %s --dir %s " % (thisSampleName, testSampleDir)
96  print("[INFO] --> downloading one using this command \n ", commandLine)
97  os.system(commandLine)
98  if ":" in thisSampleName:
99  testSamplePath = testSampleDir + "/" + thisSampleName.split(":")[1]
100  else:
101  testSamplePath = testSampleDir + "/" + thisSampleName
102  for fileName in os.listdir(testSamplePath):
103  testSamplePath = testSamplePath + "/" + fileName
104  return testSamplePath
105  break
106 
107 
109  """
110  return the nickname related to the grid certificate
111  """
112  try:
113  nickname = os.popen("voms-proxy-info -all | grep nickname").read()
114  nickname = nickname.split(' ')[4]
115  except Exception:
116  nickname = os.getlogin()
117  return nickname
118 
119 
120 def main(argv):
121  """
122  This module can also be run as a standalone executable.
123  For info about the options try:
124  submissionTool.py -h
125 
126  This tool is used to submit samples to the GRID, when once instance of Rivet_i per Matrix-Element
127  weight is initialised and run. Template Job Options are available in /data/RivetAnalysis_*JO*.py
128  """
129  print("======================================================================================= = ")
130  print("[INFO] processing files for ", opts.label, " using inputs ", opts.inputSamples)
131  print("[INFO] ignore systematics ? ", opts.noSyst)
132  print("[INFO] use ME-wights from downloaded test sample ? ", opts.downloadSample)
133  print("[INFO] location of downloaded samples = ", opts.testSampleDir)
134  print("======================================================================================= = ")
135 
136  isRivet = ((opts.templateJobOptions is None) and (opts.analysis is not None))
137  isAthAnalysis = ((opts.templateJobOptions is not None) and (opts.analysis is None))
138 
139  if (isRivet == isAthAnalysis):
140  print("Configuration error.")
141  print("If running rivet routines, you should provide arguments for --rivetAnalysis and NOT --templateJobOptions")
142  print("If running AthAnalysis code, you should provide arguments for --templateJobOptions and NOT --rivetAnalysis")
143  exit(1)
144 
145  if opts.nFilesPerJob > 0 or opts.nJobs > 0:
146  print("[WARNING] --nFilesPerJob and --nJobs options for submissionTool.py are deprecated. Advice it to let pAthena work this our by itself. If you really want to use thise options, specify them manually with --pathenaOptions")
147  exit(1)
148 
149  f = open(opts.inputSamples)
150  # loop through the samples we want to process
151  for line in f.readlines():
152  line = line.strip()
153  fracString = ""
154  if len(line.split(" ")) > 1:
155  fracString = line.split(" ")[1]
156  line = line.split(" ")[0]
157  if len(line) == 0: continue
158  if line[0] == "#": continue
159  if line.strip()[-1] == '/': line = line.strip()[0:-1]
160  # get sample name and DSID
161  thisSampleName = line.split()[0]
162  isOfficialProduction = True
163  if thisSampleName in re.findall("user.*", thisSampleName):
164  print("[INFO] this sample has been indentified as a user-geneated sample rather than official ATLAS production")
165  isOfficialProduction = False
166  # this is not an official ATLAS sample
167  # so no guarantee to be able to easily find
168  # a DSID where we normally expect it.
169  # in this case dsid is a dummy anyway.
170  # the safest thing to do is use the whole
171  # sample name (after the scope) to identify
172  # the sample.
173  if ":" not in thisSampleName: dsid = thisSampleName
174  else: dsid = thisSampleName.split(":")[1]
175  else:
176  dsid = thisSampleName.split(".")[1]
177  print("\n--- [INFO] processing DSID %s ---" % dsid)
178  testSamplePath = None
179 
180  frac = 1
181  nFilesInSample, nEventsInSample = -1, -1
182  if "" == fracString or fracString == "all" or fracString == "-1":
183  print("[INFO] sample specified with string '%s': processing all events" % fracString)
184  elif "%" in fracString:
185  nFilesInSample, nEventsInSample = getTotalNEventsAndNFiles(thisSampleName)
186  frac = float(fracString.replace("%", "")) * 0.01
187  print("[INFO] sample specified with string '%s': processing %d out of %d files, so %.2f%% of files" % (fracString, int(frac * nFilesInSample), nFilesInSample, 100 * (frac * nFilesInSample) / nFilesInSample))
188  opts.pathenaOptions += " --nFiles=%d " % int(frac * nFilesInSample)
189  elif "." in fracString and float(fracString) < 1:
190  nFilesInSample, nEventsInSample = getTotalNEventsAndNFiles(thisSampleName)
191  frac = float(fracString)
192  print("[INFO] sample specified with string '%s': processing %d out of %d files, so %.2f%% of files" % (fracString, int(frac * nFilesInSample), nFilesInSample, 100 * (frac * nFilesInSample) / nFilesInSample))
193  opts.pathenaOptions += " --nFiles=%d " % int(frac * nFilesInSample)
194  elif ("." not in fracString) and int(float(fracString)) > 1:
195  events = int(float(fracString))
196  nFilesInSample, nEventsInSample = getTotalNEventsAndNFiles(thisSampleName)
197  frac = float(events) / nEventsInSample
198  print("[INFO] sample specified with string '%s'. processing %d/%d=%.2f%% events, so %d/%d=%.2f%% of files" % (fracString, events, nEventsInSample, 100 * frac, int(frac * nFilesInSample), nFilesInSample, 100 * (frac * nFilesInSample) / nFilesInSample))
199  opts.pathenaOptions += " --nFiles=%d " % int(frac * nFilesInSample)
200  else:
201  print("[ERROR] malformed input string: %s. Should either be an integer number of events, a float between (0, 1) for the fraction of files, or a percentage of files, or 'all' (default)")
202  exit(1)
203 
204  os.system("mkdir -p %s" % opts.testSampleDir)
205  testSamplePath = findTestFile(opts.testSampleDir, dsid)
206  systWeights = []
207  if opts.noSyst and isRivet:
208  if float(rivet.version()[0]) >= 3:
209  opts.templateJobOptions = "%s/data/RivetAnalysis_JO_Rivet3noSyst.py" % (submissionTemplatesDir)
210  else:
211  opts.templateJobOptions = "%s/data/RivetAnalysis_JO_noSyst.py" % (submissionTemplatesDir)
212  if not opts.noSyst:
213  print("[INFO] Including the Systematic Variations stored as ME weights")
214  # we are doing systematics. Can we get the ME weight names from DB?
215  list_dictionary, list_keys = rDB.getWeights(dsid)
216  if len(list_dictionary.keys()) > 0 and not opts.downloadSample:
217  if not isOfficialProduction:
218  print("[ERROR] your sample:", thisSampleName)
219  print("[ERROR]... appears to be a user-generated dataset")
220  print("[ERROR]... and will not be present in the DSID_database")
221  print("[ERROR]... try again with option --downloadSamples")
222  exit(1)
223  print("[INFO] Obtaining ME weights from Database")
224  # retrieved ME weight names from DB, use the correct template
225  if isRivet:
226  if float(rivet.version()[0]) >= 3:
227  opts.templateJobOptions = "%s/data/RivetAnalysis_JO_Rivet3.py" % (submissionTemplatesDir)
228  else:
229  opts.templateJobOptions = "%s/data/RivetAnalysis_JO.py" % (submissionTemplatesDir)
230  systWeights = []
231  for weightType, weightInfo in rDB.getWeights(dsid)[0].items():
232  if isinstance(weightInfo['weights'], list):
233  weights = weightInfo['weights']
234  else: weights = [weightInfo['weights']]
235  for iw in weights:
236  if iw not in systWeights:
237  systWeights.append(iw)
238  else:
239  print("[INFO] Obtaining ME weights from download of test file")
240  # instead, download a test file of each DSID to be submitted,
241  # and manually get the list of ME weights it contains
242  if isRivet:
243  if float(rivet.version()[0]) >= 3:
244  opts.templateJobOptions = "%s/data/RivetAnalysis_JO_Rivet3.py" % (submissionTemplatesDir)
245  else:
246  opts.templateJobOptions = "%s/data/RivetAnalysis_JO_MEfromFile.py" % (submissionTemplatesDir)
247  # check if we already have a test file for that DSID
248  if (testSamplePath is None):
249  testSamplePath = downloadTestFile(opts.testSampleDir, thisSampleName)
250  print("[SUCCESS] found test file ", testSamplePath)
251  # make JOs for this DSID
252  if isAthAnalysis:
253  print("[INFO] attempting to retrieve weight names from metadata... this will crash if you are trying to submit EVNT files from R21 or DAOD files from R20...")
254  from PyUtils.MetaReader import read_metadata
255  systWeights = None
256  metadata = read_metadata(testSamplePath, None, 'full')[testSamplePath]
257  if '/Generation/Parameters' in metadata:
258  genpars = metadata['/Generation/Parameters']
259  if 'HepMCWeightNames' in genpars:
260  systWeights = genpars['HepMCWeightNames']
261  print("[SUCCESS] we found the following syst weights!", systWeights.keys())
262  else:
263  print('HepMCWeightName not found in /Generation/Parameters:')
264  print(genpars)
265  else:
266  print('/Generation/Parameters not found in metadata:')
267  print(metadata)
268 
269  thisSampleJobOption = opts.templateJobOptions.replace(".py", "_%s_%s.py" % (dsid, opts.label)).split("/")[-1]
270  os.system("cp %s %s" % (opts.templateJobOptions, thisSampleJobOption))
271  systWeights = str(systWeights).replace('\'', '"')
272  os.system("sed -i -e 's|!SYSTWEIGHTS!|%s|g' %s" % (systWeights, thisSampleJobOption))
273  os.system("sed -i -e 's|!DSID!|%s|g' %s" % (dsid, thisSampleJobOption))
274  os.system("sed -i -e 's|!TESTSAMPLE!|%s|g' %s" % (testSamplePath, thisSampleJobOption))
275  os.system("sed -i -e 's|!RIVETANALYSIS!|%s|g' %s" % (opts.analysis, thisSampleJobOption))
276  if ":" in thisSampleName:
277  outputSampleName = "user.%s." % (findUserNickname()) + thisSampleName.split(":")[1].replace(".evgen.EVNT", "").replace(".merge.EVNT", "") + ".RIVET.%s" % (opts.label)
278  else:
279  outputSampleName = "user.%s." % (findUserNickname()) + thisSampleName.replace(".evgen.EVNT", "").replace(".merge.EVNT", "") + ".RIVET.%s" % (opts.label)
280  pathenaCommandLine = r"pathena --nFilesPerJob = 100 --long --extOutFile \*.yoda --inDS=%s --outDS=%s --extFile = RivetAnalysis_%s.so,%s.yoda %s" % (thisSampleName, outputSampleName, opts.analysis, opts.analysis, thisSampleJobOption)
281  analysis_files = []
282  if isRivet:
283  for an in opts.analysis.split(","):
284  analysis_files += ["RivetAnalysis_%s.so" % an]
285  analysis_files += ["%s.yoda" % an]
286  analysis_files = ",".join(analysis_files)
287  if len(opts.extraFiles) > 0:
288  if len(analysis_files): analysis_files += "," + opts.extraFiles
289  else: analysis_files = opts.extraFiles
290  if len(analysis_files): analysis_files = "--extFile=%s" % analysis_files
291  rootOrYoda = "root" if not isRivet else 'yoda'
292  pathenaCommandLine = r"pathena --extOutFile \*.%s --inDS=%s --outDS=%s %s %s %s" % (rootOrYoda, thisSampleName, outputSampleName, analysis_files, thisSampleJobOption, opts.pathenaOptions)
293  if opts.dryRun:
294  pathenaCommandLine += " --noSubmit"
295  else:
296  os.system(pathenaCommandLine)
297 
298 
299 if __name__ == "__main__":
300  main(sys.argv[1:])
read
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)
Definition: openCoraCool.cxx:569
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
submissionTool.main
def main(argv)
Definition: submissionTool.py:120
submissionTool.findTestFile
def findTestFile(testSampleDir, identifier)
Definition: submissionTool.py:43
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.MetaReader.read_metadata
def read_metadata(filenames, file_type=None, mode='lite', promote=None, meta_key_filter=None, unique_tag_info_values=True, ignoreNonExistingLocalFiles=False)
Definition: MetaReader.py:52
submissionTool.findUserNickname
def findUserNickname()
Definition: submissionTool.py:108
submissionTool.downloadTestFile
def downloadTestFile(testSampleDir, thisSampleName)
Definition: submissionTool.py:86
calibdata.exit
exit
Definition: calibdata.py:236
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
submissionTool.getTotalNEventsAndNFiles
def getTotalNEventsAndNFiles(thisSampleName)
Definition: submissionTool.py:64
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
Trk::open
@ open
Definition: BinningType.h:40
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
readCCLHist.float
float
Definition: readCCLHist.py:83
Trk::split
@ split
Definition: LayerMaterialProperties.h:38