4from AnaAlgorithm.DualUseConfig
import isAthena
5from AnaAlgorithm.Logging
import logging
10logCPGridRun = logging.getLogger(
'CPGridRun')
32 from AnalysisAlgorithmsConfig.AthenaCPRunScript
import AthenaCPRunScript
35 from AnalysisAlgorithmsConfig.EventLoopCPRunScript
import EventLoopCPRunScript
40 parser = argparse.ArgumentParser(description=
'CPGrid runscript to submit CPRun.py jobs to the grid. '
41 'This script will submit a job to the grid using files in the input text one by one.'
42 'CPRun.py can handle multiple sources of input and create one output; but not this script',
44 formatter_class=argparse.RawTextHelpFormatter)
45 parser.add_argument(
'-h',
'--help', dest=
'help', action=
'store_true', help=
'Show this help message and continue')
47 ioGroup = parser.add_argument_group(
'Input/Output file configuration')
48 ioGroup.add_argument(
'-i',
'--input-list', dest=
'input_list', help=
'Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
49 ioGroup.add_argument(
'--output-files', dest=
'output_files', nargs=
'+', default=[
'output.root'],
50 help=
'The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
51 ioGroup.add_argument(
'--destSE', dest=
'destSE', default=
'', type=str, help=
'Destination storage element (PanDA)')
52 ioGroup.add_argument(
'--mergeType', dest=
'mergeType', default=
'Default', type=str, help=
'Output merging type, [None, Default, xAOD]')
54 pandaGroup = parser.add_argument_group(
'Input/Output naming configuration')
55 pandaGroup.add_argument(
'--gridUsername', dest=
'gridUsername', default=os.getenv(
'USER',
''), type=str, help=
'Grid username, or the groupname. Default is the current user. Only affect file naming')
56 pandaGroup.add_argument(
'--prefix', dest=
'prefix', default=
'', type=str, help=
'Prefix for the output directory. Dynamically set with input container if not provided')
57 pandaGroup.add_argument(
'--suffix', dest=
'suffix', default=
'',type=str, help=
'Suffix for the output directory')
58 pandaGroup.add_argument(
'--outDS', dest=
'outDS', default=
'', type=str,
59 help=
'Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
61 cpgridGroup = parser.add_argument_group(
'CPGrid configuration')
62 cpgridGroup.add_argument(
'--groupProduction', dest=
'groupProduction', action=
'store_true', help=
'Only use for official production')
64 cpgridGroup.add_argument(
'--exec', dest=
'exec', type=str,
65 help=
'Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
66 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
67 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
70 submissionGroup = parser.add_argument_group(
'Submission configuration')
71 submissionGroup.add_argument(
'-y',
'--agreeAll', dest=
'agreeAll', action=
'store_true', help=
'Agree to all the submission details without asking for confirmation. Use with caution!')
72 submissionGroup.add_argument(
'--noSubmit', dest=
'noSubmit', action=
'store_true', help=
'Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
73 submissionGroup.add_argument(
'--testRun', dest=
'testRun', action=
'store_true', help=
'Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
74 submissionGroup.add_argument(
'--checkInputDS', dest=
'checkInputDS', action=
'store_true', help=
'Check if the input datasets are available on the AMI.')
75 submissionGroup.add_argument(
'--recreateTar', dest=
'recreateTar', action=
'store_true', help=
'Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
76 submissionGroup.add_argument(
'--useCentralPackage', dest=
'useCentralPackage', action=
'store_true', help=
'Use central package instead of custom packages')
83 converting unknown args to a dictionary
86 if unknownArgsDict
and self.
hasPrun():
88 logCPGridRun.info(f
"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
90 logCPGridRun.warning(f
"Unknown arguments detected: {unknownArgsDict}. Cannot check the availablility in Prun because Prun is not available / noSubmit is on.")
93 return unknownArgsDict
98 if self.
args.input_list.endswith(
'.txt'):
99 self.
_inputList = CPGridRun._parseInputFileList(self.
args.input_list)
100 elif self.
args.input_list.endswith(
'.json'):
101 raise NotImplementedError(
'JSON input list parsing is not implemented')
102 elif CPGridRun.isAtlasProductionFormat(self.
args.input_list):
106 'use --input-list to specify input containers')
111 for output
in self.
args.output_files:
113 output_files.extend(output.split(
','))
115 output_files.append(output)
120 logCPGridRun.info(
"\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
121 self.
_runscript.parser.usage = argparse.SUPPRESS
132 self.
cmd[input] = cmd
139 'cmtConfig': os.environ[
"CMTCONFIG"],
140 'writeInputToTxt':
'IN:in.txt',
144 'addNthFieldOfInDSToLFN':
'2,3,6',
146 if self.
args.noSubmit:
147 config[
'noSubmit'] =
True
149 if self.
args.mergeType ==
'xAOD':
150 config[
'mergeScript'] =
'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'
152 if self.
args.mergeType !=
'None':
153 config[
'mergeOutput'] =
True
156 if self.
args.useCentralPackage:
158 config[
'noBuild'] =
True
159 config[
'noCompile'] =
True
160 config[
'athenaTag'] = f
"AnalysisBase,{os.environ['AnalysisBase_VERSION']}"
162 config[
'outTarBall'] = self.
_tarfile
163 config[
'useAthenaPackages'] =
True
167 config[
'useAthenaPackages'] =
True
169 if self.
args.groupProduction:
170 config[
'official'] =
True
171 config[
'voms'] = f
'atlas:/atlas/{self.args.gridUsername}/Role=production'
174 config[
'destSE'] = self.
args.destSE
176 if self.
args.testRun:
177 config[
'nEventsPerFile'] = 100
181 for k, v
in config.items():
182 if isinstance(v, bool)
and v:
184 elif v
is not None and v !=
'':
185 cmd += f
'--{k} {v} \\\n'
186 return cmd.rstrip(
' \\\n')
190 Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
192 unknown_args_dict = {}
200 unknown_args_dict[self.
unknown_args[idx].lstrip(
'-')] =
True
202 return unknown_args_dict
206 Check the arguments against the options accepted by the prun script; any argument prun does not recognise is reported and raises a ValueError.
207 See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py
209 import pandaclient.PrunScript
211 original_argv = sys.argv
214 prunArgsDict = pandaclient.PrunScript.main(get_options=
True)
215 sys.argv = original_argv
216 nonPrunOrCPGridArgs = []
218 if arg
not in prunArgsDict:
219 nonPrunOrCPGridArgs.append(arg)
220 if nonPrunOrCPGridArgs:
221 logCPGridRun.error(f
"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
222 raise ValueError(f
"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
225 for key, cmd
in self.
cmd.items():
226 parsed_name = CPGridRun.atlasProductionNameParser(key)
227 logCPGridRun.info(
"\n"
229 "\n".join([f
" {k.replace('_', ' ').title()}: {v}" for k, v
in parsed_name.items()]))
230 logCPGridRun.info(f
"Command: \n{cmd}")
238 import pyAMI.atlas.api
239 except ModuleNotFoundError:
241 "Cannot import pyAMI, please run the following commands:\n\n"
244 "voms-proxy-init -voms atlas\n"
246 "and make sure you have a valid certificate.")
254 client = pyAMI.client.Client(
'atlas')
255 pyAMI.atlas.api.init()
259 results = pyAMI.atlas.api.list_datasets(client, patterns=queries)
260 except pyAMI.exception.Error:
262 "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
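269 Helper function to prepare a list of queries for the AMI based on the input list.
270 It replaces each _p### with the wildcard _p% so that the query matches every ptag of the dataset; the ptag originally requested is recorded per dataset so that newer ones can be identified afterwards.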
273 regex = re.compile(
"_p[0-9]+")
276 for datasetName
in self.
cmd:
277 parsed = CPGridRun.atlasProductionNameParser(datasetName)
278 datasetPtag[datasetName] = parsed.get(
'ptag')
279 queries.append(regex.sub(
"_p%", datasetName))
280 return queries, datasetPtag
284 regex = re.compile(
"_p[0-9]+")
285 results = [r[
'ldn']
for r
in results]
289 for datasetName
in self.
cmd:
290 if datasetName
not in results:
291 notFound.append(datasetName)
293 base = regex.sub(
"_p%", datasetName)
294 matching = [r
for r
in results
if r.startswith(base.replace(
"_p%",
""))]
296 mParsed = CPGridRun.atlasProductionNameParser(m)
298 mPtagInt = int(mParsed.get(
'ptag',
'p0')[1:])
299 currentPtagInt = int(datasetPtag.get(datasetName,
'p0')[1:])
300 if mPtagInt > currentPtagInt:
301 latestPtag[datasetName] = f
"p{mPtagInt}"
302 except (ValueError, TypeError):
306 logCPGridRun.info(
"Newer version of datasets found in AMI:")
307 for name, ptag
in latestPtag.items():
308 logCPGridRun.info(f
"{name} -> ptag: {ptag}")
311 logCPGridRun.error(
"Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
312 logCPGridRun.error(
", ".join(notFound))
324 if CPGridRun.isAtlasProductionFormat(name):
331 {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
333 nameParser = CPGridRun.atlasProductionNameParser(name)
334 base =
'group' if self.
args.groupProduction
else 'user'
335 username = self.
args.gridUsername
336 dsid = nameParser[
'DSID']
337 tags =
'_'.join(nameParser[
'tags'])
338 fileFormat = nameParser[
'format']
339 base =
'group' if self.
args.groupProduction
else 'user'
340 prefix = self.
args.prefix
if self.
args.prefix
else nameParser[
'main'].
split(
'_')[0]
343 result = [base, username, prefix, dsid, fileFormat, tags, suffix]
344 return ".".join(filter(
None, result))
348 {group/user}.{username}.{main}.outputDS.{suffix}
350 parts = name.split(
'.')
351 base =
'group' if self.
args.groupProduction
else 'user'
352 username = self.
args.gridUsername
354 outputDS =
'outputDS'
357 result = [base, username,main, outputDS, suffix]
358 return ".".join(filter(
None, result))
362 return self.
args.suffix
363 if self.
args.testRun:
365 return f
"test_{uuid.uuid4().hex[:6]}"
370 tarball_mtime = os.path.getmtime(self.
_tarfile)
if os.path.exists(self.
_tarfile)
else 0
375 for root, _, files
in os.walk(buildDir):
377 file_path = os.path.join(root, file)
379 if os.path.getmtime(file_path) > tarball_mtime:
380 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
382 except FileNotFoundError:
386 if sourceDir
is None:
387 logCPGridRun.warning(
"Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
389 for root, _, files
in os.walk(sourceDir):
391 file_path = os.path.join(root, file)
393 if os.path.getmtime(file_path) > tarball_mtime:
394 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
396 except FileNotFoundError:
401 buildDir = os.environ[
"CMAKE_PREFIX_PATH"]
402 buildDir = os.path.dirname(buildDir.split(
":")[0])
406 cmakeCachePath = os.path.join(self.
_buildDir(),
'CMakeCache.txt')
408 if not os.path.exists(cmakeCachePath):
410 with open(cmakeCachePath,
'r')
as cmakeCache:
411 for line
in cmakeCache:
412 if '_SOURCE_DIR:STATIC=' in line:
413 sourceDir = line.split(
'=')[1].
strip()
419 isCPRunDefault = self.
args.exec.startswith(
'-')
or self.
args.exec.startswith(
'CPRun.py')
421 'input_list':
'in.txt',
422 'merge_output_files':
True,
424 if not isCPRunDefault:
425 if self.
_isFirstRun: logCPGridRun.warning(
"Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
426 return f
'"{self.args.exec}"'
430 runscriptArgs, unknownArgs = self.
_runscript.parser.parse_known_args(self.
args.exec.split(
' '))
433 unknown_flags = [arg
for arg
in unknownArgs
if arg.startswith(
'--')]
435 logCPGridRun.error(f
"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
436 raise ValueError(f
"Unknown arguments detected: {unknown_flags}")
439 for key, value
in formatingClause.items():
440 if hasattr(runscriptArgs, key):
441 old_value = getattr(runscriptArgs, key)
442 if old_value
is None or old_value == self.
_runscript.parser.get_default(key):
443 setattr(runscriptArgs, key, value)
444 if self.
_isFirstRun: logCPGridRun.info(f
"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
446 if self.
_isFirstRun: logCPGridRun.warning(f
"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
448 logCPGridRun.error(f
"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
449 raise ValueError(f
"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
452 arg_string =
' '.join(
453 f
'--{k.replace("_", "-")}' if isinstance(v, bool)
and v
else
454 f
'--{k.replace("_", "-")} {v}' for k, v
in vars(runscriptArgs).items()
if v
not in [
None,
False]
456 return f
'"CPRun.py {arg_string}"'
459 from AnalysisAlgorithmsConfig.CPBaseRunner
import CPBaseRunner
460 if not hasattr(runscriptArgs,
'text_config'):
461 self.
_errorCollector[
'no yaml'] =
"No YAML configuration file is specified in the exec string. Please provide one using --text-config"
463 yamlPath = getattr(runscriptArgs,
'text_config')
465 haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
467 logCPGridRun.warning(
"A path to a local YAML configuration file is found, but it may not be grid-usable.")
469 repoYamls, _ = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
470 if repoYamls
and len(repoYamls) > 1:
471 self.
_errorCollector[
'ambiguous yamls'] = f
'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' +
'\n'.join(repoYamls)
473 elif repoYamls
and len(repoYamls) == 1:
474 logCPGridRun.info(f
"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
477 if haveLocalYaml
and self.
args.useCentralPackage:
478 logCPGridRun.warning(
"A path to a local YAML configuration file is found, no custom packages are found, proceed with /cvmfs packages only.")
480 if not repoYamls
and not self.
args.useCentralPackage:
481 self.
_errorCollector[
'no usable yaml'] = f
"Grid usable YAML configuration file not found: {yamlPath}"
483 self.
_errorCollector[
'have local yaml'] = f
"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
484 f
"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec\n"\
485 f
"Or if you are only using central packages, please use the `--useCentralPackage` flag."
488 outputs = [f
'{output.split(".")[0]}:{output}' for output
in self.
args.output_files]
489 return ','.join(outputs)
493 prun_path = shutil.which(
"prun")
494 if prun_path
is None:
496 "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
499 "voms-proxy-init -voms atlas\n"
501 "Make sure you have a valid certificate."
508 for key, cmd
in self.
cmd.items():
509 process = subprocess.Popen(cmd, shell=
True, stdout=sys.stdout, stderr=sys.stderr)
510 process.communicate()
514 if name.startswith(
'mc')
or name.startswith(
'data'):
516 logCPGridRun.warning(
"Name is not in the Atlas production format, assuming it is a user production")
522 The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
525 parts = filename.split(
'.')
526 result[
'userType'] = parts[0]
527 result[
'username'] = parts[1]
528 result[
'main'] = parts[2]
529 result[
'suffix'] = parts[-1]
535 Parsing file name into a dictionary, an example is given here
536 mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
538 datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
539 projectName: mc20_13TeV
543 main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
544 TODO generator: PhPy8Eg
545 TODO tune: A14 # For Pythia8
547 TODO hdamp: 258p75 # For Powheg
548 TODO decayType: nonallhad
551 tags: e###_s###_r###_p###_a###_t###_b#
552 etag: e6337 # EVNT (EVGEN) production and merging
553 stag: s3681 # Geant4 simulation to produce HITS and merging!
554 rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
555 ptag: p5855 # Production of NTUP_PILEUP format and merging
556 atag: aXXX: atlfast configuration (both simulation and digit/recon)
557 ttag: tXXX: tag production configuration
558 btag: bXXX: bytestream production configuration
571 datasetPart, filePart = filename.split(
'/')
573 datasetPart = filename
577 datasetParts = datasetPart.split(
'.')
578 result[
'datasetName'] = datasetPart
580 result[
'projectName'] = datasetParts[0]
582 campaign_energy = result[
'projectName'].
split(
'_')
583 result[
'campaign'] = campaign_energy[0]
584 result[
'energy'] = campaign_energy[1]
587 result[
'DSID'] = datasetParts[1]
588 result[
'main'] = datasetParts[2]
589 result[
'step'] = datasetParts[3]
590 result[
'format'] = datasetParts[4]
593 tags = datasetParts[5].
split(
'_')
594 result[
'tags'] = tags
596 if tag.startswith(
'e'):
598 elif tag.startswith(
's'):
600 elif tag.startswith(
'r'):
602 elif tag.startswith(
'p'):
604 elif tag.startswith(
'a'):
606 elif tag.startswith(
't'):
608 elif tag.startswith(
'b'):
613 fileParts = filePart.split(
'.')
614 result[
'jediTaskID'] = fileParts[1]
615 result[
'fileNumber'] = fileParts[2]
616 result[
'version'] = fileParts[-1]
622 with open(path,
'r')
as inputText:
623 for line
in inputText.readlines():
625 if line.startswith(
'#')
or not line.strip():
627 files += line.split(
',')
629 files = [file.strip()
for file
in files]
634 logCPGridRun.error(
"Errors were collected during the script execution:")
637 logCPGridRun.error(f
"{key}: {value}")
638 logCPGridRun.error(
"Please fix the errors and try again.")
642 if self.
args.noSubmit:
645 if self.
args.checkInputDS:
649 if self.
args.noSubmit:
651 if self.
args.agreeAll:
652 logCPGridRun.info(
"You have agreed to all the submission details. Jobs will be submitted without confirmation.")
655 answer = input(
"Please confirm ALL the submission details are correct before submitting [y/n]: ")
656 if answer.lower() ==
'y':
658 elif answer.lower() ==
'n':
659 logCPGridRun.info(
"Feel free to report any unexpected behavior to the CPAlgorithms team!")
661 logCPGridRun.error(
"Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
663if __name__ ==
'__main__':
665 cpgrid.configureSumbission()
666 cpgrid.printInputDetails()
667 cpgrid.checkExternalTools()
668 cpgrid.printDelayedErrorCollection()
669 cpgrid.askSubmission()
void print(char *figname, TCanvas *c1)
outputDSFormatter(self, name)
configureSumbission(self)
dict _createPrunArgsDict(self)
rucioCustomNameParser(filename)
_checkYamlExists(self, runscriptArgs)
_parseGridArguments(self)
bool checkInputInPyami(self)
bool _analyzeAmiResults(self, results, datasetPtag)
isAtlasProductionFormat(name)
_customOutputDSFormatter(self, name)
_prepareAmiQueryFromInputList(self)
_parseInputFileList(path)
_filesChangedOrTarballNotCreated(self)
_hasCompressedTarball(self)
atlasProductionNameParser(filename)
_outputDSFormatter(self, name)
_checkPrunArgs(self, argDict)
printDelayedErrorCollection(self)
dict _unknownArgsDict(self)
configureSubmissionSingleSample(self, input)
std::vector< std::string > split(const std::string &s, const std::string &t=":")