ATLAS Offline Software
Loading...
Searching...
No Matches
CPGridRun.CPGridRun Class Reference
Collaboration diagram for CPGridRun.CPGridRun:

Public Member Functions

 __init__ (self)
 inputList (self)
 outputFilesParsing (self)
 printHelp (self)
 getParser (self)
 configureSumbission (self)
 configureSubmissionSingleSample (self, input)
 printInputDetails (self)
 hasPyami (self)
bool checkInputInPyami (self)
 outputDSFormatter (self, name)
 execFormatter (self)
 outputsFormatter (self)
bool hasPrun (self)
 submit (self)
 printDelayedErrorCollection (self)
 checkExternalTools (self)
 askSubmission (self)

Static Public Member Functions

 isAtlasProductionFormat (name)
 rucioCustomNameParser (filename)
 atlasProductionNameParser (filename)

Public Attributes

 gridParser = self._parseGridArguments()
dict prunArgsDict = self._createPrunArgsDict()
dict cmd = {}
 args
 unknown_args = parser.parse_known_args()
 output_files = output_files
 inputList

Protected Member Functions

 _initRunscript (self)
 _parseGridArguments (self)
dict _createPrunArgsDict (self)
dict _unknownArgsDict (self)
 _checkPrunArgs (self, argDict)
 _prepareAmiQueryFromInputList (self)
bool _analyzeAmiResults (self, results, datasetPtag)
 _outputDSFormatter (self, name)
 _customOutputDSFormatter (self, name)
 _suffixFormatter (self)
 _filesChanged (self)
 _buildDir (self)
 _sourceDir (self)
 _checkYamlExists (self, runscriptArgs)

Static Protected Member Functions

 _parseInputFileList (path)

Protected Attributes

 _runscript = None
str _tarfile = 'cpgrid.tar.gz'
bool _isFirstRun = True
bool _tarballRecreated = False
list _inputList = None
dict _errorCollector = {}

Detailed Description

Definition at line 11 of file CPGridRun.py.

Constructor & Destructor Documentation

◆ __init__()

CPGridRun.CPGridRun.__init__ ( self)

Definition at line 12 of file CPGridRun.py.

12 def __init__(self):
13 self.gridParser = self._parseGridArguments()
14 self.prunArgsDict = self._createPrunArgsDict()
15 self._runscript = None
16 if self.args.help:
17 self._initRunscript()
18 self.printHelp()
19 sys.exit(0)
20 self._tarfile = 'cpgrid.tar.gz'
21 self._isFirstRun = True
22 self._tarballRecreated = False
23 self._inputList = None
24 self._errorCollector = {} # Delay the error collection until the end of the script for better user experience
25 self.cmd = {} # sample name -> command
26

Member Function Documentation

◆ _analyzeAmiResults()

bool CPGridRun.CPGridRun._analyzeAmiResults ( self,
results,
datasetPtag )
protected

Definition at line 273 of file CPGridRun.py.

