Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
CPGridRun.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 
3 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4 from AnaAlgorithm.DualUseConfig import isAthena
5 from AnaAlgorithm.Logging import logging
6 import argparse
7 import sys
8 import os
9 
10 logCPGridRun = logging.getLogger('CPGridRun')
class CPGridRun:
    '''
    Build and submit one PanDA ``prun`` command per input container.

    Typical use (see the ``__main__`` section of this file): construct the object, which
    parses ``sys.argv``; call ``configureSumbission()`` to fill ``self.cmd``; call
    ``printInputDetails()`` to review the commands; call ``submit()`` to run them.
    '''

    def __init__(self):
        '''
        Parse the command line and initialise the submission state.

        When ``-h/--help`` is given, the combined help (grid options followed by the
        run script options) is printed and the process exits with status 0.
        '''
        # _parseGridArguments() also stores the parsed namespace in self.args
        self.gridParser = self._parseGridArguments()
        if self.args.help:
            self._runscript = self._initRunscript()
            self.printHelp()
            sys.exit(0)
        self._tarfile = 'cpgrid.tar.gz'
        self._tarballRecreated = False
        self._inputList = None  # filled lazily by the inputList property
        self.cmd = {}  # sample name -> command

    def _initRunscript(self):
        '''
        Instantiate the run script matching the current environment.

        Returns an AthenaCPRunScript when ``isAthena`` is true, an EventLoopCPRunScript otherwise.
        '''
        if isAthena:
            from AnalysisAlgorithmsConfig.AthenaCPRunScript import AthenaCPRunScript
            return AthenaCPRunScript()
        from AnalysisAlgorithmsConfig.EventLoopCPRunScript import EventLoopCPRunScript
        return EventLoopCPRunScript()

    def _parseGridArguments(self):
        '''
        Declare the grid submission options, parse ``sys.argv`` into ``self.args``
        and return the parser.

        The built-in argparse help is disabled (``add_help=False``) because ``-h`` is handled
        in ``__init__`` in order to also print the help of the run script.
        '''
        parser = argparse.ArgumentParser(description='CPGrid runscript to submit CPRun.py jobs to the grid. '
                                         'This script will submit a job to the grid using files in the input text one by one. '
                                         'CPRun.py can handle multiple sources of input and create one output; but not this script',
                                         add_help=False,
                                         formatter_class=argparse.RawTextHelpFormatter)
        parser.add_argument('-h', '--help', dest='help', action='store_true', help='Show this help message and exit')

        ioGroup = parser.add_argument_group('Input/Output file configuration')
        ioGroup.add_argument('-i','--input-list', dest='input_list', help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
        ioGroup.add_argument('--output-files', dest='output_files', default='output.root',
                             help='The output files of the grid job. Example: --output-files "A.root,B.txt,B.root" results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
        ioGroup.add_argument('--destSE', dest='destSE', default='', type=str, help='Destination storage element (PanDA)')
        ioGroup.add_argument('--mergeType', dest='mergeType', default='Default', type=str, help='Output merging type, [None, Default, xAOD]')

        pandaGroup = parser.add_argument_group('Input/Output naming configuration')
        pandaGroup.add_argument('--gridUsername', dest='gridUsername', default=os.getenv('USER', ''), type=str, help='Grid username, or the groupname. Default is the current user. Only affect file naming')
        pandaGroup.add_argument('--prefix', dest='prefix', default='', type=str, help='Prefix for the output directory. Dynamically set with input container if not provided')
        pandaGroup.add_argument('--suffix', dest='suffix', default='',type=str, help='Suffix for the output directory')
        pandaGroup.add_argument('--outDS', dest='outDS', default='', type=str,
                                help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')

        cpgridGroup = parser.add_argument_group('CPGrid configuration')
        cpgridGroup.add_argument('--groupProduction', dest='groupProduction', action='store_true', help='Only use for official production')

        cpgridGroup.add_argument('--exec', dest='exec', type=str,
                                 help='Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
                                 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "-t config.yaml --no-systematics".\n'
                                 'CPRun.py but overriding the preset behavior (for future and experts): "CPRun.py --input-list ourExpert.txt -t config.yaml --flagB"\n'
                                 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
                                 )

        submissionGroup = parser.add_argument_group('Submission configuration')
        submissionGroup.add_argument('--noSubmit', dest='noSubmit', action='store_true', help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
        submissionGroup.add_argument('--testRun', dest='testRun', action='store_true', help='Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
        submissionGroup.add_argument('--recreateTar', dest='recreateTar', action='store_true', help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')

        self.args = parser.parse_args()
        return parser

    @property
    def inputList(self):
        '''
        List of input containers, resolved once from ``--input-list`` and then cached.

        ``--input-list`` may be a ``.txt`` file listing containers, or a single container
        name in the ATLAS production format.

        Raises:
            ValueError: if ``--input-list`` is missing or is not a recognised kind of input.
            NotImplementedError: if a ``.json`` list is given.
        '''
        if self._inputList is None:
            source = self.args.input_list
            if not source:
                # Without this guard a missing option fails with an AttributeError on None
                raise ValueError(
                    'use --input-list to specify input containers')
            if source.endswith('.txt'):
                self._inputList = CPGridRun._parseInputFileList(source)
            elif source.endswith('.json'):
                raise NotImplementedError('JSON input list parsing is not implemented')
            elif CPGridRun.isAtlasProductionFormat(source):
                self._inputList = [source]
            else:
                raise ValueError(
                    'use --input-list to specify input containers')
        return self._inputList

    def printHelp(self):
        '''
        Print the grid options, followed by the options of the run script (usage line suppressed).
        '''
        self.gridParser.print_help()
        logCPGridRun.info("\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
        self._runscript.parser.usage = argparse.SUPPRESS
        self._runscript.parser.print_help()

    def getParser(self):
        '''
        Return the argparse parser holding the grid submission options.
        '''
        return self.gridParser

    # This function does all the checking, cleaning and preparing of the commands to be submitted to the grid.
    # It is separated from the submission so that a client is able to change the behavior.
    def configureSumbission(self):
        '''
        Build one prun command per input container and store it in ``self.cmd``.
        '''
        #check for prun?
        #check for Merge type?
        for sample in self.inputList:
            self.cmd[sample] = self.configureSubmissionSingleSample(sample)

    # Correctly spelled alias; the original (misspelled) name is kept for existing callers.
    configureSubmission = configureSumbission

    def configureSubmissionSingleSample(self, input):
        '''
        Build the prun command for a single input container.

        Args:
            input: name of the container, passed to prun as ``--inDS``.

        Returns:
            The prun command as a string with one option per line, joined by shell line continuations.

        Raises:
            KeyError: if the ``CMTCONFIG`` environment variable is not set.
        '''
        import shlex

        config = {
            'inDS': input,
            'outDS': self.args.outDS if self.args.outDS else self.outputDSFormatter(input),
            'useAthenaPackages': True,
            'cmtConfig': os.environ["CMTCONFIG"],
            'writeInputToTxt': 'IN:in.txt',
            'outputs': self.outputsFormatter(),
            'exec': self.execFormatter(),
            'memory': "2000", # MB
            'addNthFieldOfInDSToLFN': '2,3,6',
        }
        if self.args.noSubmit:
            config['noSubmit'] = True

        if self.args.mergeType == 'xAOD':
            # The command is run through a shell (see submit()); the script holds spaces and
            # backticks, so it must be quoted to reach prun as one unexpanded argument.
            config['mergeScript'] = shlex.quote('xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`')

        if self.args.mergeType != 'None':
            config['mergeOutput'] = True

        # The tarball is (re)created by the first command only, every later command reuses it.
        # Testing the flag first avoids re-walking the build and source trees for every sample.
        if not self._tarballRecreated and (
                self.args.recreateTar or not os.path.exists(self._tarfile) or self._filesChanged()):
            config['outTarBall'] = self._tarfile
            self._tarballRecreated = True
        else:
            config['inTarBall'] = self._tarfile

        if self.args.groupProduction:
            config['official'] = True
            config['voms'] = f'atlas:/atlas/{self.args.gridUsername}/Role=production'

        if self.args.destSE:
            config['destSE'] = self.args.destSE

        if self.args.testRun:
            config['nEventsPerFile'] = 300
            config['nFiles'] = 10

        return CPGridRun._formatPrunCommand(config)

    @staticmethod
    def _formatPrunCommand(config):
        '''
        Turn an option dictionary into a multi-line prun command.

        A value that is ``True`` becomes a bare flag, ``None`` and empty strings are skipped,
        anything else is written as ``--key value``.
        '''
        pieces = ['prun \\\n']
        for key, value in config.items():
            if isinstance(value, bool) and value:
                pieces.append(f'--{key} \\\n')
            elif value is not None and value != '':
                pieces.append(f'--{key} {value} \\\n')
        # drop the line continuation left after the last option
        return ''.join(pieces).rstrip(' \\\n')

    def printInputDetails(self):
        '''
        Log, for every configured sample, the fields parsed from its name and the prun command.
        '''
        for key, cmd in self.cmd.items():
            # User-format names do not have the six fields of the ATLAS production format
            # and make atlasProductionNameParser fail, so pick the parser matching the name.
            try:
                if CPGridRun.isAtlasProductionFormat(key):
                    parsed_name = CPGridRun.atlasProductionNameParser(key)
                else:
                    parsed_name = CPGridRun.rucioCustomNameParser(key)
            except IndexError:
                logCPGridRun.warning(f"Cannot parse the input name {key}, details are not shown")
                parsed_name = {}
            details = "\n".join([f" {k.replace('_', ' ').title()}: {v}" for k, v in parsed_name.items()])
            logCPGridRun.info(f"\nInput: {key}\n" + details)
            logCPGridRun.info(f"Command: \n{cmd}")
            print("-" * 70)

    def outputDSFormatter(self, name):
        '''
        Return the output dataset name for an input container, using the formatter
        matching the naming scheme of the input.
        '''
        if CPGridRun.isAtlasProductionFormat(name):
            return self._outputDSFormatter(name)
        return self._customOutputDSFormatter(name)

    def _outputDSFormatter(self, name):
        '''
        {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}

        Empty fields are left out of the result.
        '''
        nameParser = CPGridRun.atlasProductionNameParser(name)
        base = 'group' if self.args.groupProduction else 'user'
        username = self.args.gridUsername
        dsid = nameParser['DSID']
        tags = '_'.join(nameParser['tags'])
        fileFormat = nameParser['format']
        # Dynamically set the prefix, likely to be something like PhPy8Eg
        prefix = self.args.prefix if self.args.prefix else nameParser['main'].split('_')[0]
        suffix = self._suffixFormatter()

        result = [base, username, prefix, dsid, fileFormat, tags, suffix]
        return ".".join(filter(None, result))

    def _customOutputDSFormatter(self, name):
        '''
        {group/user}.{username}.{main}.outputDS.{suffix}

        ``main`` is the third dot-separated field of the input name, ``suffix`` its last field.
        '''
        parts = name.split('.')
        base = 'group' if self.args.groupProduction else 'user'
        username = self.args.gridUsername
        main = parts[2]
        outputDS = 'outputDS'
        suffix = parts[-1]

        result = [base, username, main, outputDS, suffix]
        return ".".join(filter(None, result))

    def _suffixFormatter(self):
        '''
        Return the output dataset suffix: ``--suffix`` if given, a random ``test_xxxxxx``
        for test runs, an empty string otherwise.
        '''
        if self.args.suffix:
            return self.args.suffix
        if self.args.testRun:
            import uuid
            return f"test_{uuid.uuid4().hex[:6]}"
        return ''

    def _filesChanged(self):
        '''
        Tell whether any file of the build or source directory is newer than the tarball.

        A missing tarball counts as modification time 0. When the source directory cannot be
        detected, a warning is logged and only the build directory is taken into account.
        '''
        tarball_mtime = os.path.getmtime(self._tarfile) if os.path.exists(self._tarfile) else 0
        buildDir = self._buildDir()
        sourceDir = self._sourceDir()

        if CPGridRun._hasNewerFile(buildDir, tarball_mtime):
            return True
        if sourceDir is None:
            logCPGridRun.warning("Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
            return False
        return CPGridRun._hasNewerFile(sourceDir, tarball_mtime)

    @staticmethod
    def _hasNewerFile(directory, referenceTime):
        '''
        Walk ``directory`` and return True at the first file modified after ``referenceTime``.
        '''
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                try:
                    if os.path.getmtime(file_path) > referenceTime:
                        logCPGridRun.info(f"File {file_path} is newer than the tarball.")
                        return True
                except FileNotFoundError:
                    # the file disappeared (or is a dangling link) since the directory was listed
                    continue
        return False

    def _buildDir(self):
        '''
        Return the parent directory of the first entry of ``CMAKE_PREFIX_PATH``.

        Raises:
            KeyError: if ``CMAKE_PREFIX_PATH`` is not set.
        '''
        firstEntry = os.environ["CMAKE_PREFIX_PATH"].split(":")[0]
        return os.path.dirname(firstEntry)

    def _sourceDir(self):
        '''
        Return the source directory recorded in ``CMakeCache.txt`` of the build directory
        (first ``*_SOURCE_DIR:STATIC=`` entry), or None when it cannot be found.
        '''
        cmakeCachePath = os.path.join(self._buildDir(), 'CMakeCache.txt')
        if not os.path.exists(cmakeCachePath):
            return None
        with open(cmakeCachePath, 'r') as cmakeCache:
            for line in cmakeCache:
                if '_SOURCE_DIR:STATIC=' in line:
                    # split once only, the path itself may contain '='
                    return line.split('=', 1)[1].strip()
        return None

    def execFormatter(self):
        '''
        Return the double-quoted value of the prun ``--exec`` option.

        An ``--exec`` starting with ``-`` is taken as a list of CPRun.py flags and is completed
        into a CPRun.py call reading its inputs from ``in.txt``; anything else is passed as given.

        Raises:
            ValueError: if ``--exec`` was not provided.
        '''
        execLine = self.args.exec
        if not execLine:
            raise ValueError('use --exec to specify the command to run on the grid')
        if not execLine.startswith('-'):
            return f'"{execLine}"'
        clause = execLine.split(' ')
        base = ['CPRun.py']
        inputClause = ['--input-list in.txt']
        strip = ['--strip'] if not isAthena else []
        fullLine = " ".join(base + inputClause + strip + clause)
        return f'"{fullLine}"'

    def outputsFormatter(self):
        '''
        Convert ``--output-files`` into the prun ``--outputs`` value.

        Each file ``A.root`` becomes ``A:A.root``; blanks around the names and empty entries are ignored.
        '''
        outputs = [output.strip() for output in self.args.output_files.split(',')]
        return ','.join(f'{output.split(".")[0]}:{output}' for output in outputs if output)

    def submit(self):
        '''
        Run every configured prun command through the shell, one after the other.

        Nothing is run when ``prun`` is not available; a non-zero exit code is logged as an error.
        '''
        import subprocess
        import shutil
        if shutil.which("prun") is None:
            logCPGridRun.error("The 'prun' command is not found. If you use are on lxplus, please run `setupATLAS` and `lsetup panda`")
            return
        for sample, cmd in self.cmd.items():
            # shell=True because the command is one string containing line continuations
            process = subprocess.Popen(cmd, shell=True, stdout=sys.stdout, stderr=sys.stderr)
            process.communicate()
            if process.returncode != 0:
                logCPGridRun.error(f"prun exited with code {process.returncode} for input {sample}")

    @staticmethod
    def isAtlasProductionFormat(name):
        '''
        Return True when the name starts with ``mc`` or ``data``; otherwise log a warning and return False.
        '''
        if name.startswith(('mc', 'data')):
            return True
        logCPGridRun.warning("Name is not in the Atlas production format, assuming it is a user production")
        return False

    @staticmethod
    def rucioCustomNameParser(filename):
        '''
        The custom name has many variations, but most of them follow user/group.username.datasetname.suffix

        Raises IndexError when the name has fewer than three dot-separated fields.
        '''
        parts = filename.split('.')
        return {
            'userType': parts[0],
            'username': parts[1],
            'main': parts[2],
            'suffix': parts[-1],
        }

    @staticmethod
    def atlasProductionNameParser(filename):
        '''
        Parsing file name into a dictionary, an example is given here
        mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
        For the first part
            datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
            projectName: mc20_13TeV
            campaign: mc20
            energy: 13TeV
            DSID: 410470
            main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
                TODO generator: PhPy8Eg
                TODO tune: A14 # For Pythia8
                TODO process: ttbar
                TODO hdamp: 258p75 # For Powheg
                TODO decayType: nonallhad
            step: deriv
            format: DAOD_PHYS
            tags: e###_s###_r###_p###_a###_t###_b# (stored as a list)
                etag: e6337 # EVNT (EVGEN) production and merging
                stag: s3681 # Geant4 simulation to produce HITS and merging!
                rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
                ptag: p5855 # Production of NTUP_PILEUP format and merging
                atag: aXXX: atlfast configuration (both simulation and digit/recon)
                ttag: tXXX: tag production configuration
                btag: bXXX: bytestream production configuration

        For the second part (only filled when something follows the '/')
            jediTaskID: 34865530
            fileNumber: _000740
            version: 1

        Raises IndexError when the dataset name has fewer than six dot-separated fields.
        '''
        result = {}
        # Split off the optional file part:
        #   mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
        #   /DAOD_PHYS.34865530._000740.pool.root.1
        # partition() also copes with names holding more than one '/'
        datasetPart, _, filePart = filename.partition('/')

        # All the fields of the dataset part are positional
        datasetParts = datasetPart.split('.')
        result['datasetName'] = datasetPart
        result['projectName'] = datasetParts[0]
        campaign_energy = result['projectName'].split('_')
        result['campaign'] = campaign_energy[0]
        result['energy'] = campaign_energy[1]

        result['DSID'] = datasetParts[1]
        result['main'] = datasetParts[2]
        result['step'] = datasetParts[3]
        result['format'] = datasetParts[4]

        # Each tag is also stored under a key given by its leading letter
        tagKeys = {'e': 'etag', 's': 'stag', 'r': 'rtag', 'p': 'ptag', 'a': 'atag', 't': 'ttag', 'b': 'btag'}
        tags = datasetParts[5].split('_')
        result['tags'] = tags
        for tag in tags:
            tagKey = tagKeys.get(tag[:1])
            if tagKey is not None:
                result[tagKey] = tag

        # Extract the file part if it exists
        if filePart:
            fileParts = filePart.split('.')
            result['jediTaskID'] = fileParts[1]
            result['fileNumber'] = fileParts[2]
            result['version'] = fileParts[-1]
        return result

    @staticmethod
    def _parseInputFileList(path):
        '''
        Read the container names from a text file.

        Names are separated by new lines and/or commas. Empty lines, empty entries and
        lines starting with '#' (after leading blanks) are skipped.
        '''
        files = []
        with open(path, 'r') as inputText:
            for line in inputText:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                for entry in line.split(','):
                    entry = entry.strip()
                    # a trailing comma would otherwise produce an empty container name
                    if entry:
                        files.append(entry)
        return files

    def _askSubmission(self):
        '''
        Ask for a confirmation on the terminal and submit the jobs only on a 'y' answer.
        '''
        answer = input("[Tenative asking] Please confirm ALL the submission details are correct to submit [y/n]: ").lower()
        if answer == 'y':
            self.submit()
        elif answer == 'n':
            logCPGridRun.info("Feel free to report any unexpected behavior to CPAlgorithms team!")
        else:
            logCPGridRun.error("Invalid input. Please enter 'y' or 'n'. Jobs are not submitted")
if __name__ == '__main__':
    # Script entry point: parse the command line, build the prun commands,
    # show them to the user and submit only after an interactive confirmation.
    gridRunner = CPGridRun()
    gridRunner.configureSumbission()
    gridRunner.printInputDetails()
    gridRunner._askSubmission()
CPGridRun.CPGridRun._filesChanged
def _filesChanged(self)
Definition: CPGridRun.py:208
CPGridRun.CPGridRun._tarfile
_tarfile
Definition: CPGridRun.py:18
CPGridRun.CPGridRun.isAtlasProductionFormat
def isAtlasProductionFormat(name)
Definition: CPGridRun.py:285
CPGridRun.CPGridRun.cmd
cmd
Definition: CPGridRun.py:21
CPGridRun.CPGridRun.printHelp
def printHelp(self)
Definition: CPGridRun.py:86
CPGridRun.CPGridRun._parseInputFileList
def _parseInputFileList(path)
Definition: CPGridRun.py:392
CPGridRun.CPGridRun.printInputDetails
def printInputDetails(self)
Definition: CPGridRun.py:151
CPGridRun.CPGridRun._sourceDir
def _sourceDir(self)
Definition: CPGridRun.py:244
CPGridRun.CPGridRun._runscript
_runscript
Definition: CPGridRun.py:15
CPGridRun.CPGridRun._askSubmission
def _askSubmission(self)
Definition: CPGridRun.py:404
CPGridRun.CPGridRun.rucioCustomNameParser
def rucioCustomNameParser(filename)
Definition: CPGridRun.py:292
covarianceTool.filter
filter
Definition: covarianceTool.py:514
CPGridRun.CPGridRun._outputDSFormatter
def _outputDSFormatter(self, name)
Definition: CPGridRun.py:168
CPGridRun.CPGridRun.configureSubmissionSingleSample
def configureSubmissionSingleSample(self, input)
Definition: CPGridRun.py:104
CPGridRun.CPGridRun.submit
def submit(self)
Definition: CPGridRun.py:273
CPGridRun.CPGridRun.configureSumbission
def configureSumbission(self)
Definition: CPGridRun.py:97
CPGridRun.CPGridRun.gridParser
gridParser
Definition: CPGridRun.py:13
CPGridRun.CPGridRun._tarballRecreated
_tarballRecreated
Definition: CPGridRun.py:19
CPGridRun.CPGridRun._parseGridArguments
def _parseGridArguments(self)
Definition: CPGridRun.py:32
CPGridRun.CPGridRun
Definition: CPGridRun.py:11
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
CPGridRun.CPGridRun.atlasProductionNameParser
def atlasProductionNameParser(filename)
Definition: CPGridRun.py:305
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
CPGridRun.CPGridRun.inputList
def inputList(self)
Definition: CPGridRun.py:73
CPGridRun.CPGridRun._suffixFormatter
def _suffixFormatter(self)
Definition: CPGridRun.py:199
CPGridRun.CPGridRun.outputsFormatter
def outputsFormatter(self)
Definition: CPGridRun.py:268
CPGridRun.CPGridRun.args
args
Definition: CPGridRun.py:69
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:71
CPGridRun.CPGridRun._customOutputDSFormatter
def _customOutputDSFormatter(self, name)
Definition: CPGridRun.py:185
Trk::open
@ open
Definition: BinningType.h:40
CPGridRun.CPGridRun._buildDir
def _buildDir(self)
Definition: CPGridRun.py:239
CPGridRun.CPGridRun.__init__
def __init__(self)
Definition: CPGridRun.py:12
CPGridRun.CPGridRun._initRunscript
def _initRunscript(self)
Definition: CPGridRun.py:23
CPGridRun.CPGridRun._inputList
_inputList
Definition: CPGridRun.py:20
CPGridRun.CPGridRun.getParser
def getParser(self)
Definition: CPGridRun.py:92
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
CPGridRun.CPGridRun.execFormatter
def execFormatter(self)
Definition: CPGridRun.py:256
CPGridRun.CPGridRun.outputDSFormatter
def outputDSFormatter(self, name)
Definition: CPGridRun.py:162