4from AnaAlgorithm.DualUseConfig
import isAthena
5from AnaAlgorithm.Logging
import logging
10logCPGridRun = logging.getLogger(
'CPGridRun')
32 from AnalysisAlgorithmsConfig.AthenaCPRunScript
import AthenaCPRunScript
35 from AnalysisAlgorithmsConfig.EventLoopCPRunScript
import EventLoopCPRunScript
40 parser = argparse.ArgumentParser(description=
'CPGrid runscript to submit CPRun.py jobs to the grid. '
41 'This script will submit a job to the grid using files in the input text one by one.'
42 'CPRun.py can handle multiple sources of input and create one output; but not this script',
44 formatter_class=argparse.RawTextHelpFormatter)
45 parser.add_argument(
'-h',
'--help', dest=
'help', action=
'store_true', help=
'Show this help message and continue')
47 ioGroup = parser.add_argument_group(
'Input/Output file configuration')
48 ioGroup.add_argument(
'-i',
'--input-list', dest=
'input_list', help=
'Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
49 ioGroup.add_argument(
'--output-files', dest=
'output_files', nargs=
'+', default=[
'output.root'],
50 help=
'The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
51 ioGroup.add_argument(
'--destSE', dest=
'destSE', default=
'', type=str, help=
'Destination storage element (PanDA)')
52 ioGroup.add_argument(
'--mergeType', dest=
'mergeType', default=
'Default', type=str, help=
'Output merging type, [None, Default, xAOD]')
54 pandaGroup = parser.add_argument_group(
'Input/Output naming configuration')
55 pandaGroup.add_argument(
'--gridUsername', dest=
'gridUsername', default=os.getenv(
'USER',
''), type=str, help=
'Grid username, or the groupname. Default is the current user. Only affect file naming')
56 pandaGroup.add_argument(
'--prefix', dest=
'prefix', default=
'', type=str, help=
'Prefix for the output directory. Dynamically set with input container if not provided')
57 pandaGroup.add_argument(
'--suffix', dest=
'suffix', default=
'',type=str, help=
'Suffix for the output directory')
58 pandaGroup.add_argument(
'--outDS', dest=
'outDS', default=
'', type=str,
59 help=
'Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
61 cpgridGroup = parser.add_argument_group(
'CPGrid configuration')
62 cpgridGroup.add_argument(
'--groupProduction', dest=
'groupProduction', action=
'store_true', help=
'Only use for official production')
64 cpgridGroup.add_argument(
'--exec', dest=
'exec', type=str,
65 help=
'Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
66 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
67 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
70 submissionGroup = parser.add_argument_group(
'Submission configuration')
71 submissionGroup.add_argument(
'-y',
'--agreeAll', dest=
'agreeAll', action=
'store_true', help=
'Agree to all the submission details without asking for confirmation. Use with caution!')
72 submissionGroup.add_argument(
'--noSubmit', dest=
'noSubmit', action=
'store_true', help=
'Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
73 submissionGroup.add_argument(
'--testRun', dest=
'testRun', action=
'store_true', help=
'Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
74 submissionGroup.add_argument(
'--checkInputDS', dest=
'checkInputDS', action=
'store_true', help=
'Check if the input datasets are available on the AMI.')
75 submissionGroup.add_argument(
'--recreateTar', dest=
'recreateTar', action=
'store_true', help=
'Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
76 submissionGroup.add_argument(
'--useCentralPackage', dest=
'useCentralPackage', action=
'store_true', help=
'Use central package instead of custom packages')
83 converting unknown args to a dictionary
86 if unknownArgsDict
and self.
hasPrun():
88 logCPGridRun.info(f
"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
90 logCPGridRun.warning(f
"Unknown arguments detected: {unknownArgsDict}. Cannot check the availablility in Prun because Prun is not available / noSubmit is on.")
93 return unknownArgsDict
97 if not self.
args.input_list:
98 raise ValueError(
'No input list provided, use --input-list to specify the input containers')
100 if self.
args.input_list.endswith(
'.txt'):
101 self.
_inputList = CPGridRun._parseInputFileList(self.
args.input_list)
102 elif self.
args.input_list.endswith(
'.json'):
103 raise NotImplementedError(
'JSON input list parsing is not implemented')
104 elif CPGridRun.isAtlasProductionFormat(self.
args.input_list):
108 'use --input-list to specify input containers')
113 for output
in self.
args.output_files:
115 output_files.extend(output.split(
','))
117 output_files.append(output)
122 logCPGridRun.info(
"\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
123 self.
_runscript.parser.usage = argparse.SUPPRESS
134 self.
cmd[input] = cmd
141 'cmtConfig': os.environ[
"CMTCONFIG"],
142 'writeInputToTxt':
'IN:in.txt',
146 'addNthFieldOfInDSToLFN':
'2,3,6',
148 if self.
args.noSubmit:
149 config[
'noSubmit'] =
True
151 if self.
args.mergeType ==
'xAOD':
152 config[
'mergeScript'] =
'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'
154 if self.
args.mergeType !=
'None':
155 config[
'mergeOutput'] =
True
158 if self.
args.useCentralPackage:
160 config[
'noBuild'] =
True
161 config[
'noCompile'] =
True
162 config[
'athenaTag'] = f
"AnalysisBase,{os.environ['AnalysisBase_VERSION']}"
164 config[
'outTarBall'] = self.
_tarfile
165 config[
'useAthenaPackages'] =
True
169 config[
'useAthenaPackages'] =
True
171 if self.
args.groupProduction:
172 config[
'official'] =
True
173 config[
'voms'] = f
'atlas:/atlas/{self.args.gridUsername}/Role=production'
176 config[
'destSE'] = self.
args.destSE
178 if self.
args.testRun:
179 config[
'nEventsPerFile'] = 100
183 for k, v
in config.items():
184 if isinstance(v, bool)
and v:
186 elif v
is not None and v !=
'':
187 cmd += f
'--{k} {v} \\\n'
188 return cmd.rstrip(
' \\\n')
192 Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
194 unknown_args_dict = {}
202 unknown_args_dict[self.
unknown_args[idx].lstrip(
'-')] =
True
204 return unknown_args_dict
208 check the arguments against the prun script to ensure they are valid
209 See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py
211 import pandaclient.PrunScript
213 original_argv = sys.argv
216 prunArgsDict = pandaclient.PrunScript.main(get_options=
True)
217 sys.argv = original_argv
218 nonPrunOrCPGridArgs = []
220 if arg
not in prunArgsDict:
221 nonPrunOrCPGridArgs.append(arg)
222 if nonPrunOrCPGridArgs:
223 logCPGridRun.error(f
"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
224 raise ValueError(f
"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
227 for key, cmd
in self.
cmd.items():
228 parsed_name = CPGridRun.atlasProductionNameParser(key)
229 logCPGridRun.info(
"\n"
231 "\n".join([f
" {k.replace('_', ' ').title()}: {v}" for k, v
in parsed_name.items()]))
232 logCPGridRun.info(f
"Command: \n{cmd}")
240 import pyAMI.atlas.api
241 except ModuleNotFoundError:
243 "Cannot import pyAMI, please run the following commands:\n\n"
246 "voms-proxy-init -voms atlas\n"
248 "and make sure you have a valid certificate.")
256 client = pyAMI.client.Client(
'atlas')
257 pyAMI.atlas.api.init()
261 results = pyAMI.atlas.api.list_datasets(client, patterns=queries)
262 except pyAMI.exception.Error:
264 "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
271 Helper function to prepare a list of queries for the AMI based on the input list.
272 It will replace the _p### with _p% to match the latest ptag.
275 regex = re.compile(
"_p[0-9]+")
278 for datasetName
in self.
cmd:
279 parsed = CPGridRun.atlasProductionNameParser(datasetName)
280 datasetPtag[datasetName] = parsed.get(
'ptag')
281 queries.append(regex.sub(
"_p%", datasetName))
282 return queries, datasetPtag
286 regex = re.compile(
"_p[0-9]+")
287 results = [r[
'ldn']
for r
in results]
291 for datasetName
in self.
cmd:
292 if datasetName
not in results:
293 notFound.append(datasetName)
295 base = regex.sub(
"_p%", datasetName)
296 matching = [r
for r
in results
if r.startswith(base.replace(
"_p%",
""))]
298 mParsed = CPGridRun.atlasProductionNameParser(m)
300 mPtagInt = int(mParsed.get(
'ptag',
'p0')[1:])
301 currentPtagInt = int(datasetPtag.get(datasetName,
'p0')[1:])
302 if mPtagInt > currentPtagInt:
303 latestPtag[datasetName] = f
"p{mPtagInt}"
304 except (ValueError, TypeError):
308 logCPGridRun.info(
"Newer version of datasets found in AMI:")
309 for name, ptag
in latestPtag.items():
310 logCPGridRun.info(f
"{name} -> ptag: {ptag}")
313 logCPGridRun.error(
"Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
314 logCPGridRun.error(
", ".join(notFound))
326 if CPGridRun.isAtlasProductionFormat(name):
333 {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
335 nameParser = CPGridRun.atlasProductionNameParser(name)
336 base =
'group' if self.
args.groupProduction
else 'user'
337 username = self.
args.gridUsername
338 dsid = nameParser[
'DSID']
339 tags =
'_'.join(nameParser[
'tags'])
340 fileFormat = nameParser[
'format']
341 base =
'group' if self.
args.groupProduction
else 'user'
342 prefix = self.
args.prefix
if self.
args.prefix
else nameParser[
'main'].
split(
'_')[0]
345 result = [base, username, prefix, dsid, fileFormat, tags, suffix]
346 return ".".join(filter(
None, result))
350 {group/user}.{username}.{main}.outputDS.{suffix}
352 parts = name.split(
'.')
353 base =
'group' if self.
args.groupProduction
else 'user'
354 username = self.
args.gridUsername
356 outputDS =
'outputDS'
359 result = [base, username,main, outputDS, suffix]
360 return ".".join(filter(
None, result))
364 return self.
args.suffix
365 if self.
args.testRun:
367 return f
"test_{uuid.uuid4().hex[:6]}"
372 tarball_mtime = os.path.getmtime(self.
_tarfile)
if os.path.exists(self.
_tarfile)
else 0
377 for root, _, files
in os.walk(buildDir):
379 file_path = os.path.join(root, file)
381 if os.path.getmtime(file_path) > tarball_mtime:
382 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
384 except FileNotFoundError:
388 if sourceDir
is None:
389 logCPGridRun.warning(
"Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
391 for root, _, files
in os.walk(sourceDir):
393 file_path = os.path.join(root, file)
395 if os.path.getmtime(file_path) > tarball_mtime:
396 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
398 except FileNotFoundError:
403 buildDir = os.environ[
"CMAKE_PREFIX_PATH"]
404 buildDir = os.path.dirname(buildDir.split(
":")[0])
408 cmakeCachePath = os.path.join(self.
_buildDir(),
'CMakeCache.txt')
410 if not os.path.exists(cmakeCachePath):
412 with open(cmakeCachePath,
'r')
as cmakeCache:
413 for line
in cmakeCache:
414 if '_SOURCE_DIR:STATIC=' in line:
415 sourceDir = line.split(
'=')[1].
strip()
420 if not self.
args.exec:
421 raise ValueError(
'No exec command provided, use --exec to specify the command to run on the grid')
424 isCPRunDefault = self.
args.exec.startswith(
'-')
or self.
args.exec.startswith(
'CPRun.py')
426 'input_list':
'in.txt',
427 'merge_output_files': len(self.
args.output_files) == 1,
429 if not isCPRunDefault:
430 if self.
_isFirstRun: logCPGridRun.warning(
"Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
431 return f
'"{self.args.exec}"'
435 runscriptArgs, unknownArgs = self.
_runscript.parser.parse_known_args(self.
args.exec.split(
' '))
438 unknown_flags = [arg
for arg
in unknownArgs
if arg.startswith(
'--')]
440 logCPGridRun.error(f
"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
441 raise ValueError(f
"Unknown arguments detected: {unknown_flags}")
444 for key, value
in formatingClause.items():
445 if hasattr(runscriptArgs, key):
446 old_value = getattr(runscriptArgs, key)
447 if old_value
is None or old_value == self.
_runscript.parser.get_default(key):
448 setattr(runscriptArgs, key, value)
449 if self.
_isFirstRun: logCPGridRun.info(f
"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
451 if self.
_isFirstRun: logCPGridRun.warning(f
"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
453 logCPGridRun.error(f
"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
454 raise ValueError(f
"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
457 arg_string =
' '.join(
458 f
'--{k.replace("_", "-")}' if isinstance(v, bool)
and v
else
459 f
'--{k.replace("_", "-")} {v}' for k, v
in vars(runscriptArgs).items()
if v
not in [
None,
False]
461 return f
'"CPRun.py {arg_string}"'
464 from AnalysisAlgorithmsConfig.CPBaseRunner
import CPBaseRunner
465 if not hasattr(runscriptArgs,
'text_config'):
466 self.
_errorCollector[
'no yaml'] =
"No YAML configuration file is specified in the exec string. Please provide one using --text-config"
468 yamlPath = getattr(runscriptArgs,
'text_config')
470 haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
472 logCPGridRun.warning(
"A path to a local YAML configuration file is found, but it may not be grid-usable.")
474 repoYamls, _ = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
475 if repoYamls
and len(repoYamls) > 1:
476 self.
_errorCollector[
'ambiguous yamls'] = f
'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' +
'\n'.join(repoYamls)
478 elif repoYamls
and len(repoYamls) == 1:
479 logCPGridRun.info(f
"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
482 if haveLocalYaml
and self.
args.useCentralPackage:
483 logCPGridRun.warning(
"A path to a local YAML configuration file is found, no custom packages are found, proceed with /cvmfs packages only.")
485 if not repoYamls
and not self.
args.useCentralPackage:
486 self.
_errorCollector[
'no usable yaml'] = f
"Grid usable YAML configuration file not found: {yamlPath}"
488 self.
_errorCollector[
'have local yaml'] = f
"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
489 f
"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec\n"\
490 f
"Or if you are only using central packages, please use the `--useCentralPackage` flag."
493 outputs = [f
'{output.split(".")[0]}:{output}' if ":" not in output
else output
for output
in self.
args.output_files]
494 return ','.join(outputs)
498 prun_path = shutil.which(
"prun")
499 if prun_path
is None:
501 "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
504 "voms-proxy-init -voms atlas\n"
506 "Make sure you have a valid certificate."
513 for key, cmd
in self.
cmd.items():
514 process = subprocess.Popen(cmd, shell=
True, stdout=sys.stdout, stderr=sys.stderr)
515 process.communicate()
520 name = name.split(
":")[1]
522 if name.startswith(
'mc')
or name.startswith(
'data'):
525 logCPGridRun.warning(
"Name is not in the Atlas production format, assuming it is a user production")
531 The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
534 parts = filename.split(
'.')
535 result[
'userType'] = parts[0]
536 result[
'username'] = parts[1]
537 result[
'main'] = parts[2]
538 result[
'suffix'] = parts[-1]
544 Parsing file name into a dictionary, an example is given here
545 mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
547 datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
548 projectName: mc20_13TeV
552 main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
553 TODO generator: PhPy8Eg
554 TODO tune: A14 # For Pythia8
556 TODO hdamp: 258p75 # For Powheg
557 TODO decayType: nonallhad
560 tags: e###_s###_r###_p###_a###_t###_b#
561 etag: e6337 # EVNT (EVGEN) production and merging
562 stag: s3681 # Geant4 simulation to produce HITS and merging!
563 rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
564 ptag: p5855 # Production of NTUP_PILEUP format and merging
565 atag: aXXX: atlfast configuration (both simulation and digit/recon)
566 ttag: tXXX: tag production configuration
567 btag: bXXX: bytestream production configuration
580 datasetPart, filePart = filename.split(
'/')
582 datasetPart = filename
586 if ':' in datasetPart:
587 datasetPart = datasetPart.split(
':')[1]
590 if datasetPart.startswith(
'user')
or datasetPart.startswith(
'group'):
591 result[
'datasetName'] = datasetPart
595 datasetParts = datasetPart.split(
'.')
596 result[
'datasetName'] = datasetPart
598 result[
'projectName'] = datasetParts[0]
600 campaign_energy = result[
'projectName'].
split(
'_')
601 result[
'campaign'] = campaign_energy[0]
602 result[
'energy'] = campaign_energy[1]
605 result[
'DSID'] = datasetParts[1]
606 result[
'main'] = datasetParts[2]
607 result[
'step'] = datasetParts[3]
608 result[
'format'] = datasetParts[4]
611 tags = datasetParts[5].
split(
'_')
612 result[
'tags'] = tags
614 if tag.startswith(
'e'):
616 elif tag.startswith(
's'):
618 elif tag.startswith(
'r'):
620 elif tag.startswith(
'p'):
622 elif tag.startswith(
'a'):
624 elif tag.startswith(
't'):
626 elif tag.startswith(
'b'):
631 fileParts = filePart.split(
'.')
632 result[
'jediTaskID'] = fileParts[1]
633 result[
'fileNumber'] = fileParts[2]
634 result[
'version'] = fileParts[-1]
640 with open(path,
'r')
as inputText:
641 for line
in inputText.readlines():
643 if line.startswith(
'#')
or not line.strip():
645 files += line.split(
',')
647 files = [file.strip()
for file
in files]
652 logCPGridRun.error(
"Errors were collected during the script execution:")
655 logCPGridRun.error(f
"{key}: {value}")
656 logCPGridRun.error(
"Please fix the errors and try again.")
661 if self.
args.checkInputDS:
665 if self.
args.agreeAll:
666 logCPGridRun.info(
"You have agreed to all the submission details. Jobs will be submitted without confirmation.")
669 answer = input(
"Please confirm ALL the submission details are correct before submitting [y/n]: ")
670 if answer.lower() ==
'y':
672 elif answer.lower() ==
'n':
673 logCPGridRun.info(
"Feel free to report any unexpected behavior to the CPAlgorithms team!")
675 logCPGridRun.error(
"Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
677if __name__ ==
'__main__':
679 cpgrid.configureSubmission()
680 cpgrid.printInputDetails()
681 cpgrid.checkExternalTools()
682 cpgrid.printDelayedErrorCollection()
683 cpgrid.askSubmission()
void print(char *figname, TCanvas *c1)
outputDSFormatter(self, name)
dict _createPrunArgsDict(self)
rucioCustomNameParser(filename)
_checkYamlExists(self, runscriptArgs)
_parseGridArguments(self)
bool checkInputInPyami(self)
bool _analyzeAmiResults(self, results, datasetPtag)
isAtlasProductionFormat(name)
_customOutputDSFormatter(self, name)
_prepareAmiQueryFromInputList(self)
_parseInputFileList(path)
_filesChangedOrTarballNotCreated(self)
_hasCompressedTarball(self)
atlasProductionNameParser(filename)
_outputDSFormatter(self, name)
_checkPrunArgs(self, argDict)
configureSubmission(self)
printDelayedErrorCollection(self)
dict _unknownArgsDict(self)
configureSubmissionSingleSample(self, input)
std::vector< std::string > split(const std::string &s, const std::string &t=":")