def _analyzeAmiResults(self, results, datasetPtag) -> bool:
    """Compare the AMI query results with the configured input datasets.

    Args:
        results: iterable of mappings, each carrying the dataset name under
            the ``'ldn'`` key.
        datasetPtag: mapping of input dataset name to its current ptag
            string (e.g. ``'p5855'``); the value may be None.

    Returns:
        False when at least one dataset in ``self.cmd`` is absent from the
        results, True otherwise. Newer ptags are only reported in the log.
    """
    import re
    ptagPattern = re.compile("_p[0-9]+")
    availableNames = [entry['ldn'] for entry in results]
    availableSet = set(availableNames)  # O(1) membership tests
    notFound = [name for name in self.cmd if name not in availableSet]
    latestPtag = {}  # dataset name -> highest ptag number newer than the current one

    for datasetName in self.cmd:
        try:
            currentPtagInt = int(datasetPtag.get(datasetName, 'p0')[1:])
        except (ValueError, TypeError):
            # no usable ptag for this dataset, nothing to compare against
            continue
        # dataset name with the ptag removed; used as a prefix for the candidates
        prefix = ptagPattern.sub("", datasetName)
        for candidate in availableNames:
            if not candidate.startswith(prefix):
                continue
            try:
                parsed = CPGridRun.atlasProductionNameParser(candidate)
                candidatePtagInt = int(parsed.get('ptag', 'p0')[1:])
            except (ValueError, TypeError, IndexError):
                # candidate name cannot be parsed, ignore it
                continue
            # keep the maximum, not merely the last candidate that is newer
            if candidatePtagInt > latestPtag.get(datasetName, currentPtagInt):
                latestPtag[datasetName] = candidatePtagInt

    if latestPtag:
        logCPGridRun.info("Newer version of datasets found in AMI:")
        for name, ptagInt in latestPtag.items():
            logCPGridRun.info(f"{name} -> ptag: p{ptagInt}")

    if notFound:
        logCPGridRun.error("Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
        logCPGridRun.error(", ".join(notFound))
        return False

    return True

◆ _buildDir()

CPGridRun.CPGridRun._buildDir ( self)
protected

Definition at line 385 of file CPGridRun.py.

385 def _buildDir(self):
386 buildDir = os.environ["CMAKE_PREFIX_PATH"]
387 buildDir = os.path.dirname(buildDir.split(":")[0])
388 return buildDir
389

◆ _checkPrunArgs()

CPGridRun.CPGridRun._checkPrunArgs ( self,
argDict )
protected
check the arguments against the prun script to ensure they are valid
See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py

Definition at line 195 of file CPGridRun.py.

def _checkPrunArgs(self, argDict):
    '''
    Check the arguments against the prun script to ensure they are valid.
    See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py

    Raises:
        ValueError: when argDict contains keys that are absent from the
            options reported by PrunScript.
    '''
    import pandaclient.PrunScript
    # We need to temporarily clear the sys.argv to avoid the parser from PrunScript to fail
    original_argv = sys.argv
    sys.argv = ['prun']  # Reset sys.argv to only contain the script name
    try:
        # NOTE(review): get_options=True presumably returns a container of the option names -- confirm
        prunArgsDict = pandaclient.PrunScript.main(get_options=True)
    finally:
        # Restore the original sys.argv even when PrunScript raises
        sys.argv = original_argv

    nonPrunOrCPGridArgs = [arg for arg in argDict if arg not in prunArgsDict]
    if nonPrunOrCPGridArgs:
        message = f"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda."
        logCPGridRun.error(message)
        raise ValueError(message)

◆ _checkYamlExists()

CPGridRun.CPGridRun._checkYamlExists ( self,
runscriptArgs )
protected

Definition at line 443 of file CPGridRun.py.

443 def _checkYamlExists(self, runscriptArgs):
444 from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
445 if not hasattr(runscriptArgs, 'text_config'):
446 self._errorCollector['no yaml'] = "No YAML configuration file is specified in the exec string. Please provide one using --text-config"
447 return
448 yamlPath = getattr(runscriptArgs, 'text_config')
449 haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
450 if haveLocalYaml:
451 logCPGridRun.warning("A path to a local YAML configuration file is found, but it may not be grid-usable.")
452
453 repoYamls = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
454 if repoYamls and len(repoYamls) > 1:
455 self._errorCollector['ambiguous yamls'] = f'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' + '\n'.join(repoYamls)
456 return
457 elif repoYamls and len(repoYamls) == 1:
458 logCPGridRun.info(f"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
459 return
460
461 if not repoYamls:
462 self._errorCollector['no usable yaml'] = f"Grid usable YAML configuration file not found: {yamlPath}"
463 if haveLocalYaml:
464 self._errorCollector['have local yaml'] = f"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
465 f"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec"
466

◆ _createPrunArgsDict()

dict CPGridRun.CPGridRun._createPrunArgsDict ( self)
protected
Converts the unknown command-line arguments to a dictionary and, when prun is available, validates them against the prun options.

Definition at line 79 of file CPGridRun.py.

79 def _createPrunArgsDict(self) -> dict:
80 '''
81 converting unknown args to a dictionary
82 '''
83 unknownArgsDict = self._unknownArgsDict()
84 if unknownArgsDict and self.hasPrun():
85 self._checkPrunArgs(unknownArgsDict)
86 logCPGridRun.info(f"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
87 elif unknownArgsDict:
88 logCPGridRun.warning(f"Unknown arguments detected: {unknownArgsDict}. Cannot check the availablility in Prun because Prun is not available / noSubmit is on.")
89 else:
90 pass
91 return unknownArgsDict
92

◆ _customOutputDSFormatter()

CPGridRun.CPGridRun._customOutputDSFormatter ( self,
name )
protected
{group/user}.{username}.{main}.outputDS.{suffix}

Definition at line 331 of file CPGridRun.py.

331 def _customOutputDSFormatter(self, name):
332 '''
333 {group/user}.{username}.{main}.outputDS.{suffix}
334 '''
335 parts = name.split('.')
336 base = 'group' if self.args.groupProduction else 'user'
337 username = self.args.gridUsername
338 main = parts[2]
339 outputDS = 'outputDS'
340 suffix = parts[-1]
341
342 result = [base, username,main, outputDS, suffix]
343 return ".".join(filter(None, result))
344

◆ _filesChanged()

CPGridRun.CPGridRun._filesChanged ( self)
protected

Definition at line 354 of file CPGridRun.py.

354 def _filesChanged(self):
355 tarball_mtime = os.path.getmtime(self._tarfile) if os.path.exists(self._tarfile) else 0
356 buildDir = self._buildDir()
357 sourceDir = self._sourceDir()
358
359 # Check for changes in buildDir
360 for root, _, files in os.walk(buildDir):
361 for file in files:
362 file_path = os.path.join(root, file)
363 try:
364 if os.path.getmtime(file_path) > tarball_mtime:
365 logCPGridRun.info(f"File {file_path} is newer than the tarball.")
366 return True
367 except FileNotFoundError:
368 continue
369
370 # Check for changes in sourceDir
371 if sourceDir is None:
372 logCPGridRun.warning("Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
373 return False
374 for root, _, files in os.walk(sourceDir):
375 for file in files:
376 file_path = os.path.join(root, file)
377 try:
378 if os.path.getmtime(file_path) > tarball_mtime:
379 logCPGridRun.info(f"File {file_path} is newer than the tarball.")
380 return True
381 except FileNotFoundError:
382 continue
383 return False
384

◆ _initRunscript()

CPGridRun.CPGridRun._initRunscript ( self)
protected

Definition at line 27 of file CPGridRun.py.

27 def _initRunscript(self):
28 if self._runscript is not None:
29 return self._runscript
30 elif isAthena:
31 from AnalysisAlgorithmsConfig.AthenaCPRunScript import AthenaCPRunScript
32 self._runscript = AthenaCPRunScript()
33 else:
34 from AnalysisAlgorithmsConfig.EventLoopCPRunScript import EventLoopCPRunScript
35 self._runscript = EventLoopCPRunScript()
36 return self._runscript
37

◆ _outputDSFormatter()

CPGridRun.CPGridRun._outputDSFormatter ( self,
name )
protected
{group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}

Definition at line 314 of file CPGridRun.py.

def _outputDSFormatter(self, name):
    '''
    Build the output dataset name for an ATLAS-production input name:
    {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}

    Empty components are left out of the result.
    '''
    parsed = CPGridRun.atlasProductionNameParser(name)
    scope = 'group' if self.args.groupProduction else 'user'
    # Dynamically set the prefix, likely to be something like PhPy8Eg
    prefix = self.args.prefix or parsed['main'].split('_')[0]
    components = (
        scope,
        self.args.gridUsername,
        prefix,
        parsed['DSID'],
        parsed['format'],
        '_'.join(parsed['tags']),
        self._suffixFormatter(),
    )
    return ".".join(component for component in components if component)
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177

◆ _parseGridArguments()

CPGridRun.CPGridRun._parseGridArguments ( self)
protected

Definition at line 38 of file CPGridRun.py.

38 def _parseGridArguments(self):
39 parser = argparse.ArgumentParser(description='CPGrid runscript to submit CPRun.py jobs to the grid. '
40 'This script will submit a job to the grid using files in the input text one by one.'
41 'CPRun.py can handle multiple sources of input and create one output; but not this script',
42 add_help=False,
43 formatter_class=argparse.RawTextHelpFormatter)
44 parser.add_argument('-h', '--help', dest='help', action='store_true', help='Show this help message and continue')
45
46 ioGroup = parser.add_argument_group('Input/Output file configuration')
47 ioGroup.add_argument('-i','--input-list', dest='input_list', help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
48 ioGroup.add_argument('--output-files', dest='output_files', nargs='+', default=['output.root'],
49 help='The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
50 ioGroup.add_argument('--destSE', dest='destSE', default='', type=str, help='Destination storage element (PanDA)')
51 ioGroup.add_argument('--mergeType', dest='mergeType', default='Default', type=str, help='Output merging type, [None, Default, xAOD]')
52
53 pandaGroup = parser.add_argument_group('Input/Output naming configuration')
54 pandaGroup.add_argument('--gridUsername', dest='gridUsername', default=os.getenv('USER', ''), type=str, help='Grid username, or the groupname. Default is the current user. Only affect file naming')
55 pandaGroup.add_argument('--prefix', dest='prefix', default='', type=str, help='Prefix for the output directory. Dynamically set with input container if not provided')
56 pandaGroup.add_argument('--suffix', dest='suffix', default='',type=str, help='Suffix for the output directory')
57 pandaGroup.add_argument('--outDS', dest='outDS', default='', type=str,
58 help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
59
60 cpgridGroup = parser.add_argument_group('CPGrid configuration')
61 cpgridGroup.add_argument('--groupProduction', dest='groupProduction', action='store_true', help='Only use for official production')
62
63 cpgridGroup.add_argument('--exec', dest='exec', type=str,
64 help='Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
65 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
66 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
67 )
68
69 submissionGroup = parser.add_argument_group('Submission configuration')
70 submissionGroup.add_argument('-y', '--agreeAll', dest='agreeAll', action='store_true', help='Agree to all the submission details without asking for confirmation. Use with caution!')
71 submissionGroup.add_argument('--noSubmit', dest='noSubmit', action='store_true', help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
72 submissionGroup.add_argument('--testRun', dest='testRun', action='store_true', help='Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
73 submissionGroup.add_argument('--checkInputDS', dest='checkInputDS', action='store_true', help='Check if the input datasets are available on the AMI.')
74 submissionGroup.add_argument('--recreateTar', dest='recreateTar', action='store_true', help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
75 self.args, self.unknown_args = parser.parse_known_args()
76 self.outputFilesParsing()
77 return parser
78

◆ _parseInputFileList()

CPGridRun.CPGridRun._parseInputFileList ( path)
staticprotected

Definition at line 600 of file CPGridRun.py.

600 def _parseInputFileList(path):
601 files = []
602 with open(path, 'r') as inputText:
603 for line in inputText.readlines():
604 # skip comments and empty lines
605 if line.startswith('#') or not line.strip():
606 continue
607 files += line.split(',')
608 # remove leading/trailing whitespaces, and \n
609 files = [file.strip() for file in files]
610 return files
611

◆ _prepareAmiQueryFromInputList()

CPGridRun.CPGridRun._prepareAmiQueryFromInputList ( self)
protected
Helper function to prepare a list of queries for the AMI based on the input list.
It will replace the _p### with _p% to match the latest ptag.

Definition at line 258 of file CPGridRun.py.

def _prepareAmiQueryFromInputList(self):
    '''
    Helper function to prepare a list of queries for the AMI based on the input list.
    It will replace the _p### with _p% to match the latest ptag.

    Returns a tuple (queries, datasetPtag) where datasetPtag maps every
    dataset name in self.cmd to its parsed ptag (None when it has none).
    '''
    import re
    ptagPattern = re.compile("_p[0-9]+")
    datasetNames = list(self.cmd)
    datasetPtag = {
        datasetName: CPGridRun.atlasProductionNameParser(datasetName).get('ptag')
        for datasetName in datasetNames
    }
    queries = [ptagPattern.sub("_p%", datasetName) for datasetName in datasetNames]
    return queries, datasetPtag

◆ _sourceDir()

CPGridRun.CPGridRun._sourceDir ( self)
protected

Definition at line 390 of file CPGridRun.py.

390 def _sourceDir(self):
391 cmakeCachePath = os.path.join(self._buildDir(), 'CMakeCache.txt')
392 sourceDir = None
393 if not os.path.exists(cmakeCachePath):
394 return sourceDir
395 with open(cmakeCachePath, 'r') as cmakeCache:
396 for line in cmakeCache:
397 if '_SOURCE_DIR:STATIC=' in line:
398 sourceDir = line.split('=')[1].strip()
399 break
400 return sourceDir
401

◆ _suffixFormatter()

CPGridRun.CPGridRun._suffixFormatter ( self)
protected

Definition at line 345 of file CPGridRun.py.

345 def _suffixFormatter(self):
346 if self.args.suffix:
347 return self.args.suffix
348 if self.args.testRun:
349 import uuid
350 return f"test_{uuid.uuid4().hex[:6]}"
351 else:
352 ''
353

◆ _unknownArgsDict()

dict CPGridRun.CPGridRun._unknownArgsDict ( self)
protected
Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs

Definition at line 179 of file CPGridRun.py.

179 def _unknownArgsDict(self)->dict:
180 '''
181 Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
182 '''
183 unknown_args_dict = {}
184 idx = 0
185 while idx < len(self.unknown_args):
186 if self.unknown_args[idx].startswith('-'):
187 if idx + 1 < len(self.unknown_args) and not self.unknown_args[idx + 1].startswith('-'):
188 unknown_args_dict[self.unknown_args[idx].lstrip('-')] = self.unknown_args[idx + 1]
189 idx += 2
190 else:
191 unknown_args_dict[self.unknown_args[idx].lstrip('-')] = True
192 idx += 1
193 return unknown_args_dict
194

◆ askSubmission()

CPGridRun.CPGridRun.askSubmission ( self)

Definition at line 628 of file CPGridRun.py.

def askSubmission(self):
    """Submit the prepared jobs, asking for confirmation unless --agreeAll is set.

    Nothing happens with --noSubmit. Without --agreeAll the jobs are only
    submitted when the interactive answer is 'y' (case-insensitive).
    """
    if self.args.noSubmit:
        return

    if self.args.agreeAll:
        logCPGridRun.info("You have agreed to all the submission details. Jobs will be submitted without confirmation.")
    else:
        reply = input("Please confirm ALL the submission details are correct before submitting [y/n]: ").lower()
        if reply == 'n':
            logCPGridRun.info("Feel free to report any unexpected behavior to the CPAlgorithms team!")
            return
        if reply != 'y':
            logCPGridRun.error("Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
            return
    self.submit()

◆ atlasProductionNameParser()

CPGridRun.CPGridRun.atlasProductionNameParser ( filename)
static
Parsing file name into a dictionary, an example is given here
mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
For the first part
datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
projectName: mc20_13TeV
campaign: mc20
energy: 13 #(TeV)
DSID: 410470
main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
TODO  generator: PhPy8Eg
TODO  tune: A14 # For Pythia8
TODO  process: ttbar
TODO  hdamp: 258p75 # For Powheg
TODO  decayType: nonallhad
step: deriv
format: DAOD_PHYS
tags: e###_s###_r###_p###_a###_t###_b#
etag: e6337 # EVNT (EVGEN) production and merging
stag: s3681 # Geant4 simulation to produce HITS and merging!
rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
ptag: p5855 # Production of NTUP_PILEUP format and merging
atag: aXXX: atlfast configuration (both simulation and digit/recon)
ttag: tXXX: tag production configuration
btag: bXXX: bytestream production configuration

For the second part
jediTaskID: 34865530
fileNumber: 000740
version: 1

Definition at line 513 of file CPGridRun.py.

def atlasProductionNameParser(filename):
    '''
    Parsing file name into a dictionary, an example is given here
    mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
    For the first part
    datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
        projectName: mc20_13TeV
            campaign: mc20
            energy: 13 #(TeV)
        DSID: 410470
        main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
            TODO generator: PhPy8Eg
            TODO tune: A14 # For Pythia8
            TODO process: ttbar
            TODO hdamp: 258p75 # For Powheg
            TODO decayType: nonallhad
        step: deriv
        format: DAOD_PHYS
        tags: e###_s###_r###_p###_a###_t###_b#
            etag: e6337 # EVNT (EVGEN) production and merging
            stag: s3681 # Geant4 simulation to produce HITS and merging!
            rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
            ptag: p5855 # Production of NTUP_PILEUP format and merging
            atag: aXXX: atlfast configuration (both simulation and digit/recon)
            ttag: tXXX: tag production configuration
            btag: bXXX: bytestream production configuration

    For the second part
        jediTaskID: 34865530
        fileNumber: 000740
        version: 1

    The dataset part is everything before the first '/'; the file fields are
    only filled when the part after it has at least three dot-separated
    fields. The dataset fields are positional, so a name with fewer than six
    dot-separated fields raises IndexError.
    '''
    result = {}
    # split the / in case
    # mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
    # /DAOD_PHYS.34865530._000740.pool.root.1
    # partition() never raises, whatever the number of '/' in the name
    datasetPart, _, filePart = filename.partition('/')

    # Split the dataset part by dots
    datasetParts = datasetPart.split('.')
    result['datasetName'] = datasetPart
    # Extract the first part
    result['projectName'] = datasetParts[0]  # is positional
    # Extract the campaign and energy
    campaign_energy = result['projectName'].split('_')
    result['campaign'] = campaign_energy[0]
    result['energy'] = campaign_energy[1]

    # Extract the DSID, positional
    result['DSID'] = datasetParts[1]
    result['main'] = datasetParts[2]
    result['step'] = datasetParts[3]
    result['format'] = datasetParts[4]

    # Extract the tags (etag, stag, rtag, ptag, atag, ttag, btag)
    tags = datasetParts[5].split('_')
    result['tags'] = tags
    for tag in tags:
        kind = tag[:1]  # '' for an empty tag, which is then ignored
        if kind and kind in 'esrpatb':
            result[f'{kind}tag'] = tag

    # Extract the file part if it exists
    if filePart:
        fileParts = filePart.split('.')
        if len(fileParts) >= 3:
            result['jediTaskID'] = fileParts[1]
            result['fileNumber'] = fileParts[2]
            result['version'] = fileParts[-1]
    return result

◆ checkExternalTools()

CPGridRun.CPGridRun.checkExternalTools ( self)

Definition at line 621 of file CPGridRun.py.

def checkExternalTools(self):
    """Check for prun and, with --checkInputDS, query AMI for the inputs.

    Nothing is checked when --noSubmit is set.
    """
    if not self.args.noSubmit:
        self.hasPrun()
        if self.args.checkInputDS:
            self.checkInputInPyami()

◆ checkInputInPyami()

bool CPGridRun.CPGridRun.checkInputInPyami ( self)

Definition at line 241 of file CPGridRun.py.

def checkInputInPyami(self) -> bool:
    """Query AMI for the input datasets and analyse the answer.

    Returns False when pyAMI cannot be imported or the query fails (the
    latter is recorded in ``self._errorCollector``); otherwise the result of
    ``_analyzeAmiResults``.
    """
    if self.hasPyami():
        amiClient = pyAMI.client.Client('atlas')
        pyAMI.atlas.api.init()

        patterns, ptagByDataset = self._prepareAmiQueryFromInputList()
        try:
            found = pyAMI.atlas.api.list_datasets(amiClient, patterns=patterns)
        except pyAMI.exception.Error:
            self._errorCollector['no valid certificate'] = (
                "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
            return False

        return self._analyzeAmiResults(found, ptagByDataset)
    return False

◆ configureSubmissionSingleSample()

CPGridRun.CPGridRun.configureSubmissionSingleSample ( self,
input )

Definition at line 133 of file CPGridRun.py.

def configureSubmissionSingleSample(self, input):
    """Build the prun command line for one input dataset.

    Args:
        input: name of the input dataset, passed to prun as ``--inDS``.

    Returns:
        The command as one string; the options are separated by a backslash
        and a newline.

    Side effect: ``self._tarballRecreated`` becomes True once the command
    asks prun to (re)create the tarball.
    """
    import shlex
    config = {
        'inDS': input,
        'outDS': self.args.outDS if self.args.outDS else self.outputDSFormatter(input),
        'useAthenaPackages': True,
        'cmtConfig': os.environ["CMTCONFIG"],
        'writeInputToTxt': 'IN:in.txt',
        'outputs': self.outputsFormatter(),
        'exec': self.execFormatter(),  # already wrapped in double quotes by execFormatter
        'memory': "2000",  # MB
        'addNthFieldOfInDSToLFN': '2,3,6',
    }
    if self.args.noSubmit:
        config['noSubmit'] = True

    if self.args.mergeType == 'xAOD':
        # The value holds spaces, a pipe and backticks and the command goes
        # through a shell: quote it so that prun receives it as one argument
        config['mergeScript'] = shlex.quote("xAODMerge %OUT `echo %IN | sed 's/,/ /g'`")

    if self.args.mergeType != 'None':
        config['mergeOutput'] = True

    # create the tarball only once per run, the later samples reuse it
    if not self._tarballRecreated and (self.args.recreateTar or not os.path.exists(self._tarfile) or self._filesChanged()):
        config['outTarBall'] = self._tarfile
        self._tarballRecreated = True
    elif os.path.exists(self._tarfile) or self._tarballRecreated:
        config['inTarBall'] = self._tarfile

    if self.args.groupProduction:
        config['official'] = True
        config['voms'] = f'atlas:/atlas/{self.args.gridUsername}/Role=production'

    if self.args.destSE:
        config['destSE'] = self.args.destSE

    if self.args.testRun:
        config['nEventsPerFile'] = 300
        config['nFiles'] = 10
    # the prun-only arguments given on the command line override the defaults above
    config.update(self.prunArgsDict)

    cmd = 'prun \\\n'
    for k, v in config.items():
        if isinstance(v, bool):
            # a True flag is emitted bare, a False flag is omitted
            if v:
                cmd += f'--{k} \\\n'
        elif v is not None and v != '':
            cmd += f'--{k} {v} \\\n'
    return cmd.rstrip(' \\\n')

◆ configureSumbission()

CPGridRun.CPGridRun.configureSumbission ( self)

Definition at line 127 of file CPGridRun.py.

def configureSumbission(self):
    """Build the prun command of every input sample and store it in ``self.cmd``.

    ``self._isFirstRun`` is cleared once a sample has been configured.
    """
    for sampleName in self.inputList:
        self.cmd[sampleName] = self.configureSubmissionSingleSample(sampleName)
        self._isFirstRun = False

◆ execFormatter()

CPGridRun.CPGridRun.execFormatter ( self)

Definition at line 402 of file CPGridRun.py.

402 def execFormatter(self):
403 # Check if the execution command starts with 'CPRun.py' or '-'
404 isCPRunDefault = self.args.exec.startswith('-') or self.args.exec.startswith('CPRun.py')
405 formatingClause = {
406 'input_list': 'in.txt',
407 'merge_output_files': True,
408 }
409 if not isCPRunDefault:
410 if self._isFirstRun: logCPGridRun.warning("Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
411 return f'"{self.args.exec}"'
412
413 # Parse the exec string using the parser to validate and extract known arguments
414 self._initRunscript()
415 runscriptArgs, unknownArgs = self._runscript.parser.parse_known_args(self.args.exec.split(' '))
416
417 # Throw error if unknownArgs contains any --args
418 unknown_flags = [arg for arg in unknownArgs if arg.startswith('--')]
419 if unknown_flags:
420 logCPGridRun.error(f"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
421 raise ValueError(f"Unknown arguments detected: {unknown_flags}")
422
423 # Only override if value is None or the parser default
424 for key, value in formatingClause.items():
425 if hasattr(runscriptArgs, key):
426 old_value = getattr(runscriptArgs, key)
427 if old_value is None or old_value == self._runscript.parser.get_default(key):
428 setattr(runscriptArgs, key, value)
429 if self._isFirstRun: logCPGridRun.info(f"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
430 else:
431 if self._isFirstRun: logCPGridRun.warning(f"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
432 else:
433 logCPGridRun.error(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
434 raise ValueError(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
435 self._checkYamlExists(runscriptArgs)
436 # Return the formatted arguments as a string
437 arg_string = ' '.join(
438 f'--{k.replace("_", "-")}' if isinstance(v, bool) and v else
439 f'--{k.replace("_", "-")} {v}' for k, v in vars(runscriptArgs).items() if v not in [None, False]
440 )
441 return f'"CPRun.py {arg_string}"'
442

◆ getParser()

CPGridRun.CPGridRun.getParser ( self)

Definition at line 122 of file CPGridRun.py.

def getParser(self):
    """Return the argument parser stored in ``self.gridParser``."""
    gridParser = self.gridParser
    return gridParser

◆ hasPrun()

bool CPGridRun.CPGridRun.hasPrun ( self)

Definition at line 471 of file CPGridRun.py.

def hasPrun(self) -> bool:
    """Tell whether the ``prun`` executable can be found on the PATH.

    When it cannot, setup instructions are stored under the 'no prun' key of
    ``self._errorCollector`` and False is returned.
    """
    import shutil
    if shutil.which("prun") is not None:
        return True

    instructions = (
        "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
        "```\n"
        "lsetup panda\n"
        "voms-proxy-init -voms atlas\n"
        "```\n"
        "Make sure you have a valid certificate."
    )
    self._errorCollector['no prun'] = instructions
    return False

◆ hasPyami()

CPGridRun.CPGridRun.hasPyami ( self)

Definition at line 225 of file CPGridRun.py.

def hasPyami(self):
    """Import pyAMI into the module namespace and tell whether that worked.

    On failure, setup instructions are stored under the 'no AMI' key of
    ``self._errorCollector`` and False is returned.
    """
    # the import binds the module-level name used by checkInputInPyami
    global pyAMI
    try:
        import pyAMI.client
        import pyAMI.atlas.api
    except ModuleNotFoundError:
        instructions = (
            "Cannot import pyAMI, please run the following commands:\n\n"
            "```\n"
            "lsetup pyami\n"
            "voms-proxy-init -voms atlas\n"
            "```\n"
            "and make sure you have a valid certificate.")
        self._errorCollector['no AMI'] = instructions
        return False
    return True

◆ inputList()

CPGridRun.CPGridRun.inputList ( self)

Definition at line 94 of file CPGridRun.py.

def inputList(self):
    """Return the list of input containers, resolved on first access and cached.

    ``--input-list`` may be a ``.txt`` file listing the containers or the
    name of a single container in the ATLAS production format.

    Raises:
        ValueError: when ``--input-list`` is missing or is neither of the above.
        NotImplementedError: for a ``.json`` input list.
    """
    if self._inputList is None:
        source = self.args.input_list
        if not source:
            # no --input-list at all: report it like any other unusable value
            raise ValueError(
                'use --input-list to specify input containers')
        if source.endswith('.txt'):
            self._inputList = CPGridRun._parseInputFileList(source)
        elif source.endswith('.json'):
            raise NotImplementedError('JSON input list parsing is not implemented')
        elif CPGridRun.isAtlasProductionFormat(source):
            self._inputList = [source]
        else:
            raise ValueError(
                'use --input-list to specify input containers')
    return self._inputList

◆ isAtlasProductionFormat()

CPGridRun.CPGridRun.isAtlasProductionFormat ( name)
static

Definition at line 493 of file CPGridRun.py.

def isAtlasProductionFormat(name):
    """Tell whether ``name`` starts with 'mc' or 'data'.

    A warning is logged for any other name, which is then assumed to be a
    user production.
    """
    if not name.startswith(('mc', 'data')):
        logCPGridRun.warning("Name is not in the Atlas production format, assuming it is a user production")
        return False
    return True

◆ outputDSFormatter()

CPGridRun.CPGridRun.outputDSFormatter ( self,
name )

Definition at line 308 of file CPGridRun.py.

def outputDSFormatter(self, name):
    """Return the output dataset name derived from the input name.

    ATLAS-production names go through ``_outputDSFormatter``; any other
    name goes through ``_customOutputDSFormatter``.
    """
    formatter = (self._outputDSFormatter
                 if CPGridRun.isAtlasProductionFormat(name)
                 else self._customOutputDSFormatter)
    return formatter(name)

◆ outputFilesParsing()

CPGridRun.CPGridRun.outputFilesParsing ( self)

Definition at line 107 of file CPGridRun.py.

def outputFilesParsing(self):
    """Split comma-separated ``--output-files`` entries into ``self.output_files``."""
    self.output_files = [
        fileName
        for entry in self.args.output_files
        for fileName in entry.split(',')
    ]

◆ outputsFormatter()

CPGridRun.CPGridRun.outputsFormatter ( self)

Definition at line 467 of file CPGridRun.py.

def outputsFormatter(self):
    """Return the prun ``--outputs`` value, ``label:file`` pairs joined by commas.

    The label is the file name up to its first dot. The list used is
    ``self.output_files`` (already split on commas by
    ``outputFilesParsing``); the raw ``--output-files`` values are only a
    fallback when that list has not been built.
    """
    outputFiles = getattr(self, 'output_files', None)
    if outputFiles is None:
        outputFiles = self.args.output_files
    return ','.join(f'{output.split(".")[0]}:{output}' for output in outputFiles)

◆ printDelayedErrorCollection()

CPGridRun.CPGridRun.printDelayedErrorCollection ( self)

Definition at line 612 of file CPGridRun.py.

def printDelayedErrorCollection(self):
    """Log every collected error and exit with status 1; do nothing when there is none."""
    if not self._errorCollector:
        return

    logCPGridRun.error("Errors were collected during the script execution:")
    for label, description in self._errorCollector.items():
        logCPGridRun.error(f"{label}: {description}")
    logCPGridRun.error("Please fix the errors and try again.")
    sys.exit(1)

◆ printHelp()

CPGridRun.CPGridRun.printHelp ( self)

Definition at line 116 of file CPGridRun.py.

def printHelp(self):
    """Print the CPGridRun help followed by the help of the run-script parser.

    ``self._runscript`` has to be initialised before this is called.
    """
    self.gridParser.print_help()
    logCPGridRun.info("\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
    runscriptParser = self._runscript.parser
    # hide the usage line of the second parser
    runscriptParser.usage = argparse.SUPPRESS
    runscriptParser.print_help()
void printHelp()

◆ printInputDetails()

CPGridRun.CPGridRun.printInputDetails ( self)

Definition at line 215 of file CPGridRun.py.

def printInputDetails(self):
    """Log the parsed name fields and the prun command of every sample.

    The parser is chosen from the name format: the ATLAS-production parser
    raises IndexError on user-format names, which are accepted as inputs
    elsewhere in this class.
    """
    for key, cmd in self.cmd.items():
        if CPGridRun.isAtlasProductionFormat(key):
            parsed_name = CPGridRun.atlasProductionNameParser(key)
        else:
            parsed_name = CPGridRun.rucioCustomNameParser(key)
        details = "\n".join([f" {k.replace('_', ' ').title()}: {v}" for k, v in parsed_name.items()])
        logCPGridRun.info("\n"
                          f"Input: {key}\n" +
                          details)
        logCPGridRun.info(f"Command: \n{cmd}")
        print("-" * 70)
void print(char *figname, TCanvas *c1)

◆ rucioCustomNameParser()

CPGridRun.CPGridRun.rucioCustomNameParser ( filename)
static
The custom name has many variations, but most of them follow user/group.username.datasetname.suffix

Definition at line 500 of file CPGridRun.py.

def rucioCustomNameParser(filename):
    '''
    The custom name has many variations, but most of them follow user/group.username.datasetname.suffix

    Returns a dictionary with the keys userType, username, main (first three
    dot-separated fields) and suffix (last field).
    '''
    fields = filename.split('.')
    return {
        'userType': fields[0],
        'username': fields[1],
        'main': fields[2],
        'suffix': fields[-1],
    }

◆ submit()

CPGridRun.CPGridRun.submit ( self)

Definition at line 486 of file CPGridRun.py.

def submit(self):
    """Run the prepared prun command of every sample, one after the other.

    Each command is a single shell string built by
    ``configureSubmissionSingleSample`` and is executed through the shell.
    A non-zero exit status is logged and the remaining samples are still
    submitted.
    """
    import subprocess
    for sampleName, cmd in self.cmd.items():
        process = subprocess.Popen(cmd, shell=True, stdout=sys.stdout, stderr=sys.stderr)
        process.communicate()
        if process.returncode != 0:
            logCPGridRun.error(f"Submission of {sampleName} failed with exit code {process.returncode}")

Member Data Documentation

◆ _errorCollector

dict CPGridRun.CPGridRun._errorCollector = {}
protected

Definition at line 24 of file CPGridRun.py.

◆ _inputList

list CPGridRun.CPGridRun._inputList = None
protected

Definition at line 23 of file CPGridRun.py.

◆ _isFirstRun

bool CPGridRun.CPGridRun._isFirstRun = True
protected

Definition at line 21 of file CPGridRun.py.

◆ _runscript

CPGridRun.CPGridRun._runscript = None
protected

Definition at line 15 of file CPGridRun.py.

◆ _tarballRecreated

bool CPGridRun.CPGridRun._tarballRecreated = False
protected

Definition at line 22 of file CPGridRun.py.

◆ _tarfile

CPGridRun.CPGridRun._tarfile = 'cpgrid.tar.gz'
protected

Definition at line 20 of file CPGridRun.py.

◆ args

CPGridRun.CPGridRun.args

Definition at line 75 of file CPGridRun.py.

◆ cmd

dict CPGridRun.CPGridRun.cmd = {}

Definition at line 25 of file CPGridRun.py.

◆ gridParser

CPGridRun.CPGridRun.gridParser = self._parseGridArguments()

Definition at line 13 of file CPGridRun.py.

◆ inputList

CPGridRun.CPGridRun.inputList

Definition at line 128 of file CPGridRun.py.

◆ output_files

CPGridRun.CPGridRun.output_files = output_files

Definition at line 114 of file CPGridRun.py.

◆ prunArgsDict

CPGridRun.CPGridRun.prunArgsDict = self._createPrunArgsDict()

Definition at line 14 of file CPGridRun.py.

◆ unknown_args

CPGridRun.CPGridRun.unknown_args = parser.parse_known_args()

Definition at line 75 of file CPGridRun.py.


The documentation for this class was generated from the following file: