4 from AnaAlgorithm.DualUseConfig
import isAthena
5 from AnaAlgorithm.Logging
import logging
# Module-level logger dedicated to the grid-submission runscript.
logCPGridRun = logging.getLogger('CPGridRun')
31 from AnalysisAlgorithmsConfig.AthenaCPRunScript
import AthenaCPRunScript
34 from AnalysisAlgorithmsConfig.EventLoopCPRunScript
import EventLoopCPRunScript
39 parser = argparse.ArgumentParser(description=
'CPGrid runscript to submit CPRun.py jobs to the grid. '
40 'This script will submit a job to the grid using files in the input text one by one.'
41 'CPRun.py can handle multiple sources of input and create one output; but not this script',
43 formatter_class=argparse.RawTextHelpFormatter)
# Custom -h/--help: a plain store_true flag rather than argparse's default
# help action, so (per the help text) parsing continues instead of exiting.
parser.add_argument(
    '-h', '--help',
    dest='help',
    action='store_true',
    help='Show this help message and continue',
)
# --- Input/Output file configuration ------------------------------------
ioGroup = parser.add_argument_group('Input/Output file configuration')
ioGroup.add_argument(
    '-i', '--input-list',
    dest='input_list',
    help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually',
)
ioGroup.add_argument(
    '--output-files',
    dest='output_files',
    nargs='+',
    default=['output.root'],
    help='The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py',
)
ioGroup.add_argument(
    '--destSE',
    dest='destSE',
    default='',
    type=str,
    help='Destination storage element (PanDA)',
)
ioGroup.add_argument(
    '--mergeType',
    dest='mergeType',
    default='Default',
    type=str,
    help='Output merging type, [None, Default, xAOD]',
)
# --- Input/Output naming configuration ----------------------------------
pandaGroup = parser.add_argument_group('Input/Output naming configuration')
pandaGroup.add_argument(
    '--gridUsername',
    dest='gridUsername',
    default=os.getenv('USER', ''),  # current shell user unless overridden
    type=str,
    help='Grid username, or the groupname. Default is the current user. Only affect file naming',
)
pandaGroup.add_argument(
    '--prefix',
    dest='prefix',
    default='',
    type=str,
    help='Prefix for the output directory. Dynamically set with input container if not provided',
)
pandaGroup.add_argument(
    '--suffix',
    dest='suffix',
    default='',
    type=str,
    help='Suffix for the output directory',
)
pandaGroup.add_argument(
    '--outDS',
    dest='outDS',
    default='',
    type=str,
    help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format',
)
# --- CPGrid configuration -----------------------------------------------
cpgridGroup = parser.add_argument_group('CPGrid configuration')
cpgridGroup.add_argument(
    '--groupProduction',
    dest='groupProduction',
    action='store_true',
    help='Only use for official production',
)
63 cpgridGroup.add_argument(
'--exec', dest=
'exec', type=str,
64 help=
'Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
65 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
66 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
# --- Submission configuration -------------------------------------------
submissionGroup = parser.add_argument_group('Submission configuration')
submissionGroup.add_argument(
    '-y', '--agreeAll',
    dest='agreeAll',
    action='store_true',
    help='Agree to all the submission details without asking for confirmation. Use with caution!',
)
submissionGroup.add_argument(
    '--noSubmit',
    dest='noSubmit',
    action='store_true',
    help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command',
)
submissionGroup.add_argument(
    '--testRun',
    dest='testRun',
    action='store_true',
    help='Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)',
)
submissionGroup.add_argument(
    '--checkInputDS',
    dest='checkInputDS',
    action='store_true',
    help='Check if the input datasets are available on the AMI.',
)
submissionGroup.add_argument(
    '--recreateTar',
    dest='recreateTar',
    action='store_true',
    help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated',
)
81 converting unknown args to a dictionary
84 if unknownArgsDict
and self.
hasPrun():
86 logCPGridRun.info(f
"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
88 logCPGridRun.warning(f
"Unknown arguments detected: {unknownArgsDict}. Cannot check the availablility in Prun because Prun is not available / noSubmit is on.")
91 return unknownArgsDict
96 if self.args.input_list.endswith(
'.txt'):
97 self.
_inputList = CPGridRun._parseInputFileList(self.args.input_list)
98 elif self.args.input_list.endswith(
'.json'):
99 raise NotImplementedError(
'JSON input list parsing is not implemented')
100 elif CPGridRun.isAtlasProductionFormat(self.args.input_list):
104 'use --input-list to specify input containers')
109 for output
in self.args.output_files:
111 output_files.extend(output.split(
','))
113 output_files.append(output)
118 logCPGridRun.info(
"\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
119 self.
_runscript.parser.usage = argparse.SUPPRESS
130 self.
cmd[input] = cmd
137 'useAthenaPackages':
True,
138 'cmtConfig': os.environ[
"CMTCONFIG"],
139 'writeInputToTxt':
'IN:in.txt',
143 'addNthFieldOfInDSToLFN':
'2,3,6',
145 if self.args.noSubmit:
146 config[
'noSubmit'] =
True
148 if self.args.mergeType ==
'xAOD':
149 config[
'mergeScript'] =
'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'
151 if self.args.mergeType !=
'None':
152 config[
'mergeOutput'] =
True
155 config[
'outTarBall'] = self.
_tarfile
160 if self.args.groupProduction:
161 config[
'official'] =
True
162 config[
'voms'] = f
'atlas:/atlas/{self.args.gridUsername}/Role=production'
165 config[
'destSE'] = self.args.destSE
167 if self.args.testRun:
168 config[
'nEventsPerFile'] = 300
169 config[
'nFiles'] = 10
172 for k, v
in config.items():
173 if isinstance(v, bool)
and v:
175 elif v
is not None and v !=
'':
176 cmd += f
'--{k} {v} \\\n'
177 return cmd.rstrip(
' \\\n')
181 Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
183 unknown_args_dict = {}
191 unknown_args_dict[self.
unknown_args[idx].lstrip(
'-')] =
True
193 return unknown_args_dict
197 check the arguments against the prun script to ensure they are valid
198 See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py
200 import pandaclient.PrunScript
202 original_argv = sys.argv
205 prunArgsDict = pandaclient.PrunScript.main(get_options=
True)
206 sys.argv = original_argv
207 nonPrunOrCPGridArgs = []
209 if arg
not in prunArgsDict:
210 nonPrunOrCPGridArgs.append(arg)
211 if nonPrunOrCPGridArgs:
212 logCPGridRun.error(f
"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
213 raise ValueError(f
"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
217 parsed_name = CPGridRun.atlasProductionNameParser(key)
218 logCPGridRun.info(
"\n"
220 "\n".
join([f
" {k.replace('_', ' ').title()}: {v}" for k, v
in parsed_name.items()]))
221 logCPGridRun.info(f
"Command: \n{cmd}")
229 import pyAMI.atlas.api
230 except ModuleNotFoundError:
232 "Cannot import pyAMI, please run the following commands:\n\n"
235 "voms-proxy-init -voms atlas\n"
237 "and make sure you have a valid certificate.")
245 client = pyAMI.client.Client(
'atlas')
246 pyAMI.atlas.api.init()
250 results = pyAMI.atlas.api.list_datasets(client, patterns=queries)
251 except pyAMI.exception.Error:
253 "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
260 Helper function to prepare a list of queries for the AMI based on the input list.
261 It will replace the _p### with _p% to match the latest ptag.
264 regex = re.compile(
"_p[0-9]+")
267 for datasetName
in self.
cmd:
268 parsed = CPGridRun.atlasProductionNameParser(datasetName)
269 datasetPtag[datasetName] = parsed.get(
'ptag')
270 queries.append(regex.sub(
"_p%", datasetName))
271 return queries, datasetPtag
275 regex = re.compile(
"_p[0-9]+")
276 results = [r[
'ldn']
for r
in results]
280 for datasetName
in self.
cmd:
281 if datasetName
not in results:
282 notFound.append(datasetName)
284 base = regex.sub(
"_p%", datasetName)
285 matching = [r
for r
in results
if r.startswith(base.replace(
"_p%",
""))]
287 mParsed = CPGridRun.atlasProductionNameParser(m)
289 mPtagInt =
int(mParsed.get(
'ptag',
'p0')[1:])
290 currentPtagInt =
int(datasetPtag.get(datasetName,
'p0')[1:])
291 if mPtagInt > currentPtagInt:
292 latestPtag[datasetName] = f
"p{mPtagInt}"
293 except (ValueError, TypeError):
297 logCPGridRun.info(
"Newer version of datasets found in AMI:")
298 for name, ptag
in latestPtag.items():
299 logCPGridRun.info(f
"{name} -> ptag: {ptag}")
302 logCPGridRun.error(
"Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
303 logCPGridRun.error(
", ".
join(notFound))
309 if CPGridRun.isAtlasProductionFormat(name):
316 {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
318 nameParser = CPGridRun.atlasProductionNameParser(name)
319 base =
'group' if self.args.groupProduction
else 'user'
320 username = self.args.gridUsername
321 dsid = nameParser[
'DSID']
322 tags =
'_'.
join(nameParser[
'tags'])
323 fileFormat = nameParser[
'format']
324 base =
'group' if self.args.groupProduction
else 'user'
325 prefix = self.args.prefix
if self.args.prefix
else nameParser[
'main'].
split(
'_')[0]
328 result = [base, username, prefix, dsid, fileFormat, tags, suffix]
333 {group/user}.{username}.{main}.outputDS.{suffix}
335 parts = name.split(
'.')
336 base =
'group' if self.args.groupProduction
else 'user'
337 username = self.args.gridUsername
339 outputDS =
'outputDS'
342 result = [base, username,main, outputDS, suffix]
347 return self.args.suffix
348 if self.args.testRun:
350 return f
"test_{uuid.uuid4().hex[:6]}"
355 tarball_mtime = os.path.getmtime(self.
_tarfile)
if os.path.exists(self.
_tarfile)
else 0
360 for root, _, files
in os.walk(buildDir):
362 file_path = os.path.join(root, file)
364 if os.path.getmtime(file_path) > tarball_mtime:
365 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
367 except FileNotFoundError:
371 if sourceDir
is None:
372 logCPGridRun.warning(
"Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
374 for root, _, files
in os.walk(sourceDir):
376 file_path = os.path.join(root, file)
378 if os.path.getmtime(file_path) > tarball_mtime:
379 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
381 except FileNotFoundError:
386 buildDir = os.environ[
"CMAKE_PREFIX_PATH"]
387 buildDir = os.path.dirname(buildDir.split(
":")[0])
391 cmakeCachePath = os.path.join(self.
_buildDir(),
'CMakeCache.txt')
393 if not os.path.exists(cmakeCachePath):
395 with open(cmakeCachePath,
'r')
as cmakeCache:
396 for line
in cmakeCache:
397 if '_SOURCE_DIR:STATIC=' in line:
398 sourceDir = line.split(
'=')[1].strip()
404 isCPRunDefault = self.args.exec.startswith(
'-')
or self.args.exec.startswith(
'CPRun.py')
406 'input_list':
'in.txt',
407 'merge_output_files':
True,
409 if not isCPRunDefault:
410 if self.
_isFirstRun: logCPGridRun.warning(
"Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
411 return f
'"{self.args.exec}"'
415 runscriptArgs, unknownArgs = self.
_runscript.parser.parse_known_args(self.args.exec.split(
' '))
418 unknown_flags = [arg
for arg
in unknownArgs
if arg.startswith(
'--')]
420 logCPGridRun.error(f
"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
421 raise ValueError(f
"Unknown arguments detected: {unknown_flags}")
424 for key, value
in formatingClause.items():
425 if hasattr(runscriptArgs, key):
426 old_value = getattr(runscriptArgs, key)
427 if old_value
is None or old_value == self.
_runscript.parser.get_default(key):
428 setattr(runscriptArgs, key, value)
429 if self.
_isFirstRun: logCPGridRun.info(f
"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
431 if self.
_isFirstRun: logCPGridRun.warning(f
"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
433 logCPGridRun.error(f
"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
434 raise ValueError(f
"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
437 arg_string =
' '.
join(
438 f
'--{k.replace("_", "-")}' if isinstance(v, bool)
and v
else
439 f
'--{k.replace("_", "-")} {v}' for k, v
in vars(runscriptArgs).
items()
if v
not in [
None,
False]
441 return f
'"CPRun.py {arg_string}"'
444 from AnalysisAlgorithmsConfig.CPBaseRunner
import CPBaseRunner
445 if not hasattr(runscriptArgs,
'text_config'):
446 self.
_errorCollector[
'no yaml'] =
"No YAML configuration file is specified in the exec string. Please provide one using --text-config"
448 yamlPath = getattr(runscriptArgs,
'text_config')
449 haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
451 logCPGridRun.warning(
"A path to a local YAML configuration file is found, but it may not be grid-usable.")
453 repoYamls = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
454 if repoYamls
and len(repoYamls) > 1:
455 self.
_errorCollector[
'ambiguous yamls'] = f
'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' +
'\n'.
join(repoYamls)
457 elif repoYamls
and len(repoYamls) == 1:
458 logCPGridRun.info(f
"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
462 self.
_errorCollector[
'no usable yaml'] = f
"Grid usable YAML configuration file not found: {yamlPath}"
464 self.
_errorCollector[
'have local yaml'] = f
"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
465 f
"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec"
468 outputs = [f
'{output.split(".")[0]}:{output}' for output
in self.args.output_files]
469 return ','.
join(outputs)
473 prun_path = shutil.which(
"prun")
474 if prun_path
is None:
476 "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
479 "voms-proxy-init -voms atlas\n"
481 "Make sure you have a valid certificate."
489 process = subprocess.Popen(cmd, shell=
True, stdout=sys.stdout, stderr=sys.stderr)
490 process.communicate()
494 if name.startswith(
'mc')
or name.startswith(
'data'):
496 logCPGridRun.warning(
"Name is not in the Atlas production format, assuming it is a user production")
502 The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
505 parts = filename.split(
'.')
506 result[
'userType'] = parts[0]
507 result[
'username'] = parts[1]
508 result[
'main'] = parts[2]
509 result[
'suffix'] = parts[-1]
515 Parsing file name into a dictionary, an example is given here
516 mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
518 datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
519 projectName: mc20_13TeV
523 main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
524 TODO generator: PhPy8Eg
525 TODO tune: A14 # For Pythia8
527 TODO hdamp: 258p75 # For Powheg
528 TODO decayType: nonallhad
531 tags: e###_s###_r###_p###_a###_t###_b#
532 etag: e6337 # EVNT (EVGEN) production and merging
533 stag: s3681 # Geant4 simulation to produce HITS and merging!
534 rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
535 ptag: p5855 # Production of NTUP_PILEUP format and merging
536 atag: aXXX: atlfast configuration (both simulation and digit/recon)
537 ttag: tXXX: tag production configuration
538 btag: bXXX: bytestream production configuration
551 datasetPart, filePart = filename.split(
'/')
553 datasetPart = filename
557 datasetParts = datasetPart.split(
'.')
558 result[
'datasetName'] = datasetPart
560 result[
'projectName'] = datasetParts[0]
562 campaign_energy = result[
'projectName'].
split(
'_')
563 result[
'campaign'] = campaign_energy[0]
564 result[
'energy'] = campaign_energy[1]
567 result[
'DSID'] = datasetParts[1]
568 result[
'main'] = datasetParts[2]
569 result[
'step'] = datasetParts[3]
570 result[
'format'] = datasetParts[4]
573 tags = datasetParts[5].
split(
'_')
574 result[
'tags'] = tags
576 if tag.startswith(
'e'):
578 elif tag.startswith(
's'):
580 elif tag.startswith(
'r'):
582 elif tag.startswith(
'p'):
584 elif tag.startswith(
'a'):
586 elif tag.startswith(
't'):
588 elif tag.startswith(
'b'):
593 fileParts = filePart.split(
'.')
594 result[
'jediTaskID'] = fileParts[1]
595 result[
'fileNumber'] = fileParts[2]
596 result[
'version'] = fileParts[-1]
602 with open(path,
'r')
as inputText:
603 for line
in inputText.readlines():
605 if line.startswith(
'#')
or not line.strip():
607 files += line.split(
',')
609 files = [file.strip()
for file
in files]
614 logCPGridRun.error(
"Errors were collected during the script execution:")
617 logCPGridRun.error(f
"{key}: {value}")
618 logCPGridRun.error(
"Please fix the errors and try again.")
622 if self.args.noSubmit:
625 if self.args.checkInputDS:
629 if self.args.noSubmit:
631 if self.args.agreeAll:
632 logCPGridRun.info(
"You have agreed to all the submission details. Jobs will be submitted without confirmation.")
635 answer = input(
"Please confirm ALL the submission details are correct before submitting [y/n]: ")
636 if answer.lower() ==
'y':
638 elif answer.lower() ==
'n':
639 logCPGridRun.info(
"Feel free to report any unexpected behavior to the CPAlgorithms team!")
641 logCPGridRun.error(
"Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
643 if __name__ ==
'__main__':
645 cpgrid.configureSumbission()
646 cpgrid.printInputDetails()
647 cpgrid.checkExternalTools()
648 cpgrid.printDelayedErrorCollection()
649 cpgrid.askSubmission()