ATLAS Offline Software
Loading...
Searching...
No Matches
CPGridRun.py
Go to the documentation of this file.
1#! /usr/bin/env python
2
3# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4from AnaAlgorithm.DualUseConfig import isAthena
5from AnaAlgorithm.Logging import logging
6import argparse
7import sys
8import os
9
10logCPGridRun = logging.getLogger('CPGridRun')
# NOTE(review): Doxygen-extracted listing — original source lines 11 and 13-14
# (the `class CPGridRun:` header and, presumably, the argument-parser setup
# such as `self.gridParser = self._parseGridArguments()`) are missing here;
# TODO confirm against the original CPGridRun.py.
# Constructor: handles -h/--help early (printing both the grid help and the
# CPRun.py help, then exiting), then initialises the bookkeeping state used
# throughout the submission workflow.
12 def __init__(self):
# Lazily-created runscript helper (Athena or EventLoop flavour).
15 self._runscript = None
# self.args must already be populated by the (missing) parser-setup line.
16 if self.args.help:
17 self._initRunscript()
18 self.printHelp()
19 sys.exit(0)
# Tarball of the locally built packages shipped to the grid with the job.
20 self._tarfile = 'cpgrid.tar.gz'
# Suppresses repeated per-sample log messages after the first sample.
21 self._isFirstRun = True
22 self._tarballRecreated = False
23 self._inputList = None
24 self._errorCollector = {} # Delay the error collection until the end of the script for better user experience
25 self._yamlPath = None
26 self.cmd = {} # sample name -> command
27
def _initRunscript(self):
    """Lazily create and cache the framework-appropriate runscript helper.

    On first use, instantiates the Athena or EventLoop runscript depending
    on the environment; later calls return the cached instance.
    """
    if self._runscript is None:
        if isAthena:
            from AnalysisAlgorithmsConfig.AthenaCPRunScript import AthenaCPRunScript
            self._runscript = AthenaCPRunScript()
        else:
            from AnalysisAlgorithmsConfig.EventLoopCPRunScript import EventLoopCPRunScript
            self._runscript = EventLoopCPRunScript()
    return self._runscript
