ATLAS Offline Software
Loading...
Searching...
No Matches
CPGridRun.CPGridRun Class Reference
Collaboration diagram for CPGridRun.CPGridRun:

Public Member Functions

 __init__ (self)
 inputList (self)
 outputFilesParsing (self)
 printHelp (self)
 getParser (self)
 configureSumbission (self)
 configureSubmissionSingleSample (self, input)
 printInputDetails (self)
 hasPyami (self)
bool checkInputInPyami (self)
 outputDSFormatter (self, name)
 execFormatter (self)
 outputsFormatter (self)
bool hasPrun (self)
 submit (self)
 printDelayedErrorCollection (self)
 checkExternalTools (self)
 askSubmission (self)

Static Public Member Functions

 isAtlasProductionFormat (name)
 rucioCustomNameParser (filename)
 atlasProductionNameParser (filename)

Public Attributes

 gridParser = self._parseGridArguments()
dict prunArgsDict = self._createPrunArgsDict()
dict cmd = {}
 args
 unknown_args = parser.parse_known_args()
 output_files = output_files
 inputList

Protected Member Functions

 _initRunscript (self)
 _parseGridArguments (self)
dict _createPrunArgsDict (self)
dict _unknownArgsDict (self)
 _checkPrunArgs (self, argDict)
 _prepareAmiQueryFromInputList (self)
bool _analyzeAmiResults (self, results, datasetPtag)
 _filesChangedOrTarballNotCreated (self)
 _hasCompressedTarball (self)
 _outputDSFormatter (self, name)
 _customOutputDSFormatter (self, name)
 _suffixFormatter (self)
 _filesChanged (self)
 _buildDir (self)
 _sourceDir (self)
 _checkYamlExists (self, runscriptArgs)

Static Protected Member Functions

 _parseInputFileList (path)

Protected Attributes

 _runscript = None
str _tarfile = 'cpgrid.tar.gz'
bool _isFirstRun = True
bool _tarballRecreated = False
list _inputList = None
dict _errorCollector = {}
 _yamlPath = None

Detailed Description

Definition at line 11 of file CPGridRun.py.

Constructor & Destructor Documentation

◆ __init__()

CPGridRun.CPGridRun.__init__ ( self)

Definition at line 12 of file CPGridRun.py.

def __init__(self):
    '''
    Parse the command line and initialise the submission bookkeeping.

    The bookkeeping attributes are created *before* the arguments are parsed:
    _createPrunArgsDict() calls hasPrun(), which stores a message in
    self._errorCollector when the prun command is missing, so the collector
    must already exist at that point.

    When --help is given, the runscript is initialised, the help is printed
    and the process exits with status 0.
    '''
    self._runscript = None
    self._tarfile = 'cpgrid.tar.gz'
    self._isFirstRun = True
    self._tarballRecreated = False
    self._inputList = None
    # Delay the error collection until the end of the script for better user experience
    self._errorCollector = {}
    self._yamlPath = None
    self.cmd = {}  # sample name -> command

    self.gridParser = self._parseGridArguments()
    self.prunArgsDict = self._createPrunArgsDict()
    if self.args.help:
        self._initRunscript()
        self.printHelp()
        sys.exit(0)
27

Member Function Documentation

◆ _analyzeAmiResults()

bool CPGridRun.CPGridRun._analyzeAmiResults ( self,
results,
datasetPtag )
protected

Definition at line 282 of file CPGridRun.py.

def _analyzeAmiResults(self, results, datasetPtag) -> bool:
    '''
    Compare the AMI query results with the datasets that are about to be submitted.

    results: iterable of AMI records, each providing the dataset name under the key 'ldn'
    datasetPtag: mapping dataset name -> p-tag string (e.g. 'p5855') of the requested dataset

    Logs the highest p-tag found in AMI for every dataset that has a newer one,
    and logs an error listing the datasets that are absent from the results.
    Returns False when at least one dataset is missing, True otherwise.
    '''
    import re
    ptagPattern = re.compile(r"_p[0-9]+")
    availableNames = [r['ldn'] for r in results]
    availableSet = set(availableNames)  # O(1) membership tests
    notFound = []
    latestPtag = {}

    for datasetName in self.cmd:
        if datasetName not in availableSet:
            notFound.append(datasetName)

        # The requested p-tag does not depend on the candidate, parse it once.
        try:
            currentPtagInt = int(datasetPtag.get(datasetName, 'p0')[1:])
        except (ValueError, TypeError):
            continue  # no usable p-tag to compare against

        prefix = ptagPattern.sub("", datasetName)
        newestPtagInt = currentPtagInt
        for candidate in availableNames:
            if not candidate.startswith(prefix):
                continue
            candidateParsed = CPGridRun.atlasProductionNameParser(candidate)
            try:
                candidatePtagInt = int(candidateParsed.get('ptag', 'p0')[1:])
            except (ValueError, TypeError):
                continue
            # Keep the maximum, so the reported p-tag really is the latest one
            # regardless of the order in which AMI returned the matches.
            if candidatePtagInt > newestPtagInt:
                newestPtagInt = candidatePtagInt
        if newestPtagInt > currentPtagInt:
            latestPtag[datasetName] = f"p{newestPtagInt}"

    if latestPtag:
        logCPGridRun.info("Newer version of datasets found in AMI:")
        for name, ptag in latestPtag.items():
            logCPGridRun.info(f"{name} -> ptag: {ptag}")

    if notFound:
        logCPGridRun.error("Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
        logCPGridRun.error(", ".join(notFound))
        return False

    return True
316

◆ _buildDir()

CPGridRun.CPGridRun._buildDir ( self)
protected

Definition at line 400 of file CPGridRun.py.

def _buildDir(self):
    '''
    Return the parent directory of the first entry of the colon-separated
    CMAKE_PREFIX_PATH environment variable (KeyError if it is not set).
    '''
    firstPrefix = os.environ["CMAKE_PREFIX_PATH"].split(":")[0]
    return os.path.dirname(firstPrefix)
404

◆ _checkPrunArgs()

CPGridRun.CPGridRun._checkPrunArgs ( self,
argDict )
protected
check the arguments against the prun script to ensure they are valid
See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py

Definition at line 204 of file CPGridRun.py.

def _checkPrunArgs(self, argDict):
    '''
    check the arguments against the prun script to ensure they are valid
    See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py

    argDict: mapping of argument name (without leading dashes) -> value
    Raises ValueError when at least one argument is not known to prun.
    '''
    import pandaclient.PrunScript
    # We need to temporarily clear the sys.argv to avoid the parser from PrunScript to fail
    original_argv = sys.argv
    sys.argv = ['prun']  # Reset sys.argv to only contain the script name
    try:
        prunArgsDict = pandaclient.PrunScript.main(get_options=True)
    finally:
        # Restore the original sys.argv even if PrunScript raises
        sys.argv = original_argv

    nonPrunOrCPGridArgs = [arg for arg in argDict if arg not in prunArgsDict]
    if nonPrunOrCPGridArgs:
        message = f"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda."
        logCPGridRun.error(message)
        raise ValueError(message)
223

◆ _checkYamlExists()

CPGridRun.CPGridRun._checkYamlExists ( self,
runscriptArgs )
protected

Definition at line 458 of file CPGridRun.py.

def _checkYamlExists(self, runscriptArgs):
    '''
    Check that the YAML configuration named in the exec string can be used on the grid.

    runscriptArgs: parsed arguments of the run script; the YAML path is read
    from its 'text_config' attribute.

    Problems are stored in self._errorCollector instead of being raised, and
    the YAML path is remembered in self._yamlPath.
    '''
    from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
    # An argparse namespace carries the attribute even when the option was not
    # given (its value is then None), so test the value and not only its presence.
    yamlPath = getattr(runscriptArgs, 'text_config', None)
    if yamlPath is None:
        self._errorCollector['no yaml'] = "No YAML configuration file is specified in the exec string. Please provide one using --text-config"
        return
    self._yamlPath = yamlPath
    haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
    if haveLocalYaml:
        logCPGridRun.warning("A path to a local YAML configuration file is found, but it may not be grid-usable.")

    repoYamls, _ = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
    if repoYamls and len(repoYamls) > 1:
        self._errorCollector['ambiguous yamls'] = f'Multiple files named "{yamlPath}" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' + '\n'.join(repoYamls)
        return
    elif repoYamls and len(repoYamls) == 1:
        logCPGridRun.info(f"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
        return

    if haveLocalYaml and self.args.useCentralPackage:
        logCPGridRun.warning("A path to a local YAML configuration file is found, no custom packages are found, proceed with /cvmfs packages only.")

    if not repoYamls and not self.args.useCentralPackage:
        self._errorCollector['no usable yaml'] = f"Grid usable YAML configuration file not found: {yamlPath}"
        if haveLocalYaml:
            self._errorCollector['have local yaml'] = f"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
                f"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec\n"\
                f"Or if you are only using central packages, please use the `--useCentralPackage` flag."
486

◆ _createPrunArgsDict()

dict CPGridRun.CPGridRun._createPrunArgsDict ( self)
protected
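Converts the arguments that CPGridRun does not recognise into a dictionary; when prun is available they are validated against the prun options.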

Definition at line 81 of file CPGridRun.py.

def _createPrunArgsDict(self) -> dict:
    '''
    Convert the arguments that CPGridRun does not know into a dictionary.

    When such arguments exist and prun is available they are validated with
    _checkPrunArgs() (which raises ValueError for arguments prun does not
    know); when prun is not available only a warning is logged.
    Returns the dictionary of unknown arguments (possibly empty).
    '''
    unknownArgsDict = self._unknownArgsDict()
    if not unknownArgsDict:
        return unknownArgsDict

    if self.hasPrun():
        self._checkPrunArgs(unknownArgsDict)
        # list(...) gives a readable message instead of "dict_keys([...])"
        logCPGridRun.info(f"Adding prun exclusive arguments: {list(unknownArgsDict)}")
    else:
        logCPGridRun.warning(f"Unknown arguments detected: {unknownArgsDict}. Cannot check the availability in Prun because Prun is not available / noSubmit is on.")
    return unknownArgsDict
94

◆ _customOutputDSFormatter()

CPGridRun.CPGridRun._customOutputDSFormatter ( self,
name )
protected
{group/user}.{username}.{main}.outputDS.{suffix}

Definition at line 346 of file CPGridRun.py.

def _customOutputDSFormatter(self, name):
    '''
    {group/user}.{username}.{main}.outputDS.{suffix}

    {main} is the third dot-separated field of name and {suffix} its last
    field; empty pieces are left out of the result.
    '''
    fields = name.split('.')
    scope = 'user'
    if self.args.groupProduction:
        scope = 'group'
    pieces = (scope, self.args.gridUsername, fields[2], 'outputDS', fields[-1])
    return ".".join(piece for piece in pieces if piece)
359

◆ _filesChanged()

CPGridRun.CPGridRun._filesChanged ( self)
protected

Definition at line 369 of file CPGridRun.py.

def _filesChanged(self):
    '''
    Tell whether any file below the build directory or the source directory
    was modified after the tarball (a missing tarball counts as time 0).

    The build directory is scanned first. When the source directory is
    unknown a warning is logged and False is returned.
    '''
    if os.path.exists(self._tarfile):
        referenceTime = os.path.getmtime(self._tarfile)
    else:
        referenceTime = 0
    buildDir = self._buildDir()
    sourceDir = self._sourceDir()

    def containsNewerFile(topDir):
        # True as soon as one file below topDir is newer than the tarball
        for dirPath, _, fileNames in os.walk(topDir):
            for fileName in fileNames:
                candidate = os.path.join(dirPath, fileName)
                try:
                    modified = os.path.getmtime(candidate)
                except FileNotFoundError:
                    # the file vanished between the walk and the stat
                    continue
                if modified > referenceTime:
                    logCPGridRun.info(f"File {candidate} is newer than the tarball.")
                    return True
        return False

    # Check for changes in buildDir
    if containsNewerFile(buildDir):
        return True

    # Check for changes in sourceDir
    if sourceDir is None:
        logCPGridRun.warning("Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
        return False
    return containsNewerFile(sourceDir)
399

◆ _filesChangedOrTarballNotCreated()

CPGridRun.CPGridRun._filesChangedOrTarballNotCreated ( self)
protected

Definition at line 317 of file CPGridRun.py.

def _filesChangedOrTarballNotCreated(self):
    '''
    Tell whether the tarball still has to be (re)created in this session.

    Never true once the tarball was recreated; otherwise true when
    --recreateTar is set, when the tarball file does not exist, or when
    _filesChanged() reports a change.
    '''
    if self._tarballRecreated:
        return False
    if self.args.recreateTar or not os.path.exists(self._tarfile):
        return True
    return self._filesChanged()
319

◆ _hasCompressedTarball()

CPGridRun.CPGridRun._hasCompressedTarball ( self)
protected

Definition at line 320 of file CPGridRun.py.

def _hasCompressedTarball(self):
    '''
    True when the tarball file exists; otherwise the value of the
    "tarball recreated in this session" flag.
    '''
    if os.path.exists(self._tarfile):
        return True
    return self._tarballRecreated
322

◆ _initRunscript()

CPGridRun.CPGridRun._initRunscript ( self)
protected

Definition at line 28 of file CPGridRun.py.

def _initRunscript(self):
    '''
    Create the run script on first use and return it.

    An AthenaCPRunScript is built when the module-level isAthena flag is
    true, an EventLoopCPRunScript otherwise; later calls return the cached
    instance.
    '''
    if self._runscript is None:
        if isAthena:
            from AnalysisAlgorithmsConfig.AthenaCPRunScript import AthenaCPRunScript
            self._runscript = AthenaCPRunScript()
        else:
            from AnalysisAlgorithmsConfig.EventLoopCPRunScript import EventLoopCPRunScript
            self._runscript = EventLoopCPRunScript()
    return self._runscript
38

◆ _outputDSFormatter()

CPGridRun.CPGridRun._outputDSFormatter ( self,
name )
protected
{group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}

Definition at line 329 of file CPGridRun.py.

def _outputDSFormatter(self, name):
    '''
    {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}

    The fields come from atlasProductionNameParser(name); empty pieces are
    left out of the result.
    '''
    parsed = CPGridRun.atlasProductionNameParser(name)
    scope = 'group' if self.args.groupProduction else 'user'
    # Dynamically set the prefix, likely to be something like PhPy8Eg
    prefix = self.args.prefix or parsed['main'].split('_')[0]
    pieces = [
        scope,
        self.args.gridUsername,
        prefix,
        parsed['DSID'],
        parsed['format'],
        '_'.join(parsed['tags']),
        self._suffixFormatter(),
    ]
    return ".".join(piece for piece in pieces if piece)
345
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177

◆ _parseGridArguments()

CPGridRun.CPGridRun._parseGridArguments ( self)
protected

Definition at line 39 of file CPGridRun.py.

def _parseGridArguments(self):
    '''
    Build the CPGridRun argument parser and parse the command line.

    The recognised arguments are stored in self.args, everything else in
    self.unknown_args (later interpreted as prun arguments).
    outputFilesParsing() is called once the arguments are available.
    Returns the parser.
    '''
    parser = argparse.ArgumentParser(description='CPGrid runscript to submit CPRun.py jobs to the grid. '
                                     'This script will submit a job to the grid using files in the input text one by one. '
                                     'CPRun.py can handle multiple sources of input and create one output; but not this script',
                                     add_help=False,
                                     formatter_class=argparse.RawTextHelpFormatter)
    # add_help=False: the help is printed by the caller, which then exits
    parser.add_argument('-h', '--help', dest='help', action='store_true', help='Show this help message and exit')

    ioGroup = parser.add_argument_group('Input/Output file configuration')
    ioGroup.add_argument('-i','--input-list', dest='input_list', help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
    ioGroup.add_argument('--output-files', dest='output_files', nargs='+', default=['output.root'],
                         help='The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
    ioGroup.add_argument('--destSE', dest='destSE', default='', type=str, help='Destination storage element (PanDA)')
    ioGroup.add_argument('--mergeType', dest='mergeType', default='Default', type=str, help='Output merging type, [None, Default, xAOD]')

    pandaGroup = parser.add_argument_group('Input/Output naming configuration')
    pandaGroup.add_argument('--gridUsername', dest='gridUsername', default=os.getenv('USER', ''), type=str, help='Grid username, or the groupname. Default is the current user. Only affect file naming')
    pandaGroup.add_argument('--prefix', dest='prefix', default='', type=str, help='Prefix for the output directory. Dynamically set with input container if not provided')
    pandaGroup.add_argument('--suffix', dest='suffix', default='',type=str, help='Suffix for the output directory')
    pandaGroup.add_argument('--outDS', dest='outDS', default='', type=str,
                            help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')

    cpgridGroup = parser.add_argument_group('CPGrid configuration')
    cpgridGroup.add_argument('--groupProduction', dest='groupProduction', action='store_true', help='Only use for official production')

    cpgridGroup.add_argument('--exec', dest='exec', type=str,
                             help='Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
                             'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
                             'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
                             )

    submissionGroup = parser.add_argument_group('Submission configuration')
    submissionGroup.add_argument('-y', '--agreeAll', dest='agreeAll', action='store_true', help='Agree to all the submission details without asking for confirmation. Use with caution!')
    submissionGroup.add_argument('--noSubmit', dest='noSubmit', action='store_true', help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
    # The numbers quoted here are the values set for nFiles / nEventsPerFile at submission
    submissionGroup.add_argument('--testRun', dest='testRun', action='store_true', help='Will submit job to the grid but greatly limit the number of files (5) and the number of events per file (100)')
    submissionGroup.add_argument('--checkInputDS', dest='checkInputDS', action='store_true', help='Check if the input datasets are available on the AMI.')
    submissionGroup.add_argument('--recreateTar', dest='recreateTar', action='store_true', help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
    submissionGroup.add_argument('--useCentralPackage', dest='useCentralPackage', action='store_true', help='Use central package instead of custom packages')
    self.args, self.unknown_args = parser.parse_known_args()
    self.outputFilesParsing()
    return parser
80

◆ _parseInputFileList()

CPGridRun.CPGridRun._parseInputFileList ( path)
staticprotected

Definition at line 620 of file CPGridRun.py.

def _parseInputFileList(path):
    '''
    Read the dataset names listed in a text file.

    Every line may hold one name or several comma-separated names. Blank
    lines and lines whose first non-blank character is '#' are skipped, and
    empty entries (e.g. produced by a trailing comma) are dropped.
    Returns the list of names with surrounding whitespace removed.
    '''
    files = []
    with open(path, 'r') as inputText:
        for line in inputText:
            line = line.strip()
            # skip comments (also indented ones) and empty lines
            if not line or line.startswith('#'):
                continue
            for entry in line.split(','):
                entry = entry.strip()
                if entry:
                    files.append(entry)
    return files
631

◆ _prepareAmiQueryFromInputList()

CPGridRun.CPGridRun._prepareAmiQueryFromInputList ( self)
protected
Helper function to prepare a list of queries for the AMI based on the input list.
It will replace the _p### with _p% to match the latest ptag.

Definition at line 267 of file CPGridRun.py.

def _prepareAmiQueryFromInputList(self):
    '''
    Build the AMI query patterns for the datasets stored in self.cmd.

    Every _p### in a dataset name is replaced with _p% so that all p-tags
    match. Returns (queries, datasetPtag), where datasetPtag maps each
    dataset name to the p-tag parsed from it (None when it has none).
    '''
    import re
    ptagPattern = re.compile("_p[0-9]+")
    datasetPtag = {
        name: CPGridRun.atlasProductionNameParser(name).get('ptag')
        for name in self.cmd
    }
    queries = [ptagPattern.sub("_p%", name) for name in self.cmd]
    return queries, datasetPtag
281

◆ _sourceDir()

CPGridRun.CPGridRun._sourceDir ( self)
protected

Definition at line 405 of file CPGridRun.py.

def _sourceDir(self):
    '''
    Return the source directory recorded in the CMakeCache.txt of the build
    directory, or None when the cache file or the entry is missing.

    The first line containing '_SOURCE_DIR:STATIC=' is used; everything after
    the first '=' is taken as the path, so paths that themselves contain '='
    are kept intact.
    '''
    cmakeCachePath = os.path.join(self._buildDir(), 'CMakeCache.txt')
    if not os.path.exists(cmakeCachePath):
        return None
    with open(cmakeCachePath, 'r') as cmakeCache:
        for line in cmakeCache:
            if '_SOURCE_DIR:STATIC=' in line:
                return line.partition('=')[2].strip()
    return None
416

◆ _suffixFormatter()

CPGridRun.CPGridRun._suffixFormatter ( self)
protected

Definition at line 360 of file CPGridRun.py.

def _suffixFormatter(self):
    '''
    Return the suffix of the output dataset name.

    The user-provided --suffix wins; a test run gets 'test_' followed by six
    random hexadecimal characters; otherwise the suffix is an empty string.
    '''
    if self.args.suffix:
        return self.args.suffix
    if self.args.testRun:
        import uuid
        return f"test_{uuid.uuid4().hex[:6]}"
    return ''
368

◆ _unknownArgsDict()

dict CPGridRun.CPGridRun._unknownArgsDict ( self)
protected
Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs

Definition at line 188 of file CPGridRun.py.

def _unknownArgsDict(self)->dict:
    '''
    Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs

    '--key value' and '--key=value' are stored as key -> value, a lone
    '--flag' as flag -> True. A token that does not start with '-' and was
    not consumed as a value is skipped, so the scan always advances.
    '''
    unknown_args_dict = {}
    tokens = self.unknown_args
    idx = 0
    while idx < len(tokens):
        token = tokens[idx]
        idx += 1
        if not token.startswith('-'):
            # stray positional token: nothing to attach it to
            continue
        key, hasEquals, inlineValue = token.lstrip('-').partition('=')
        if hasEquals:
            unknown_args_dict[key] = inlineValue
        elif idx < len(tokens) and not tokens[idx].startswith('-'):
            # the next token is the value of this option
            unknown_args_dict[key] = tokens[idx]
            idx += 1
        else:
            unknown_args_dict[key] = True
    return unknown_args_dict
203

◆ askSubmission()

CPGridRun.CPGridRun.askSubmission ( self)

Definition at line 648 of file CPGridRun.py.

def askSubmission(self):
    '''
    Submit the jobs after confirmation.

    Nothing happens with --noSubmit. With --agreeAll the jobs are submitted
    directly; otherwise the user is prompted and only the answer 'y'
    (case-insensitive) triggers the submission.
    '''
    if self.args.noSubmit:
        return
    if self.args.agreeAll:
        logCPGridRun.info("You have agreed to all the submission details. Jobs will be submitted without confirmation.")
        self.submit()
        return

    reply = input("Please confirm ALL the submission details are correct before submitting [y/n]: ").lower()
    if reply == 'y':
        self.submit()
        return
    if reply == 'n':
        logCPGridRun.info("Feel free to report any unexpected behavior to the CPAlgorithms team!")
        return
    logCPGridRun.error("Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
662

◆ atlasProductionNameParser()

CPGridRun.CPGridRun.atlasProductionNameParser ( filename)
static
Parsing file name into a dictionary, an example is given here
mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
For the first part
datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
projectName: mc20_13TeV
campaign: mc20
energy: 13 #(TeV)
DSID: 410470
main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
TODO  generator: PhPy8Eg
TODO  tune: A14 # For Pythia8
TODO  process: ttbar
TODO  hdamp: 258p75 # For Powheg
TODO  decayType: nonallhad
step: deriv
format: DAOD_PHYS
tags: e###_s###_r###_p###_a###_t###_b#
etag: e6337 # EVNT (EVGEN) production and merging
stag: s3681 # Geant4 simulation to produce HITS and merging!
rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
ptag: p5855 # Production of NTUP_PILEUP format and merging
atag: aXXX: atlfast configuration (both simulation and digit/recon)
ttag: tXXX: tag production configuration
btag: bXXX: bytestream production configuration

For the second part
jediTaskID: 34865530
fileNumber: 000740
version: 1

Definition at line 533 of file CPGridRun.py.

def atlasProductionNameParser(filename):
    '''
    Parsing file name into a dictionary, an example is given here
    mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
    For the first part
    datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
        projectName: mc20_13TeV
            campaign: mc20
            energy: 13TeV
        DSID: 410470
        main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
            TODO generator: PhPy8Eg
            TODO tune: A14 # For Pythia8
            TODO process: ttbar
            TODO hdamp: 258p75 # For Powheg
            TODO decayType: nonallhad
        step: deriv
        format: DAOD_PHYS
        tags: e###_s###_r###_p###_a###_t###_b#
            etag: e6337 # EVNT (EVGEN) production and merging
            stag: s3681 # Geant4 simulation to produce HITS and merging!
            rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
            ptag: p5855 # Production of NTUP_PILEUP format and merging
            atag: aXXX: atlfast configuration (both simulation and digit/recon)
            ttag: tXXX: tag production configuration
            btag: bXXX: bytestream production configuration

    For the second part
        jediTaskID: 34865530
        fileNumber: _000740
        version: 1

    Everything after the first '/' is treated as the file part, so a trailing
    '/' or further '/' characters do not break the parsing. The dataset part
    needs at least six dot-separated fields (IndexError otherwise).
    '''
    result = {}
    # split at the first / in case
    # mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
    # /DAOD_PHYS.34865530._000740.pool.root.1
    datasetPart, _, filePart = filename.partition('/')

    # Split the dataset part by dots, the fields are positional
    datasetParts = datasetPart.split('.')
    result['datasetName'] = datasetPart
    result['projectName'] = datasetParts[0]
    # Extract the campaign and energy
    campaign_energy = result['projectName'].split('_')
    result['campaign'] = campaign_energy[0]
    result['energy'] = campaign_energy[1]

    result['DSID'] = datasetParts[1]
    result['main'] = datasetParts[2]
    result['step'] = datasetParts[3]
    result['format'] = datasetParts[4]

    # Extract the tags; the first letter selects the key, later tags of the
    # same kind overwrite earlier ones
    tags = datasetParts[5].split('_')
    result['tags'] = tags
    tagKeys = {'e': 'etag', 's': 'stag', 'r': 'rtag', 'p': 'ptag',
               'a': 'atag', 't': 'ttag', 'b': 'btag'}
    for tag in tags:
        key = tagKeys.get(tag[:1])  # tag[:1] is '' for an empty token
        if key is not None:
            result[key] = tag

    # Extract the file part if it exists
    if filePart:
        fileParts = filePart.split('.')
        result['jediTaskID'] = fileParts[1]
        result['fileNumber'] = fileParts[2]
        result['version'] = fileParts[-1]
    return result
618

◆ checkExternalTools()

CPGridRun.CPGridRun.checkExternalTools ( self)

Definition at line 641 of file CPGridRun.py.

def checkExternalTools(self):
    '''
    Run the checks on the external tools, unless --noSubmit is set:
    hasPrun() always, checkInputInPyami() only with --checkInputDS.
    '''
    if not self.args.noSubmit:
        self.hasPrun()
        if self.args.checkInputDS:
            self.checkInputInPyami()
647

◆ checkInputInPyami()

bool CPGridRun.CPGridRun.checkInputInPyami ( self)

Definition at line 250 of file CPGridRun.py.

def checkInputInPyami(self) -> bool:
    '''
    Query AMI for the input datasets and analyse the answer.

    Returns False when pyAMI is not available or when the query raises a
    pyAMI error (a message is then stored in self._errorCollector);
    otherwise returns the result of _analyzeAmiResults().
    '''
    if self.hasPyami():
        amiClient = pyAMI.client.Client('atlas')
        pyAMI.atlas.api.init()

        patterns, ptagByDataset = self._prepareAmiQueryFromInputList()
        try:
            amiDatasets = pyAMI.atlas.api.list_datasets(amiClient, patterns=patterns)
        except pyAMI.exception.Error:
            self._errorCollector['no valid certificate'] = (
                "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
            return False

        return self._analyzeAmiResults(amiDatasets, ptagByDataset)
    return False
266

◆ configureSubmissionSingleSample()

CPGridRun.CPGridRun.configureSubmissionSingleSample ( self,
input )

Definition at line 135 of file CPGridRun.py.

def configureSubmissionSingleSample(self, input):
    '''
    Build the prun command for one input dataset.

    input: name of the input dataset, passed to prun as --inDS

    The options derived from the CPGridRun arguments are collected first and
    then updated with the user-supplied prun arguments (self.prunArgsDict),
    which therefore take precedence. Reads the CMTCONFIG environment variable
    (and AnalysisBase_VERSION with --useCentralPackage).
    Returns the command as a string with one option per line, the lines being
    joined with a backslash continuation.
    '''
    config = {
        'inDS': input,
        'outDS': self.args.outDS if self.args.outDS else self.outputDSFormatter(input),
        'cmtConfig': os.environ["CMTCONFIG"],
        'writeInputToTxt': 'IN:in.txt',
        'outputs': self.outputsFormatter(),
        'exec': self.execFormatter(),
        'memory': "2000",  # MB
        'addNthFieldOfInDSToLFN': '2,3,6',
    }
    if self.args.noSubmit:
        config['noSubmit'] = True

    if self.args.mergeType == 'xAOD':
        config['mergeScript'] = "xAODMerge %OUT `echo %IN | sed 's/,/ /g'`"

    if self.args.mergeType != 'None':
        config['mergeOutput'] = True

    # Three types of files sending the grid
    if self.args.useCentralPackage:  # 1. Using central package and have a yaml file only
        config['extFile'] = self._yamlPath
        config['noBuild'] = True
        config['noCompile'] = True
        config['athenaTag'] = f"AnalysisBase,{os.environ['AnalysisBase_VERSION']}"
    elif self._filesChangedOrTarballNotCreated():  # 2. Using custom packages and haven't compressed the tarball since the last changes
        config['outTarBall'] = self._tarfile
        config['useAthenaPackages'] = True
        self._tarballRecreated = True
    elif self._hasCompressedTarball():  # 3. Using custom packages and have compressed the tarball
        config['inTarBall'] = self._tarfile
        config['useAthenaPackages'] = True

    if self.args.groupProduction:
        config['official'] = True
        config['voms'] = f'atlas:/atlas/{self.args.gridUsername}/Role=production'

    if self.args.destSE:
        config['destSE'] = self.args.destSE

    if self.args.testRun:
        config['nEventsPerFile'] = 100
        config['nFiles'] = 5
    config.update(self.prunArgsDict)

    lines = ['prun']
    for key, value in config.items():
        if isinstance(value, bool):
            # a switch is emitted only when enabled; False must not become "--key False"
            if value:
                lines.append(f'--{key}')
        elif value is not None and value != '':
            lines.append(f'--{key} {value}')
    return ' \\\n'.join(lines)
187

◆ configureSumbission()

CPGridRun.CPGridRun.configureSumbission ( self)

Definition at line 129 of file CPGridRun.py.

def configureSumbission(self):
    '''
    Build the prun command of every input sample and store it in self.cmd
    (sample name -> command). The first-run flag is cleared once a sample
    has been configured.
    '''
    for sample in self.inputList:
        self.cmd[sample] = self.configureSubmissionSingleSample(sample)
        self._isFirstRun = False
134

◆ execFormatter()

CPGridRun.CPGridRun.execFormatter ( self)

Definition at line 417 of file CPGridRun.py.

def execFormatter(self):
    '''
    Format the --exec string for prun and return it wrapped in double quotes.

    A custom command (not starting with '-' or 'CPRun.py') is returned
    unchanged. A CPRun.py command is parsed with the run script parser, the
    grid defaults ('input_list', 'merge_output_files') are applied where the
    user did not choose a value, the YAML configuration is checked and the
    command is rebuilt from the parsed arguments.

    Raises ValueError when no exec string is given, when the exec string
    contains unknown '--' flags, or when a grid default is not an argument
    of the run script.
    '''
    execString = self.args.exec
    if not execString:
        raise ValueError("No exec string is provided, please use --exec to specify the command to run on the grid")

    # Check if the execution command starts with 'CPRun.py' or '-'
    isCPRunDefault = execString.startswith(('-', 'CPRun.py'))
    formatingClause = {
        'input_list': 'in.txt',
        'merge_output_files': True,
    }
    if not isCPRunDefault:
        if self._isFirstRun:
            logCPGridRun.warning("Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
        return f'"{execString}"'

    # Parse the exec string using the parser to validate and extract known arguments.
    # split() without argument drops the empty tokens produced by repeated spaces.
    self._initRunscript()
    runscriptParser = self._runscript.parser
    runscriptArgs, unknownArgs = runscriptParser.parse_known_args(execString.split())

    # Throw error if unknownArgs contains any --args
    unknown_flags = [arg for arg in unknownArgs if arg.startswith('--')]
    if unknown_flags:
        logCPGridRun.error(f"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
        raise ValueError(f"Unknown arguments detected: {unknown_flags}")

    # Only override if value is None or the parser default
    for key, value in formatingClause.items():
        if not hasattr(runscriptArgs, key):
            message = f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py"
            logCPGridRun.error(message)
            raise ValueError(message)
        old_value = getattr(runscriptArgs, key)
        if old_value is None or old_value == runscriptParser.get_default(key):
            setattr(runscriptArgs, key, value)
            if self._isFirstRun:
                logCPGridRun.info(f"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
        elif self._isFirstRun:
            logCPGridRun.warning(f"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
    self._checkYamlExists(runscriptArgs)

    # Return the formatted arguments as a string. Identity checks are used so
    # that a legitimate value of 0 (which compares equal to False) is kept.
    formatted = []
    for key, value in vars(runscriptArgs).items():
        if value is None or value is False:
            continue
        option = f'--{key.replace("_", "-")}'
        formatted.append(option if value is True else f'{option} {value}')
    arg_string = ' '.join(formatted)
    return f'"CPRun.py {arg_string}"'
457

◆ getParser()

CPGridRun.CPGridRun.getParser ( self)

Definition at line 124 of file CPGridRun.py.

def getParser(self):
    '''
    Return the argument parser stored in self.gridParser.
    '''
    parser = self.gridParser
    return parser
126

◆ hasPrun()

bool CPGridRun.CPGridRun.hasPrun ( self)

Definition at line 491 of file CPGridRun.py.

def hasPrun(self) -> bool:
    """Report whether a ``prun`` executable can be located.

    Returns:
        True when ``shutil.which`` finds ``prun``. Otherwise a setup hint is
        stored under the 'no prun' key of ``self._errorCollector`` and False
        is returned.
    """
    from shutil import which

    if which("prun") is not None:
        return True

    # Record the problem instead of raising; the collected messages are
    # reported later by whoever reads self._errorCollector.
    setup_hint = (
        "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
        "```\n"
        "lsetup panda\n"
        "voms-proxy-init -voms atlas\n"
        "```\n"
        "Make sure you have a valid certificate."
    )
    self._errorCollector['no prun'] = setup_hint
    return False
505

◆ hasPyami()

CPGridRun.CPGridRun.hasPyami ( self)

Definition at line 234 of file CPGridRun.py.

def hasPyami(self):
    """Try to import the pyAMI client and API modules.

    The import binds ``pyAMI`` at module level (``global``), so other code in
    the module can use it after a successful call.

    Returns:
        True when both imports succeed. On ``ModuleNotFoundError`` a setup
        hint is stored under the 'no AMI' key of ``self._errorCollector`` and
        False is returned.
    """
    global pyAMI
    try:
        import pyAMI.client
        import pyAMI.atlas.api
    except ModuleNotFoundError:
        setup_hint = (
            "Cannot import pyAMI, please run the following commands:\n\n"
            "```\n"
            "lsetup pyami\n"
            "voms-proxy-init -voms atlas\n"
            "```\n"
            "and make sure you have a valid certificate.")
        self._errorCollector['no AMI'] = setup_hint
        return False
    return True
249

◆ inputList()

CPGridRun.CPGridRun.inputList ( self)

Definition at line 96 of file CPGridRun.py.

def inputList(self):
    """Return the list of inputs, resolving ``self.args.input_list`` on first use.

    The result is cached in ``self._inputList``; later calls return the
    cached value without re-reading the arguments.

    Raises:
        NotImplementedError: the argument ends with '.json'.
        ValueError: the argument is neither a '.txt' file nor a name accepted
            by ``CPGridRun.isAtlasProductionFormat``.
    """
    if self._inputList is not None:
        return self._inputList

    source = self.args.input_list
    if source.endswith('.txt'):
        # A text file is delegated to the class-level file-list parser.
        self._inputList = CPGridRun._parseInputFileList(source)
    elif source.endswith('.json'):
        raise NotImplementedError('JSON input list parsing is not implemented')
    elif CPGridRun.isAtlasProductionFormat(source):
        # A single production-style name is used directly as a one-item list.
        self._inputList = [source]
    else:
        raise ValueError('use --input-list to specify input containers')
    return self._inputList
108

◆ isAtlasProductionFormat()

CPGridRun.CPGridRun.isAtlasProductionFormat ( name)
static

Definition at line 513 of file CPGridRun.py.

def isAtlasProductionFormat(name):
    """Tell whether *name* starts with 'mc' or 'data'.

    Returns:
        True for names with one of those prefixes; otherwise a warning is
        logged and False is returned.
    """
    if not name.startswith(('mc', 'data')):
        logCPGridRun.warning("Name is not in the Atlas production format, assuming it is a user production")
        return False
    return True
518

◆ outputDSFormatter()

CPGridRun.CPGridRun.outputDSFormatter ( self,
name )

Definition at line 323 of file CPGridRun.py.

def outputDSFormatter(self, name):
    """Format the output dataset name for *name*.

    Names accepted by ``CPGridRun.isAtlasProductionFormat`` go through
    ``self._outputDSFormatter``; every other name goes through
    ``self._customOutputDSFormatter``.
    """
    formatter = (
        self._outputDSFormatter
        if CPGridRun.isAtlasProductionFormat(name)
        else self._customOutputDSFormatter
    )
    return formatter(name)
328

◆ outputFilesParsing()

CPGridRun.CPGridRun.outputFilesParsing ( self)

Definition at line 109 of file CPGridRun.py.

def outputFilesParsing(self):
    """Flatten ``self.args.output_files`` into ``self.output_files``.

    Each entry may hold several comma-separated file names; the result is a
    flat list with one element per name, in the original order.
    """
    # str.split(',') yields a one-element list when there is no comma, so a
    # single nested comprehension covers both kinds of entry.
    self.output_files = [
        piece
        for entry in self.args.output_files
        for piece in entry.split(',')
    ]
117

◆ outputsFormatter()

CPGridRun.CPGridRun.outputsFormatter ( self)

Definition at line 487 of file CPGridRun.py.

def outputsFormatter(self):
    """Build the comma-joined ``<stem>:<file>`` string for the output files.

    Each entry of ``self.args.output_files`` may hold several comma-separated
    file names (the same convention ``outputFilesParsing`` accepts). Every
    individual file name is paired with its stem, i.e. the text before the
    first '.', so "a.root,b.root" yields "a:a.root,b:b.root" rather than
    treating the whole entry as one file.

    Returns:
        str: the ``stem:file`` pairs joined by ','; empty if there are no
        file names.
    """
    pairs = []
    for entry in self.args.output_files:
        for filename in entry.split(','):
            # A stray leading/trailing comma would otherwise produce a bare ':'.
            if not filename:
                continue
            stem = filename.split(".")[0]
            pairs.append(f'{stem}:{filename}')
    return ','.join(pairs)
490

◆ printDelayedErrorCollection()

CPGridRun.CPGridRun.printDelayedErrorCollection ( self)

Definition at line 632 of file CPGridRun.py.

def printDelayedErrorCollection(self):
    """Log every message in ``self._errorCollector`` and exit with status 1.

    Does nothing when the collector is empty.
    """
    if not self._errorCollector:
        return

    logCPGridRun.error("Errors were collected during the script execution:")
    for label, message in self._errorCollector.items():
        logCPGridRun.error(f"{label}: {message}")
    logCPGridRun.error("Please fix the errors and try again.")
    sys.exit(1)
640

◆ printHelp()

CPGridRun.CPGridRun.printHelp ( self)

Definition at line 118 of file CPGridRun.py.

def printHelp(self):
    """Print the grid parser help, followed by the runscript parser help.

    The runscript parser's usage line is suppressed so that only its option
    list is printed under the grid help.
    """
    self.gridParser.print_help()
    logCPGridRun.info("\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
    # NOTE(review): relies on self._runscript already being set up; this
    # method does not initialise it itself.
    runscript_parser = self._runscript.parser
    runscript_parser.usage = argparse.SUPPRESS
    runscript_parser.print_help()
123
void printHelp()

◆ printInputDetails()

CPGridRun.CPGridRun.printInputDetails ( self)

Definition at line 224 of file CPGridRun.py.

def printInputDetails(self):
    """Log, for every entry of ``self.cmd``, the input name, its parsed fields and the command.

    NOTE(review): the indentation of this listing was lost; the loop body
    below (including the separator line) is reconstructed -- confirm against
    CPGridRun.py.
    """
    for key, cmd in self.cmd.items():
        # Each key of self.cmd is treated as an input name and split into labelled fields.
        parsed_name = CPGridRun.atlasProductionNameParser(key)
        # Field names are prettified for display: underscores become spaces, words are title-cased.
        logCPGridRun.info("\n"
        f"Input: {key}\n" +
        "\n".join([f" {k.replace('_', ' ').title()}: {v}" for k, v in parsed_name.items()]))
        logCPGridRun.info(f"Command: \n{cmd}")
        # Visual separator written straight to stdout rather than through the logger.
        print("-" * 70)
        # Add your submission logic here
233
void print(char *figname, TCanvas *c1)

◆ rucioCustomNameParser()

CPGridRun.CPGridRun.rucioCustomNameParser ( filename)
static
Custom names have many variations, but most follow the pattern <user|group>.<username>.<datasetname>.<suffix>

Definition at line 520 of file CPGridRun.py.

def rucioCustomNameParser(filename):
    '''
    Split a custom name into its dot-separated fields.

    The custom name has many variations, but most of them follow user/group.username.datasetname.suffix

    Returns a dict with the keys 'userType' (first field), 'username'
    (second field), 'main' (third field) and 'suffix' (last field).
    Raises IndexError when the name has fewer than three fields.
    '''
    fields = filename.split('.')
    return {
        'userType': fields[0],
        'username': fields[1],
        'main': fields[2],
        'suffix': fields[-1],
    }
531

◆ submit()

CPGridRun.CPGridRun.submit ( self)

Definition at line 506 of file CPGridRun.py.

def submit(self):
    """Run every command in ``self.cmd`` through the shell, one after another.

    Output of each command is forwarded to this process's stdout/stderr.
    A command that exits with a non-zero status is reported through the
    logger instead of being silently ignored; the remaining commands are
    still run.
    """
    import subprocess
    for key, cmd in self.cmd.items():
        # shell=True is kept from the original call; cmd is presumably a
        # complete, already-quoted command line -- verify where self.cmd is built.
        completed = subprocess.run(cmd, shell=True, stdout=sys.stdout, stderr=sys.stderr)
        if completed.returncode != 0:
            logCPGridRun.error(f"Submission command for '{key}' exited with code {completed.returncode}.")
511

Member Data Documentation

◆ _errorCollector

dict CPGridRun.CPGridRun._errorCollector = {}
protected

Definition at line 24 of file CPGridRun.py.

◆ _inputList

list CPGridRun.CPGridRun._inputList = None
protected

Definition at line 23 of file CPGridRun.py.

◆ _isFirstRun

bool CPGridRun.CPGridRun._isFirstRun = True
protected

Definition at line 21 of file CPGridRun.py.

◆ _runscript

CPGridRun.CPGridRun._runscript = None
protected

Definition at line 15 of file CPGridRun.py.

◆ _tarballRecreated

bool CPGridRun.CPGridRun._tarballRecreated = False
protected

Definition at line 22 of file CPGridRun.py.

◆ _tarfile

CPGridRun.CPGridRun._tarfile = 'cpgrid.tar.gz'
protected

Definition at line 20 of file CPGridRun.py.

◆ _yamlPath

CPGridRun.CPGridRun._yamlPath = None
protected

Definition at line 25 of file CPGridRun.py.

◆ args

CPGridRun.CPGridRun.args

Definition at line 77 of file CPGridRun.py.

◆ cmd

dict CPGridRun.CPGridRun.cmd = {}

Definition at line 26 of file CPGridRun.py.

◆ gridParser

CPGridRun.CPGridRun.gridParser = self._parseGridArguments()

Definition at line 13 of file CPGridRun.py.

◆ inputList

CPGridRun.CPGridRun.inputList

Definition at line 130 of file CPGridRun.py.

◆ output_files

CPGridRun.CPGridRun.output_files = output_files

Definition at line 116 of file CPGridRun.py.

◆ prunArgsDict

CPGridRun.CPGridRun.prunArgsDict = self._createPrunArgsDict()

Definition at line 14 of file CPGridRun.py.

◆ unknown_args

CPGridRun.CPGridRun.unknown_args = parser.parse_known_args()

Definition at line 77 of file CPGridRun.py.


The documentation for this class was generated from the following file: