ATLAS Offline Software
CPGridRun.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4 from AnaAlgorithm.DualUseConfig import isAthena
5 from AnaAlgorithm.Logging import logging
6 import argparse
7 import sys
8 import os
9 
10 logCPGridRun = logging.getLogger('CPGridRun')
11 class CPGridRun:
12  def __init__(self):
15  self._runscript = None
16  if self.args.help:
17  self._initRunscript()
18  self.printHelp()
19  sys.exit(0)
20  self._tarfile = 'cpgrid.tar.gz'
21  self._isFirstRun = True
22  self._tarballRecreated = False
23  self._inputList = None
24  self._errorCollector = {} # Delay the error collection until the end of the script for better user experience
25  self.cmd = {} # sample name -> command
26 
27  def _initRunscript(self):
28  if self._runscript is not None:
29  return self._runscript
30  elif isAthena:
31  from AnalysisAlgorithmsConfig.AthenaCPRunScript import AthenaCPRunScript
32  self._runscript = AthenaCPRunScript()
33  else:
34  from AnalysisAlgorithmsConfig.EventLoopCPRunScript import EventLoopCPRunScript
35  self._runscript = EventLoopCPRunScript()
36  return self._runscript
37 
39  parser = argparse.ArgumentParser(description='CPGrid runscript to submit CPRun.py jobs to the grid. '
40  'This script will submit a job to the grid using files in the input text one by one.'
41  'CPRun.py can handle multiple sources of input and create one output; but not this script',
42  add_help=False,
43  formatter_class=argparse.RawTextHelpFormatter)
44  parser.add_argument('-h', '--help', dest='help', action='store_true', help='Show this help message and continue')
45 
46  ioGroup = parser.add_argument_group('Input/Output file configuration')
47  ioGroup.add_argument('-i','--input-list', dest='input_list', help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
48  ioGroup.add_argument('--output-files', dest='output_files', nargs='+', default=['output.root'],
49  help='The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
50  ioGroup.add_argument('--destSE', dest='destSE', default='', type=str, help='Destination storage element (PanDA)')
51  ioGroup.add_argument('--mergeType', dest='mergeType', default='Default', type=str, help='Output merging type, [None, Default, xAOD]')
52 
53  pandaGroup = parser.add_argument_group('Input/Output naming configuration')
54  pandaGroup.add_argument('--gridUsername', dest='gridUsername', default=os.getenv('USER', ''), type=str, help='Grid username, or the groupname. Default is the current user. Only affect file naming')
55  pandaGroup.add_argument('--prefix', dest='prefix', default='', type=str, help='Prefix for the output directory. Dynamically set with input container if not provided')
56  pandaGroup.add_argument('--suffix', dest='suffix', default='',type=str, help='Suffix for the output directory')
57  pandaGroup.add_argument('--outDS', dest='outDS', default='', type=str,
58  help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
59 
60  cpgridGroup = parser.add_argument_group('CPGrid configuration')
61  cpgridGroup.add_argument('--groupProduction', dest='groupProduction', action='store_true', help='Only use for official production')
62 
63  cpgridGroup.add_argument('--exec', dest='exec', type=str,
64  help='Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
65  'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
66  'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
67  )
68 
69  submissionGroup = parser.add_argument_group('Submission configuration')
70  submissionGroup.add_argument('-y', '--agreeAll', dest='agreeAll', action='store_true', help='Agree to all the submission details without asking for confirmation. Use with caution!')
71  submissionGroup.add_argument('--noSubmit', dest='noSubmit', action='store_true', help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
72  submissionGroup.add_argument('--testRun', dest='testRun', action='store_true', help='Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
73  submissionGroup.add_argument('--checkInputDS', dest='checkInputDS', action='store_true', help='Check if the input datasets are available on the AMI.')
74  submissionGroup.add_argument('--recreateTar', dest='recreateTar', action='store_true', help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
75  self.args, self.unknown_args = parser.parse_known_args()
76  self.outputFilesParsing()
77  return parser
78 
79  def _createPrunArgsDict(self) -> dict:
80  '''
81  converting unknown args to a dictionary
82  '''
83  unknownArgsDict = self._unknownArgsDict()
84  if unknownArgsDict and self.hasPrun():
85  self._checkPrunArgs(unknownArgsDict)
86  logCPGridRun.info(f"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
87  elif unknownArgsDict:
88  logCPGridRun.warning(f"Unknown arguments detected: {unknownArgsDict}. Cannot check the availablility in Prun because Prun is not available / noSubmit is on.")
89  else:
90  pass
91  return unknownArgsDict
92 
93  @property
94  def inputList(self):
95  if self._inputList is None:
96  if self.args.input_list.endswith('.txt'):
97  self._inputList = CPGridRun._parseInputFileList(self.args.input_list)
98  elif self.args.input_list.endswith('.json'):
99  raise NotImplementedError('JSON input list parsing is not implemented')
100  elif CPGridRun.isAtlasProductionFormat(self.args.input_list):
101  self._inputList = [self.args.input_list]
102  else:
103  raise ValueError(
104  'use --input-list to specify input containers')
105  return self._inputList
106 
108  output_files = []
109  for output in self.args.output_files:
110  if ',' in output:
111  output_files.extend(output.split(','))
112  else:
113  output_files.append(output)
114  self.output_files = output_files
115 
116  def printHelp(self):
117  self.gridParser.print_help()
118  logCPGridRun.info("\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
119  self._runscript.parser.usage = argparse.SUPPRESS
120  self._runscript.parser.print_help()
121 
122  def getParser(self):
123  return self.gridParser
124 
125  # This function do all the checking, cleaning and preparing the command to be submitted to the grid
126  # separated for client to be able to change the behavior
128  for input in self.inputList:
129  cmd = self.configureSubmissionSingleSample(input)
130  self.cmd[input] = cmd
131  self._isFirstRun = False
132 
134  config = {
135  'inDS': input,
136  'outDS': self.args.outDS if self.args.outDS else self.outputDSFormatter(input) ,
137  'useAthenaPackages': True,
138  'cmtConfig': os.environ["CMTCONFIG"],
139  'writeInputToTxt': 'IN:in.txt',
140  'outputs': self.outputsFormatter(),
141  'exec': self.execFormatter(),
142  'memory': "2000", # MB
143  'addNthFieldOfInDSToLFN': '2,3,6',
144  }
145  if self.args.noSubmit:
146  config['noSubmit'] = True
147 
148  if self.args.mergeType == 'xAOD':
149  config['mergeScript'] = 'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'
150 
151  if self.args.mergeType != 'None':
152  config['mergeOutput'] = True
153 
154  if not self._tarballRecreated and (self.args.recreateTar or not os.path.exists(self._tarfile) or self._filesChanged()):
155  config['outTarBall'] = self._tarfile
156  self._tarballRecreated = True
157  elif os.path.exists(self._tarfile) or self._tarballRecreated:
158  config['inTarBall'] = self._tarfile
159 
160  if self.args.groupProduction:
161  config['official'] = True
162  config['voms'] = f'atlas:/atlas/{self.args.gridUsername}/Role=production'
163 
164  if self.args.destSE:
165  config['destSE'] = self.args.destSE
166 
167  if self.args.testRun:
168  config['nEventsPerFile'] = 300
169  config['nFiles'] = 10
170  config.update(self.prunArgsDict)
171  cmd = 'prun \\\n'
172  for k, v in config.items():
173  if isinstance(v, bool) and v:
174  cmd += f'--{k} \\\n'
175  elif v is not None and v != '':
176  cmd += f'--{k} {v} \\\n'
177  return cmd.rstrip(' \\\n')
178 
179  def _unknownArgsDict(self)->dict:
180  '''
181  Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
182  '''
183  unknown_args_dict = {}
184  idx = 0
185  while idx < len(self.unknown_args):
186  if self.unknown_args[idx].startswith('-'):
187  if idx + 1 < len(self.unknown_args) and not self.unknown_args[idx + 1].startswith('-'):
188  unknown_args_dict[self.unknown_args[idx].lstrip('-')] = self.unknown_args[idx + 1]
189  idx += 2
190  else:
191  unknown_args_dict[self.unknown_args[idx].lstrip('-')] = True
192  idx += 1
193  return unknown_args_dict
194 
195  def _checkPrunArgs(self,argDict):
196  '''
197  check the arguments against the prun script to ensure they are valid
198  See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py
199  '''
200  import pandaclient.PrunScript
201  # We need to temporarily clear the sys.argv to avoid the parser from PrunScript to fail
202  original_argv = sys.argv
203  sys.argv = ['prun'] # Reset sys.argv to only contain the script name
204  prunArgsDict = {}
205  prunArgsDict = pandaclient.PrunScript.main(get_options=True)
206  sys.argv = original_argv # Restore the original sys.argv
207  nonPrunOrCPGridArgs = []
208  for arg in argDict:
209  if arg not in prunArgsDict:
210  nonPrunOrCPGridArgs.append(arg)
211  if nonPrunOrCPGridArgs:
212  logCPGridRun.error(f"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
213  raise ValueError(f"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
214 
215  def printInputDetails(self):
216  for key, cmd in self.cmd.items():
217  parsed_name = CPGridRun.atlasProductionNameParser(key)
218  logCPGridRun.info("\n"
219  f"Input: {key}\n" +
220  "\n".join([f" {k.replace('_', ' ').title()}: {v}" for k, v in parsed_name.items()]))
221  logCPGridRun.info(f"Command: \n{cmd}")
222  print("-" * 70)
223  # Add your submission logic here
224 
225  def hasPyami(self):
226  try:
227  global pyAMI
228  import pyAMI.client
229  import pyAMI.atlas.api
230  except ModuleNotFoundError:
231  self._errorCollector['no AMI'] = (
232  "Cannot import pyAMI, please run the following commands:\n\n"
233  "```\n"
234  "lsetup pyami\n"
235  "voms-proxy-init -voms atlas\n"
236  "```\n"
237  "and make sure you have a valid certificate.")
238  return False
239  return True
240 
241  def checkInputInPyami(self) -> bool:
242  if not self.hasPyami():
243  return False
244 
245  client = pyAMI.client.Client('atlas')
246  pyAMI.atlas.api.init()
247 
248  queries, datasetPtag = self._prepareAmiQueryFromInputList()
249  try:
250  results = pyAMI.atlas.api.list_datasets(client, patterns=queries)
251  except pyAMI.exception.Error:
252  self._errorCollector['no valid certificate'] = (
253  "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
254  return False
255 
256  return self._analyzeAmiResults(results, datasetPtag)
257 
259  '''
260  Helper function to prepare a list of queries for the AMI based on the input list.
261  It will replace the _p### with _p% to match the latest ptag.
262  '''
263  import re
264  regex = re.compile("_p[0-9]+")
265  queries = []
266  datasetPtag = {}
267  for datasetName in self.cmd:
268  parsed = CPGridRun.atlasProductionNameParser(datasetName)
269  datasetPtag[datasetName] = parsed.get('ptag')
270  queries.append(regex.sub("_p%", datasetName))
271  return queries, datasetPtag
272 
273  def _analyzeAmiResults(self, results, datasetPtag) -> bool:
274  import re
275  regex = re.compile("_p[0-9]+")
276  results = [r['ldn'] for r in results]
277  notFound = []
278  latestPtag = {}
279 
280  for datasetName in self.cmd:
281  if datasetName not in results:
282  notFound.append(datasetName)
283 
284  base = regex.sub("_p%", datasetName)
285  matching = [r for r in results if r.startswith(base.replace("_p%", ""))]
286  for m in matching:
287  mParsed = CPGridRun.atlasProductionNameParser(m)
288  try:
289  mPtagInt = int(mParsed.get('ptag', 'p0')[1:])
290  currentPtagInt = int(datasetPtag.get(datasetName, 'p0')[1:])
291  if mPtagInt > currentPtagInt:
292  latestPtag[datasetName] = f"p{mPtagInt}"
293  except (ValueError, TypeError):
294  continue
295 
296  if latestPtag:
297  logCPGridRun.info("Newer version of datasets found in AMI:")
298  for name, ptag in latestPtag.items():
299  logCPGridRun.info(f"{name} -> ptag: {ptag}")
300 
301  if notFound:
302  logCPGridRun.error("Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
303  logCPGridRun.error(", ".join(notFound))
304  return False
305 
306  return True
307 
308  def outputDSFormatter(self, name):
309  if CPGridRun.isAtlasProductionFormat(name):
310  return self._outputDSFormatter(name)
311  else:
312  return self._customOutputDSFormatter(name)
313 
314  def _outputDSFormatter(self, name):
315  '''
316  {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
317  '''
318  nameParser = CPGridRun.atlasProductionNameParser(name)
319  base = 'group' if self.args.groupProduction else 'user'
320  username = self.args.gridUsername
321  dsid = nameParser['DSID']
322  tags = '_'.join(nameParser['tags'])
323  fileFormat = nameParser['format']
324  base = 'group' if self.args.groupProduction else 'user'
325  prefix = self.args.prefix if self.args.prefix else nameParser['main'].split('_')[0] # Dynamically set the prefix, likely to be something like PhPy8Eg
326  suffix = self._suffixFormatter()
327 
328  result = [base, username, prefix, dsid, fileFormat, tags, suffix]
329  return ".".join(filter(None, result))
330 
331  def _customOutputDSFormatter(self, name):
332  '''
333  {group/user}.{username}.{main}.outputDS.{suffix}
334  '''
335  parts = name.split('.')
336  base = 'group' if self.args.groupProduction else 'user'
337  username = self.args.gridUsername
338  main = parts[2]
339  outputDS = 'outputDS'
340  suffix = parts[-1]
341 
342  result = [base, username,main, outputDS, suffix]
343  return ".".join(filter(None, result))
344 
345  def _suffixFormatter(self):
346  if self.args.suffix:
347  return self.args.suffix
348  if self.args.testRun:
349  import uuid
350  return f"test_{uuid.uuid4().hex[:6]}"
351  else:
352  ''
353 
354  def _filesChanged(self):
355  tarball_mtime = os.path.getmtime(self._tarfile) if os.path.exists(self._tarfile) else 0
356  buildDir = self._buildDir()
357  sourceDir = self._sourceDir()
358 
359  # Check for changes in buildDir
360  for root, _, files in os.walk(buildDir):
361  for file in files:
362  file_path = os.path.join(root, file)
363  try:
364  if os.path.getmtime(file_path) > tarball_mtime:
365  logCPGridRun.info(f"File {file_path} is newer than the tarball.")
366  return True
367  except FileNotFoundError:
368  continue
369 
370  # Check for changes in sourceDir
371  if sourceDir is None:
372  logCPGridRun.warning("Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
373  return False
374  for root, _, files in os.walk(sourceDir):
375  for file in files:
376  file_path = os.path.join(root, file)
377  try:
378  if os.path.getmtime(file_path) > tarball_mtime:
379  logCPGridRun.info(f"File {file_path} is newer than the tarball.")
380  return True
381  except FileNotFoundError:
382  continue
383  return False
384 
385  def _buildDir(self):
386  buildDir = os.environ["CMAKE_PREFIX_PATH"]
387  buildDir = os.path.dirname(buildDir.split(":")[0])
388  return buildDir
389 
390  def _sourceDir(self):
391  cmakeCachePath = os.path.join(self._buildDir(), 'CMakeCache.txt')
392  sourceDir = None
393  if not os.path.exists(cmakeCachePath):
394  return sourceDir
395  with open(cmakeCachePath, 'r') as cmakeCache:
396  for line in cmakeCache:
397  if '_SOURCE_DIR:STATIC=' in line:
398  sourceDir = line.split('=')[1].strip()
399  break
400  return sourceDir
401 
402  def execFormatter(self):
403  # Check if the execution command starts with 'CPRun.py' or '-'
404  isCPRunDefault = self.args.exec.startswith('-') or self.args.exec.startswith('CPRun.py')
405  formatingClause = {
406  'input_list': 'in.txt',
407  'merge_output_files': True,
408  }
409  if not isCPRunDefault:
410  if self._isFirstRun: logCPGridRun.warning("Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
411  return f'"{self.args.exec}"'
412 
413  # Parse the exec string using the parser to validate and extract known arguments
414  self._initRunscript()
415  runscriptArgs, unknownArgs = self._runscript.parser.parse_known_args(self.args.exec.split(' '))
416 
417  # Throw error if unknownArgs contains any --args
418  unknown_flags = [arg for arg in unknownArgs if arg.startswith('--')]
419  if unknown_flags:
420  logCPGridRun.error(f"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
421  raise ValueError(f"Unknown arguments detected: {unknown_flags}")
422 
423  # Only override if value is None or the parser default
424  for key, value in formatingClause.items():
425  if hasattr(runscriptArgs, key):
426  old_value = getattr(runscriptArgs, key)
427  if old_value is None or old_value == self._runscript.parser.get_default(key):
428  setattr(runscriptArgs, key, value)
429  if self._isFirstRun: logCPGridRun.info(f"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
430  else:
431  if self._isFirstRun: logCPGridRun.warning(f"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
432  else:
433  logCPGridRun.error(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
434  raise ValueError(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
435  self._checkYamlExists(runscriptArgs)
436  # Return the formatted arguments as a string
437  arg_string = ' '.join(
438  f'--{k.replace("_", "-")}' if isinstance(v, bool) and v else
439  f'--{k.replace("_", "-")} {v}' for k, v in vars(runscriptArgs).items() if v not in [None, False]
440  )
441  return f'"CPRun.py {arg_string}"'
442 
443  def _checkYamlExists(self, runscriptArgs):
444  from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
445  if not hasattr(runscriptArgs, 'text_config'):
446  self._errorCollector['no yaml'] = "No YAML configuration file is specified in the exec string. Please provide one using --text-config"
447  return
448  yamlPath = getattr(runscriptArgs, 'text_config')
449  haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
450  if haveLocalYaml:
451  logCPGridRun.warning("A path to a local YAML configuration file is found, but it may not be grid-usable.")
452 
453  repoYamls = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
454  if repoYamls and len(repoYamls) > 1:
455  self._errorCollector['ambiguous yamls'] = f'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' + '\n'.join(repoYamls)
456  return
457  elif repoYamls and len(repoYamls) == 1:
458  logCPGridRun.info(f"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
459  return
460 
461  if not repoYamls:
462  self._errorCollector['no usable yaml'] = f"Grid usable YAML configuration file not found: {yamlPath}"
463  if haveLocalYaml:
464  self._errorCollector['have local yaml'] = f"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
465  f"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec"
466 
467  def outputsFormatter(self):
468  outputs = [f'{output.split(".")[0]}:{output}' for output in self.args.output_files]
469  return ','.join(outputs)
470 
471  def hasPrun(self) -> bool:
472  import shutil
473  prun_path = shutil.which("prun")
474  if prun_path is None:
475  self._errorCollector['no prun'] = (
476  "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
477  "```\n"
478  "lsetup panda\n"
479  "voms-proxy-init -voms atlas\n"
480  "```\n"
481  "Make sure you have a valid certificate."
482  )
483  return False
484  return True
485 
486  def submit(self):
487  import subprocess
488  for key, cmd in self.cmd.items():
489  process = subprocess.Popen(cmd, shell=True, stdout=sys.stdout, stderr=sys.stderr)
490  process.communicate()
491 
492  @staticmethod
494  if name.startswith('mc') or name.startswith('data'):
495  return True
496  logCPGridRun.warning("Name is not in the Atlas production format, assuming it is a user production")
497  return False
498 
499  @staticmethod
500  def rucioCustomNameParser(filename):
501  '''
502  The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
503  '''
504  result = {}
505  parts = filename.split('.')
506  result['userType'] = parts[0]
507  result['username'] = parts[1]
508  result['main'] = parts[2]
509  result['suffix'] = parts[-1]
510  return result
511 
512  @staticmethod
514  '''
515  Parsing file name into a dictionary, an example is given here
516  mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
517  For the first part
518  datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
519  projectName: mc20_13TeV
520  campaign: mc20
521  energy: 13 #(TeV)
522  DSID: 410470
523  main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
524  TODO generator: PhPy8Eg
525  TODO tune: A14 # For Pythia8
526  TODO process: ttbar
527  TODO hdamp: 258p75 # For Powheg
528  TODO decayType: nonallhad
529  step: deriv
530  format: DAOD_PHYS
531  tags: e###_s###_r###_p###_a###_t###_b#
532  etag: e6337 # EVNT (EVGEN) production and merging
533  stag: s3681 # Geant4 simulation to produce HITS and merging!
534  rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
535  ptag: p5855 # Production of NTUP_PILEUP format and merging
536  atag: aXXX: atlfast configuration (both simulation and digit/recon)
537  ttag: tXXX: tag production configuration
538  btag: bXXX: bytestream production configuration
539 
540  For the second part
541  JeditaskID: 34865530
542  fileNumber: 000740
543  version: 1
544 
545  '''
546  result = {}
547  #split the / in case
548  # mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
549  # /DAOD_PHYS.34865530._000740.pool.root.1
550  if '/' in filename:
551  datasetPart, filePart = filename.split('/')
552  else:
553  datasetPart = filename
554  filePart = None
555 
556  # Split the dataset part by dots
557  datasetParts = datasetPart.split('.')
558  result['datasetName'] = datasetPart
559  # Extract the first part
560  result['projectName'] = datasetParts[0] # is positional
561  # Extract the campaign and energy
562  campaign_energy = result['projectName'].split('_')
563  result['campaign'] = campaign_energy[0]
564  result['energy'] = campaign_energy[1]
565 
566  # Extract the DSID, positional
567  result['DSID'] = datasetParts[1]
568  result['main'] = datasetParts[2]
569  result['step'] = datasetParts[3]
570  result['format'] = datasetParts[4]
571 
572  # Extract the tags (etag, stag, rtag, ptag)
573  tags = datasetParts[5].split('_')
574  result['tags'] = tags
575  for tag in tags:
576  if tag.startswith('e'):
577  result['etag'] = tag
578  elif tag.startswith('s'):
579  result['stag'] = tag
580  elif tag.startswith('r'):
581  result['rtag'] = tag
582  elif tag.startswith('p'):
583  result['ptag'] = tag
584  elif tag.startswith('a'):
585  result['atag'] = tag
586  elif tag.startswith('t'):
587  result['ttag'] = tag
588  elif tag.startswith('b'):
589  result['btag'] = tag
590 
591  # Extract the file part if it exists
592  if filePart:
593  fileParts = filePart.split('.')
594  result['jediTaskID'] = fileParts[1]
595  result['fileNumber'] = fileParts[2]
596  result['version'] = fileParts[-1]
597  return result
598 
599  @staticmethod
601  files = []
602  with open(path, 'r') as inputText:
603  for line in inputText.readlines():
604  # skip comments and empty lines
605  if line.startswith('#') or not line.strip():
606  continue
607  files += line.split(',')
608  # remove leading/trailing whitespaces, and \n
609  files = [file.strip() for file in files]
610  return files
611 
613  if self._errorCollector:
614  logCPGridRun.error("Errors were collected during the script execution:")
615 
616  for key, value in self._errorCollector.items():
617  logCPGridRun.error(f"{key}: {value}")
618  logCPGridRun.error("Please fix the errors and try again.")
619  sys.exit(1)
620 
622  if self.args.noSubmit:
623  return
624  self.hasPrun()
625  if self.args.checkInputDS:
626  self.checkInputInPyami()
627 
628  def askSubmission(self):
629  if self.args.noSubmit:
630  return
631  if self.args.agreeAll:
632  logCPGridRun.info("You have agreed to all the submission details. Jobs will be submitted without confirmation.")
633  self.submit()
634  return
635  answer = input("Please confirm ALL the submission details are correct before submitting [y/n]: ")
636  if answer.lower() == 'y':
637  self.submit()
638  elif answer.lower() == 'n':
639  logCPGridRun.info("Feel free to report any unexpected behavior to the CPAlgorithms team!")
640  else:
641  logCPGridRun.error("Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
642 
if __name__ == '__main__':
    # Script entry point: build the commands, show them, verify the external
    # tools, stop on collected errors and finally ask before submitting.
    gridRun = CPGridRun()
    gridRun.configureSumbission()
    gridRun.printInputDetails()
    gridRun.checkExternalTools()
    gridRun.printDelayedErrorCollection()
    gridRun.askSubmission()
CPGridRun.CPGridRun.output_files
output_files
Definition: CPGridRun.py:114
CPGridRun.CPGridRun._filesChanged
def _filesChanged(self)
Definition: CPGridRun.py:354
CPGridRun.CPGridRun._tarfile
_tarfile
Definition: CPGridRun.py:20
CPGridRun.CPGridRun.isAtlasProductionFormat
def isAtlasProductionFormat(name)
Definition: CPGridRun.py:493
CPGridRun.CPGridRun.cmd
cmd
Definition: CPGridRun.py:25
CPGridRun.CPGridRun.hasPyami
def hasPyami(self)
Definition: CPGridRun.py:225
CPGridRun.CPGridRun.printHelp
def printHelp(self)
Definition: CPGridRun.py:116
CPGridRun.CPGridRun._parseInputFileList
def _parseInputFileList(path)
Definition: CPGridRun.py:600
CPGridRun.CPGridRun._analyzeAmiResults
bool _analyzeAmiResults(self, results, datasetPtag)
Definition: CPGridRun.py:273
CPGridRun.CPGridRun.printInputDetails
def printInputDetails(self)
Definition: CPGridRun.py:215
CPGridRun.CPGridRun._sourceDir
def _sourceDir(self)
Definition: CPGridRun.py:390
CPGridRun.CPGridRun.checkExternalTools
def checkExternalTools(self)
Definition: CPGridRun.py:621
CPGridRun.CPGridRun._runscript
_runscript
Definition: CPGridRun.py:15
CPGridRun.CPGridRun.rucioCustomNameParser
def rucioCustomNameParser(filename)
Definition: CPGridRun.py:500
CPGridRun.CPGridRun.printDelayedErrorCollection
def printDelayedErrorCollection(self)
Definition: CPGridRun.py:612
covarianceTool.filter
filter
Definition: covarianceTool.py:514
CPGridRun.CPGridRun._outputDSFormatter
def _outputDSFormatter(self, name)
Definition: CPGridRun.py:314
CPGridRun.CPGridRun.configureSubmissionSingleSample
def configureSubmissionSingleSample(self, input)
Definition: CPGridRun.py:133
CPGridRun.CPGridRun._unknownArgsDict
dict _unknownArgsDict(self)
Definition: CPGridRun.py:179
CPGridRun.CPGridRun.hasPrun
bool hasPrun(self)
Definition: CPGridRun.py:471
CPGridRun.CPGridRun._errorCollector
_errorCollector
Definition: CPGridRun.py:24
CPGridRun.CPGridRun.submit
def submit(self)
Definition: CPGridRun.py:486
CPGridRun.CPGridRun.configureSumbission
def configureSumbission(self)
Definition: CPGridRun.py:127
CPGridRun.CPGridRun.gridParser
gridParser
Definition: CPGridRun.py:13
CPGridRun.CPGridRun._tarballRecreated
_tarballRecreated
Definition: CPGridRun.py:22
CPGridRun.CPGridRun._parseGridArguments
def _parseGridArguments(self)
Definition: CPGridRun.py:38
CPGridRun.CPGridRun.prunArgsDict
prunArgsDict
Definition: CPGridRun.py:14
CPGridRun.CPGridRun
Definition: CPGridRun.py:11
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
CPGridRun.CPGridRun.atlasProductionNameParser
def atlasProductionNameParser(filename)
Definition: CPGridRun.py:513
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
CPGridRun.CPGridRun.inputList
def inputList(self)
Definition: CPGridRun.py:94
CPGridRun.CPGridRun._suffixFormatter
def _suffixFormatter(self)
Definition: CPGridRun.py:345
CPGridRun.CPGridRun.outputsFormatter
def outputsFormatter(self)
Definition: CPGridRun.py:467
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:71
CPGridRun.CPGridRun.outputFilesParsing
def outputFilesParsing(self)
Definition: CPGridRun.py:107
CPGridRun.CPGridRun._customOutputDSFormatter
def _customOutputDSFormatter(self, name)
Definition: CPGridRun.py:331
Trk::open
@ open
Definition: BinningType.h:40
CPGridRun.CPGridRun._isFirstRun
_isFirstRun
Definition: CPGridRun.py:21
CPGridRun.CPGridRun._buildDir
def _buildDir(self)
Definition: CPGridRun.py:385
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
CPGridRun.CPGridRun._checkPrunArgs
def _checkPrunArgs(self, argDict)
Definition: CPGridRun.py:195
CPGridRun.CPGridRun.__init__
def __init__(self)
Definition: CPGridRun.py:12
CPGridRun.CPGridRun._initRunscript
def _initRunscript(self)
Definition: CPGridRun.py:27
CPGridRun.CPGridRun._createPrunArgsDict
dict _createPrunArgsDict(self)
Definition: CPGridRun.py:79
CPGridRun.CPGridRun._prepareAmiQueryFromInputList
def _prepareAmiQueryFromInputList(self)
Definition: CPGridRun.py:258
CPGridRun.CPGridRun._inputList
_inputList
Definition: CPGridRun.py:23
CPGridRun.CPGridRun.getParser
def getParser(self)
Definition: CPGridRun.py:122
CPGridRun.CPGridRun.checkInputInPyami
bool checkInputInPyami(self)
Definition: CPGridRun.py:241
CPGridRun.CPGridRun._checkYamlExists
def _checkYamlExists(self, runscriptArgs)
Definition: CPGridRun.py:443
CPGridRun.CPGridRun.askSubmission
def askSubmission(self)
Definition: CPGridRun.py:628
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
CPGridRun.CPGridRun.execFormatter
def execFormatter(self)
Definition: CPGridRun.py:402
CPGridRun.CPGridRun.outputDSFormatter
def outputDSFormatter(self, name)
Definition: CPGridRun.py:308
CPGridRun.CPGridRun.unknown_args
unknown_args
Definition: CPGridRun.py:75