38
# NOTE(review): the `def _parseGridArguments(self):` line (original source
# line 39) and original line 78 (presumably
# `self.prunArgsDict = self._createPrunArgsDict()`) are missing from this
# extracted listing — verify against the original file.
# Builds the CPGrid argument parser; unknown options are kept aside in
# self.unknown_args and later validated against prun's own option list.
40 parser = argparse.ArgumentParser(description='CPGrid runscript to submit CPRun.py jobs to the grid. '
41 'This script will submit a job to the grid using files in the input text one by one.'
42 'CPRun.py can handle multiple sources of input and create one output; but not this script',
# add_help=False: -h is re-registered below so help can print BOTH this
# parser's options and the CPRun.py options before exiting.
43 add_help=False,
44 formatter_class=argparse.RawTextHelpFormatter)
45 parser.add_argument('-h', '--help', dest='help', action='store_true', help='Show this help message and continue')
46
47 ioGroup = parser.add_argument_group('Input/Output file configuration')
48 ioGroup.add_argument('-i','--input-list', dest='input_list', help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
49 ioGroup.add_argument('--output-files', dest='output_files', nargs='+', default=['output.root'],
50 help='The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
51 ioGroup.add_argument('--destSE', dest='destSE', default='', type=str, help='Destination storage element (PanDA)')
52 ioGroup.add_argument('--mergeType', dest='mergeType', default='Default', type=str, help='Output merging type, [None, Default, xAOD]')
53
54 pandaGroup = parser.add_argument_group('Input/Output naming configuration')
55 pandaGroup.add_argument('--gridUsername', dest='gridUsername', default=os.getenv('USER', ''), type=str, help='Grid username, or the groupname. Default is the current user. Only affect file naming')
56 pandaGroup.add_argument('--prefix', dest='prefix', default='', type=str, help='Prefix for the output directory. Dynamically set with input container if not provided')
57 pandaGroup.add_argument('--suffix', dest='suffix', default='',type=str, help='Suffix for the output directory')
58 pandaGroup.add_argument('--outDS', dest='outDS', default='', type=str,
59 help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
60
61 cpgridGroup = parser.add_argument_group('CPGrid configuration')
62 cpgridGroup.add_argument('--groupProduction', dest='groupProduction', action='store_true', help='Only use for official production')
63
64 cpgridGroup.add_argument('--exec', dest='exec', type=str,
65 help='Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
66 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
67 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
68 )
69
70 submissionGroup = parser.add_argument_group('Submission configuration')
71 submissionGroup.add_argument('-y', '--agreeAll', dest='agreeAll', action='store_true', help='Agree to all the submission details without asking for confirmation. Use with caution!')
72 submissionGroup.add_argument('--noSubmit', dest='noSubmit', action='store_true', help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
73 submissionGroup.add_argument('--testRun', dest='testRun', action='store_true', help='Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
74 submissionGroup.add_argument('--checkInputDS', dest='checkInputDS', action='store_true', help='Check if the input datasets are available on the AMI.')
75 submissionGroup.add_argument('--recreateTar', dest='recreateTar', action='store_true', help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
76 submissionGroup.add_argument('--useCentralPackage', dest='useCentralPackage', action='store_true', help='Use central package instead of custom packages')
# parse_known_args: options not declared above land in self.unknown_args and
# are later checked against prun's option list (see _createPrunArgsDict).
77 self.args, self.unknown_args = parser.parse_known_args()
79 return parser
80
def _createPrunArgsDict(self) -> dict:
    '''
    Convert the unknown (non-CPGridRun) command-line arguments into a dict.

    When prun is available, the argument names are validated against prun's
    own option list (raising ValueError on a mismatch); otherwise a warning
    is emitted because no validation is possible.

    Returns:
        dict: argument name (without leading dashes) -> value (str or True).
    '''
    unknownArgsDict = self._unknownArgsDict()
    # Bug fix: the original trailing `else: pass` was dead code and has been
    # removed; the branch structure below is behaviour-equivalent otherwise.
    if unknownArgsDict:
        if self.hasPrun():
            self._checkPrunArgs(unknownArgsDict)
            logCPGridRun.info(f"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
        else:
            # Typo fix in the user-facing message: "availablility" -> "availability".
            logCPGridRun.warning(f"Unknown arguments detected: {unknownArgsDict}. Cannot check the availability in Prun because Prun is not available / noSubmit is on.")
    return unknownArgsDict
94
@property
def inputList(self):
    """List of input container names to submit, parsed once and cached.

    Accepts a .txt file listing containers, a single container name in the
    ATLAS production format, or (not yet supported) a .json file.

    Raises:
        ValueError: when --input-list was not given or is not usable.
        NotImplementedError: for .json input lists.
    """
    if not self.args.input_list:
        raise ValueError('No input list provided, use --input-list to specify the input containers')
    if self._inputList is None:
        source = self.args.input_list
        if source.endswith('.txt'):
            self._inputList = CPGridRun._parseInputFileList(source)
        elif source.endswith('.json'):
            raise NotImplementedError('JSON input list parsing is not implemented')
        elif CPGridRun.isAtlasProductionFormat(source):
            # A single container name was passed directly.
            self._inputList = [source]
        else:
            raise ValueError(
                'use --input-list to specify input containers')
    return self._inputList
110
# NOTE(review): the enclosing `def` line (original source line 111) is missing
# from this extracted listing — this body normalises self.args.output_files so
# that comma-separated entries become individual file names.
112 output_files = []
113 for output in self.args.output_files:
114 if ',' in output:
# Support "--output-files a.root,b.root" as well as space-separated values.
115 output_files.extend(output.split(','))
116 else:
117 output_files.append(output)
118 self.output_files = output_files
119
def printHelp(self):
    """Print the grid-submission help, then the CPRun.py option help."""
    self.gridParser.print_help()
    logCPGridRun.info("\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
    # Suppress the runscript parser's usage banner so only its options print.
    runscript_parser = self._runscript.parser
    runscript_parser.usage = argparse.SUPPRESS
    runscript_parser.print_help()
125
def getParser(self):
    """Return the argparse parser holding the grid-submission options."""
    parser = self.gridParser
    return parser
128
# This function does all the checking, cleaning and preparing of the command to be submitted to the grid
# separated for client to be able to change the behavior
# NOTE(review): the `def configureSubmission(self):` line (original source
# line 131) is missing from this extracted listing.
# Builds one prun command per input container (CPRun.py would otherwise
# merge multiple inputs into a single output).
132 for input in self.inputList:
133 cmd = self.configureSubmissionSingleSample(input)
134 self.cmd[input] = cmd
# After the first sample, repeated per-sample log messages are suppressed.
135 self._isFirstRun = False
136
# NOTE(review): the `def configureSubmissionSingleSample(self, input):` line
# (original source line 137) is missing from this extracted listing.
# Builds the prun option dictionary for one input container and renders it
# into a multi-line `prun \...` shell command string.
138 config = {
139 'inDS': input,
140 'outDS': self.args.outDS if self.args.outDS else self.outputDSFormatter(input) ,
141 'cmtConfig': os.environ["CMTCONFIG"],
142 'writeInputToTxt': 'IN:in.txt',
143 'outputs': self.outputsFormatter(),
144 'exec': self.execFormatter(),
145 'memory': "2000", # MB
146 'addNthFieldOfInDSToLFN': '2,3,6',
147 }
148 if self.args.noSubmit:
149 config['noSubmit'] = True
150
151 if self.args.mergeType == 'xAOD':
152 config['mergeScript'] = 'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'
153
154 if self.args.mergeType != 'None':
155 config['mergeOutput'] = True
156
157 # Three types of files sending the grid
158 if self.args.useCentralPackage: # 1. Using central package and have a yaml file only
159 config['extFile'] = self._yamlPath
160 config['noBuild'] = True
161 config['noCompile'] = True
162 config['athenaTag'] = f"AnalysisBase,{os.environ['AnalysisBase_VERSION']}"
163 elif self._filesChangedOrTarballNotCreated(): # 2. Using custom packages and haven't compressed the tarball since the last changes
164 config['outTarBall'] = self._tarfile
165 config['useAthenaPackages'] = True
166 self._tarballRecreated = True
167 elif self._hasCompressedTarball(): # 3. Using custom packages and have compressed the tarball
168 config['inTarBall'] = self._tarfile
169 config['useAthenaPackages'] = True
170
171 if self.args.groupProduction:
172 config['official'] = True
173 config['voms'] = f'atlas:/atlas/{self.args.gridUsername}/Role=production'
174
175 if self.args.destSE:
176 config['destSE'] = self.args.destSE
177
# NOTE(review): these limits (100 events/file, 5 files) do not match the
# --testRun help text, which advertises 10 files and 300 events — confirm
# which is intended. Indentation was lost in extraction, so it is also
# unclear whether the config.update below is inside this branch; verify.
178 if self.args.testRun:
179 config['nEventsPerFile'] = 100
180 config['nFiles'] = 5
181 config.update(self.prunArgsDict)
# Render the option dict as a backslash-continued prun command; boolean True
# options become bare flags, empty/None values are dropped.
182 cmd = 'prun \\\n'
183 for k, v in config.items():
184 if isinstance(v, bool) and v:
185 cmd += f'--{k} \\\n'
186 elif v is not None and v != '':
187 cmd += f'--{k} {v} \\\n'
188 return cmd.rstrip(' \\\n')
189
190 def _unknownArgsDict(self)->dict:
191 '''
192 Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
193 '''
194 unknown_args_dict = {}
195 idx = 0
196 while idx < len(self.unknown_args):
# A token starting with '-' is an option name; if the next token is not
# another option it becomes this option's value, otherwise the option is
# recorded as a boolean flag.
197 if self.unknown_args[idx].startswith('-'):
198 if idx + 1 < len(self.unknown_args) and not self.unknown_args[idx + 1].startswith('-'):
199 unknown_args_dict[self.unknown_args[idx].lstrip('-')] = self.unknown_args[idx + 1]
200 idx += 2
201 else:
202 unknown_args_dict[self.unknown_args[idx].lstrip('-')] = True
203 idx += 1
# NOTE(review): indentation was lost in extraction — it is unclear how `idx`
# advances for a bare token with no leading '-'; verify against the original
# file that such input cannot cause an infinite loop.
204 return unknown_args_dict
205
def _checkPrunArgs(self, argDict):
    '''
    Validate the given argument names against the prun option parser.

    See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py

    Raises:
        ValueError: when a name is neither a CPGridRun nor a prun option.
    '''
    import pandaclient.PrunScript
    # PrunScript.main parses sys.argv itself, so temporarily hand it an
    # empty command line and restore ours afterwards.
    saved_argv = sys.argv
    sys.argv = ['prun']
    prunArgsDict = pandaclient.PrunScript.main(get_options=True)
    sys.argv = saved_argv
    unrecognized = [arg for arg in argDict if arg not in prunArgsDict]
    if unrecognized:
        message = f"Unknown arguments detected: {unrecognized}. They do not belong to CPGridRun or Panda."
        logCPGridRun.error(message)
        raise ValueError(message)
225
# NOTE(review): the `def printInputDetails(self):` line (original source line
# 226) is missing from this extracted listing.
# Prints, for each input container, its parsed name components and the full
# prun command that will be submitted.
227 for key, cmd in self.cmd.items():
228 parsed_name = CPGridRun.atlasProductionNameParser(key)
229 logCPGridRun.info("\n"
230 f"Input: {key}\n" +
231 "\n".join([f" {k.replace('_', ' ').title()}: {v}" for k, v in parsed_name.items()]))
232 logCPGridRun.info(f"Command: \n{cmd}")
233 print("-" * 70)
234 # Add your submission logic here
235
def hasPyami(self):
    """Return True when the pyAMI client modules can be imported.

    On failure, stores a remediation hint in the error collector (reported
    at the end of the run) and returns False.
    """
    global pyAMI
    try:
        import pyAMI.client
        import pyAMI.atlas.api
    except ModuleNotFoundError:
        self._errorCollector['no AMI'] = (
            "Cannot import pyAMI, please run the following commands:\n\n"
            "```\n"
            "lsetup pyami\n"
            "voms-proxy-init -voms atlas\n"
            "```\n"
            "and make sure you have a valid certificate.")
        return False
    return True
251
# Query AMI for all requested input datasets. Returns False when pyAMI is
# unusable, the query fails (e.g. no valid grid certificate), or a dataset
# is missing from AMI; True otherwise.
252 def checkInputInPyami(self) -> bool:
253 if not self.hasPyami():
254 return False
255
256 client = pyAMI.client.Client('atlas')
257 pyAMI.atlas.api.init()
258
# Query patterns use _p% wildcards so newer p-tag variants also match.
259 queries, datasetPtag = self._prepareAmiQueryFromInputList()
260 try:
261 results = pyAMI.atlas.api.list_datasets(client, patterns=queries)
262 except pyAMI.exception.Error:
263 self._errorCollector['no valid certificate'] = (
264 "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
265 return False
266
267 return self._analyzeAmiResults(results, datasetPtag)
268
# NOTE(review): the `def _prepareAmiQueryFromInputList(self):` line (original
# source line 269) is missing from this extracted listing.
270 '''
271 Helper function to prepare a list of queries for the AMI based on the input list.
272 It will replace the _p### with _p% to match the latest ptag.
273 '''
274 import re
275 regex = re.compile("_p[0-9]+")
276 queries = []
277 datasetPtag = {}
# self.cmd keys are the input dataset names (filled by configureSubmission).
278 for datasetName in self.cmd:
279 parsed = CPGridRun.atlasProductionNameParser(datasetName)
# May be None when the name carries no p-tag; handled downstream.
280 datasetPtag[datasetName] = parsed.get('ptag')
281 queries.append(regex.sub("_p%", datasetName))
282 return queries, datasetPtag
283
def _analyzeAmiResults(self, results, datasetPtag) -> bool:
    '''
    Cross-check the AMI query results against the requested datasets.

    Logs datasets for which a newer p-tag exists in AMI and reports datasets
    that are not present in AMI at all.

    Args:
        results: AMI records as returned by list_datasets (each has 'ldn').
        datasetPtag: mapping dataset name -> requested p-tag (may be None).

    Returns:
        bool: False when at least one requested dataset is missing in AMI.
    '''
    import re
    regex = re.compile("_p[0-9]+")
    results = [r['ldn'] for r in results]
    notFound = []
    latestPtag = {}

    for datasetName in self.cmd:
        if datasetName not in results:
            notFound.append(datasetName)

        # All AMI entries that are p-tag variants of this dataset name.
        base = regex.sub("_p%", datasetName)
        matching = [r for r in results if r.startswith(base.replace("_p%", ""))]
        bestPtag = None
        for m in matching:
            mParsed = CPGridRun.atlasProductionNameParser(m)
            try:
                mPtagInt = int(mParsed.get('ptag', 'p0')[1:])
                currentPtagInt = int(datasetPtag.get(datasetName, 'p0')[1:])
            except (ValueError, TypeError):
                # Unparseable or absent p-tag on either side; skip this match.
                continue
            # Bug fix: keep the HIGHEST p-tag among all matches. Previously
            # the last match newer than the requested tag won, which could
            # report a non-maximal p-tag when matches were unordered.
            if mPtagInt > currentPtagInt and (bestPtag is None or mPtagInt > bestPtag):
                bestPtag = mPtagInt
        if bestPtag is not None:
            latestPtag[datasetName] = f"p{bestPtag}"

    if latestPtag:
        logCPGridRun.info("Newer version of datasets found in AMI:")
        for name, ptag in latestPtag.items():
            logCPGridRun.info(f"{name} -> ptag: {ptag}")

    if notFound:
        logCPGridRun.error("Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
        logCPGridRun.error(", ".join(notFound))
        return False

    return True
318
# NOTE(review): the `def` lines for these two helpers (original source lines
# 319 `_filesChangedOrTarballNotCreated(self)` and 322
# `_hasCompressedTarball(self)`) are missing from this extracted listing.
# Tarball must be (re)built: at most once per run, when --recreateTar is set,
# the tarball is absent, or build/source files changed since it was made.
320 return not self._tarballRecreated and (self.args.recreateTar or not os.path.exists(self._tarfile) or self._filesChanged())
321
# A usable tarball exists on disk or was created earlier in this run.
323 return os.path.exists(self._tarfile) or self._tarballRecreated
324
def outputDSFormatter(self, name):
    """Build the output dataset name for *name*.

    Dispatches to the ATLAS-production formatter when the input follows the
    official naming scheme, otherwise to the user/custom-name formatter.
    """
    formatter = (self._outputDSFormatter
                 if CPGridRun.isAtlasProductionFormat(name)
                 else self._customOutputDSFormatter)
    return formatter(name)
330
def _outputDSFormatter(self, name):
    '''
    Format an ATLAS-production input name into an output dataset name:
    {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
    Empty components are dropped (no doubled dots).
    '''
    nameParser = CPGridRun.atlasProductionNameParser(name)
    # Fix: this assignment was duplicated in the original; computed once now.
    base = 'group' if self.args.groupProduction else 'user'
    username = self.args.gridUsername
    dsid = nameParser['DSID']
    tags = '_'.join(nameParser['tags'])
    fileFormat = nameParser['format']
    # Dynamically set the prefix, likely to be something like PhPy8Eg
    prefix = self.args.prefix if self.args.prefix else nameParser['main'].split('_')[0]
    suffix = self._suffixFormatter()

    result = [base, username, prefix, dsid, fileFormat, tags, suffix]
    return ".".join(filter(None, result))
347
# NOTE(review): the `def _customOutputDSFormatter(self, name):` line (original
# source line 348) is missing from this extracted listing.
349 '''
350 {group/user}.{username}.{main}.outputDS.{suffix}
351 '''
352 parts = name.split('.')
353 base = 'group' if self.args.groupProduction else 'user'
354 username = self.args.gridUsername
# Assumes the input follows user/group.username.datasetname[...].suffix, so
# parts[2] is the dataset main name — TODO confirm for shorter names.
355 main = parts[2]
356 outputDS = 'outputDS'
357 suffix = parts[-1]
358
# filter(None, ...) drops empty components so no doubled dots appear.
359 result = [base, username,main, outputDS, suffix]
360 return ".".join(filter(None, result))
361
def _suffixFormatter(self):
    '''
    Choose the suffix for the output dataset name.

    Priority: explicit --suffix, then a random "test_xxxxxx" suffix for
    --testRun, otherwise an empty string.

    NOTE(review): the `def` line (original source line 362) was missing from
    the extracted listing and has been reconstructed here.
    '''
    if self.args.suffix:
        return self.args.suffix
    if self.args.testRun:
        import uuid
        # Random tag so repeated test submissions get distinct output names.
        return f"test_{uuid.uuid4().hex[:6]}"
    # Bug fix: the original ended with a bare '' expression, silently
    # returning None; return the empty string explicitly.
    return ''
370
def _filesChanged(self):
    '''
    Return True when any file under the build or source tree is newer than
    the tarball (i.e. the tarball is stale and must be recreated).
    '''
    tarball_mtime = os.path.getmtime(self._tarfile) if os.path.exists(self._tarfile) else 0

    def _any_newer(directory):
        # True when any file under `directory` has an mtime newer than the
        # tarball. (Shared helper: the original duplicated this walk twice.)
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                try:
                    if os.path.getmtime(file_path) > tarball_mtime:
                        logCPGridRun.info(f"File {file_path} is newer than the tarball.")
                        return True
                except FileNotFoundError:
                    # File vanished between listing and stat; ignore it.
                    continue
        return False

    # Check for changes in the build directory first.
    if _any_newer(self._buildDir()):
        return True

    # Then the source directory, when it can be located.
    sourceDir = self._sourceDir()
    if sourceDir is None:
        logCPGridRun.warning("Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
        return False
    return _any_newer(sourceDir)
401
def _buildDir(self):
    """Return the build directory: parent of the first CMAKE_PREFIX_PATH entry."""
    first_entry = os.environ["CMAKE_PREFIX_PATH"].split(":")[0]
    return os.path.dirname(first_entry)
406
def _sourceDir(self):
    """Locate the CMake source directory via the build dir's CMakeCache.txt.

    Returns None when the cache file does not exist or contains no
    ``*_SOURCE_DIR:STATIC=`` entry.
    """
    cachePath = os.path.join(self._buildDir(), 'CMakeCache.txt')
    if not os.path.exists(cachePath):
        return None
    with open(cachePath, 'r') as cacheFile:
        for line in cacheFile:
            if '_SOURCE_DIR:STATIC=' in line:
                # Take the value to the right of '=' on the first match.
                return line.split('=')[1].strip()
    return None
418
# Validate and format the --exec string. For CPRun.py-style commands the
# string is parsed with the runscript's own parser, defaults are overridden
# for grid use (input list, output merging), and the result is re-rendered;
# custom commands are passed through quoted but unmodified.
419 def execFormatter(self):
420 if not self.args.exec:
421 raise ValueError('No exec command provided, use --exec to specify the command to run on the grid')
422
423 # Check if the execution command starts with 'CPRun.py' or '-'
424 isCPRunDefault = self.args.exec.startswith('-') or self.args.exec.startswith('CPRun.py')
# Grid-side overrides: read inputs from in.txt (written by prun) and merge
# outputs only when a single output file is expected.
425 formatingClause = {
426 'input_list': 'in.txt',
427 'merge_output_files': len(self.args.output_files) == 1,
428 }
429 if not isCPRunDefault:
430 if self._isFirstRun: logCPGridRun.warning("Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
431 return f'"{self.args.exec}"'
432
433 # Parse the exec string using the parser to validate and extract known arguments
434 self._initRunscript()
435 runscriptArgs, unknownArgs = self._runscript.parser.parse_known_args(self.args.exec.split(' '))
436
437 # Throw error if unknownArgs contains any --args
438 unknown_flags = [arg for arg in unknownArgs if arg.startswith('--')]
439 if unknown_flags:
440 logCPGridRun.error(f"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
441 raise ValueError(f"Unknown arguments detected: {unknown_flags}")
442
443 # Only override if value is None or the parser default
444 for key, value in formatingClause.items():
445 if hasattr(runscriptArgs, key):
446 old_value = getattr(runscriptArgs, key)
447 if old_value is None or old_value == self._runscript.parser.get_default(key):
448 setattr(runscriptArgs, key, value)
449 if self._isFirstRun: logCPGridRun.info(f"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
450 else:
# User explicitly set this option in --exec; leave their value untouched.
451 if self._isFirstRun: logCPGridRun.warning(f"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
452 else:
453 logCPGridRun.error(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
454 raise ValueError(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
455 self._checkYamlExists(runscriptArgs)
456 # Return the formatted arguments as a string
# True booleans become bare flags; None/False values are dropped entirely.
457 arg_string = ' '.join(
458 f'--{k.replace("_", "-")}' if isinstance(v, bool) and v else
459 f'--{k.replace("_", "-")} {v}' for k, v in vars(runscriptArgs).items() if v not in [None, False]
460 )
461 return f'"CPRun.py {arg_string}"'
462
# Verify that the YAML configuration referenced by --text-config in the exec
# string will be usable on the grid; problems are recorded in the error
# collector rather than raised immediately.
463 def _checkYamlExists(self, runscriptArgs):
464 from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
465 if not hasattr(runscriptArgs, 'text_config'):
466 self._errorCollector['no yaml'] = "No YAML configuration file is specified in the exec string. Please provide one using --text-config"
467 return
468 yamlPath = getattr(runscriptArgs, 'text_config')
# Remembered for the useCentralPackage submission path (shipped via extFile).
469 self._yamlPath = yamlPath
470 haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
471 if haveLocalYaml:
472 logCPGridRun.warning("A path to a local YAML configuration file is found, but it may not be grid-usable.")
473
# Installed (repository) YAMLs are what the grid job will actually see.
474 repoYamls, _ = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
475 if repoYamls and len(repoYamls) > 1:
476 self._errorCollector['ambiguous yamls'] = f'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' + '\n'.join(repoYamls)
477 return
478 elif repoYamls and len(repoYamls) == 1:
479 logCPGridRun.info(f"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
480 return
481
482 if haveLocalYaml and self.args.useCentralPackage:
483 logCPGridRun.warning("A path to a local YAML configuration file is found, no custom packages are found, proceed with /cvmfs packages only.")
484
485 if not repoYamls and not self.args.useCentralPackage:
486 self._errorCollector['no usable yaml'] = f"Grid usable YAML configuration file not found: {yamlPath}"
487 if haveLocalYaml:
488 self._errorCollector['have local yaml'] = f"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
489 f"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec\n"\
490 f"Or if you are only using central packages, please use the `--useCentralPackage` flag."
491
def outputsFormatter(self):
    """Build the prun --outputs value from self.args.output_files.

    Each plain file name gains a '<stem>:' dataset-label prefix; entries
    that already contain ':' are kept as given. (The `def` line, original
    source line 492, was missing from the extracted listing and has been
    reconstructed.)
    """
    labelled = []
    for output in self.args.output_files:
        if ":" in output:
            labelled.append(output)
        else:
            labelled.append(f'{output.split(".")[0]}:{output}')
    return ','.join(labelled)
495
def hasPrun(self) -> bool:
    """Check that the PanDA 'prun' executable is available on PATH.

    Records a remediation hint in the error collector and returns False
    when the executable is missing.
    """
    import shutil
    if shutil.which("prun") is not None:
        return True
    self._errorCollector['no prun'] = (
        "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
        "```\n"
        "lsetup panda\n"
        "voms-proxy-init -voms atlas\n"
        "```\n"
        "Make sure you have a valid certificate."
    )
    return False
510
def submit(self):
    """Run every prepared prun command in a shell, streaming its output."""
    import subprocess
    for command in self.cmd.values():
        # shell=True: each command is a fully formatted multi-line prun string.
        proc = subprocess.Popen(command, shell=True, stdout=sys.stdout, stderr=sys.stderr)
        proc.communicate()
516
517 @staticmethod
519 if ":" in name:
520 name = name.split(":")[1]
521
522 if name.startswith('mc') or name.startswith('data'):
523 return True
524
525 logCPGridRun.warning("Name is not in the Atlas production format, assuming it is a user production")
526 return False
527
528 @staticmethod
530 '''
531 The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
532 '''
533 result = {}
534 parts = filename.split('.')
535 result['userType'] = parts[0]
536 result['username'] = parts[1]
537 result['main'] = parts[2]
538 result['suffix'] = parts[-1]
539 return result
540
541 @staticmethod
543 '''
544 Parsing file name into a dictionary, an example is given here
545 mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
546 For the first part
547 datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
548 projectName: mc20_13TeV
549 campaign: mc20
550 energy: 13 #(TeV)
551 DSID: 410470
552 main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
553 TODO generator: PhPy8Eg
554 TODO tune: A14 # For Pythia8
555 TODO process: ttbar
556 TODO hdamp: 258p75 # For Powheg
557 TODO decayType: nonallhad
558 step: deriv
559 format: DAOD_PHYS
560 tags: e###_s###_r###_p###_a###_t###_b#
561 etag: e6337 # EVNT (EVGEN) production and merging
562 stag: s3681 # Geant4 simulation to produce HITS and merging!
563 rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
564 ptag: p5855 # Production of NTUP_PILEUP format and merging
565 atag: aXXX: atlfast configuration (both simulation and digit/recon)
566 ttag: tXXX: tag production configuration
567 btag: bXXX: bytestream production configuration
568
569 For the second part
570 JeditaskID: 34865530
571 fileNumber: 000740
572 version: 1
573
574 '''
575 result = {}
576 #split the / in case
577 # mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
578 # /DAOD_PHYS.34865530._000740.pool.root.1
579 if '/' in filename:
580 datasetPart, filePart = filename.split('/')
581 else:
582 datasetPart = filename
583 filePart = None
584
585 # Remove the scope
586 if ':' in datasetPart:
587 datasetPart = datasetPart.split(':')[1]
588
589 # Do not try to parse user datasets
590 if datasetPart.startswith('user') or datasetPart.startswith('group'):
591 result['datasetName'] = datasetPart
592 return result
593
594 # Split the dataset part by dots
595 datasetParts = datasetPart.split('.')
596 result['datasetName'] = datasetPart
597 # Extract the first part
598 result['projectName'] = datasetParts[0] # is positional
599 # Extract the campaign and energy
600 campaign_energy = result['projectName'].split('_')
601 result['campaign'] = campaign_energy[0]
602 result['energy'] = campaign_energy[1]
603
604 # Extract the DSID, positional
605 result['DSID'] = datasetParts[1]
606 result['main'] = datasetParts[2]
607 result['step'] = datasetParts[3]
608 result['format'] = datasetParts[4]
609
610 # Extract the tags (etag, stag, rtag, ptag)
611 tags = datasetParts[5].split('_')
612 result['tags'] = tags
613 for tag in tags:
614 if tag.startswith('e'):
615 result['etag'] = tag
616 elif tag.startswith('s'):
617 result['stag'] = tag
618 elif tag.startswith('r'):
619 result['rtag'] = tag
620 elif tag.startswith('p'):
621 result['ptag'] = tag
622 elif tag.startswith('a'):
623 result['atag'] = tag
624 elif tag.startswith('t'):
625 result['ttag'] = tag
626 elif tag.startswith('b'):
627 result['btag'] = tag
628
629 # Extract the file part if it exists
630 if filePart:
631 fileParts = filePart.split('.')
632 result['jediTaskID'] = fileParts[1]
633 result['fileNumber'] = fileParts[2]
634 result['version'] = fileParts[-1]
635 return result
636
@staticmethod
def _parseInputFileList(path):
    '''
    Read a text file of container names, one or more per line
    (comma-separated).

    Comment lines starting with '#' (robustness fix: now also after leading
    whitespace) and blank lines are skipped; surrounding whitespace is
    stripped from each entry and empty tokens from trailing or duplicate
    commas are dropped (previously they were kept as '' entries).
    (The `def` line, original source line 638, was missing from the extracted
    listing and has been reconstructed.)

    Returns:
        list[str]: the container names.
    '''
    files = []
    with open(path, 'r') as inputText:
        for line in inputText:
            stripped = line.strip()
            # skip comments and empty lines
            if not stripped or stripped.startswith('#'):
                continue
            files.extend(token.strip() for token in stripped.split(',') if token.strip())
    return files
649
# NOTE(review): the `def printDelayedErrorCollection(self):` line (original
# source line 650) is missing from this extracted listing, and the original
# indentation is lost — presumably each collected error is printed in the
# loop and sys.exit(1) runs once afterwards; verify against the original.
# Reports every error collected during the run, then exits non-zero so
# wrapping scripts can detect the failure.
651 if self._errorCollector:
652 logCPGridRun.error("Errors were collected during the script execution:")
653
654 for key, value in self._errorCollector.items():
655 logCPGridRun.error(f"{key}: {value}")
656 logCPGridRun.error("Please fix the errors and try again.")
657 sys.exit(1)
658
# NOTE(review): the `def checkExternalTools(self):` line (original source
# line 659) is missing from this extracted listing.
# Probe external tooling: prun availability is always checked (failures go
# to the error collector); AMI is queried only when --checkInputDS is set.
660 self.hasPrun()
661 if self.args.checkInputDS:
662 self.checkInputInPyami()
663
def askSubmission(self):
    """Ask the user to confirm (unless --agreeAll) and submit the jobs."""
    if self.args.agreeAll:
        logCPGridRun.info("You have agreed to all the submission details. Jobs will be submitted without confirmation.")
        self.submit()
        return
    answer = input("Please confirm ALL the submission details are correct before submitting [y/n]: ").lower()
    if answer == 'y':
        self.submit()
    elif answer == 'n':
        logCPGridRun.info("Feel free to report any unexpected behavior to the CPAlgorithms team!")
    else:
        # Anything other than y/n aborts without submitting.
        logCPGridRun.error("Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
676
# Script entry point: build the per-sample prun commands, display them, run
# the environment checks, surface any collected errors (exiting non-zero if
# any), then ask the user whether to submit.
677if __name__ == '__main__':
678 cpgrid = CPGridRun()
679 cpgrid.configureSubmission()
680 cpgrid.printInputDetails()
681 cpgrid.checkExternalTools()
682 cpgrid.printDelayedErrorCollection()
683 cpgrid.askSubmission()
void printHelp()
void print(char *figname, TCanvas *c1)
outputDSFormatter(self, name)
Definition CPGridRun.py:325
dict _createPrunArgsDict(self)
Definition CPGridRun.py:81
rucioCustomNameParser(filename)
Definition CPGridRun.py:529
bool hasPrun(self)
Definition CPGridRun.py:496
_checkYamlExists(self, runscriptArgs)
Definition CPGridRun.py:463
_parseGridArguments(self)
Definition CPGridRun.py:39
bool checkInputInPyami(self)
Definition CPGridRun.py:252
bool _analyzeAmiResults(self, results, datasetPtag)
Definition CPGridRun.py:284
isAtlasProductionFormat(name)
Definition CPGridRun.py:518
_customOutputDSFormatter(self, name)
Definition CPGridRun.py:348
_prepareAmiQueryFromInputList(self)
Definition CPGridRun.py:269
_filesChangedOrTarballNotCreated(self)
Definition CPGridRun.py:319
_hasCompressedTarball(self)
Definition CPGridRun.py:322
atlasProductionNameParser(filename)
Definition CPGridRun.py:542
_outputDSFormatter(self, name)
Definition CPGridRun.py:331
_checkPrunArgs(self, argDict)
Definition CPGridRun.py:206
printDelayedErrorCollection(self)
Definition CPGridRun.py:650
dict _unknownArgsDict(self)
Definition CPGridRun.py:190
configureSubmissionSingleSample(self, input)
Definition CPGridRun.py:137
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179