ATLAS Offline Software
Loading...
Searching...
No Matches
CPGridRun.py
Go to the documentation of this file.
1#! /usr/bin/env python
2
3# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4from AnaAlgorithm.DualUseConfig import isAthena
5from AnaAlgorithm.Logging import logging
6import argparse
7import sys
8import os
9
# Module-level logger used by every method in this file.
logCPGridRun = logging.getLogger('CPGridRun')
def __init__(self):
    """Parse CLI arguments, optionally print combined help, and initialise submission state."""
    # NOTE(review): two lines of the original body were lost in extraction;
    # presumably they built self.gridParser / self.args via _parseGridArguments()
    # before the self.args.help check below -- confirm against VCS.
    self._runscript = None
    if self.args.help:
        self._initRunscript()
        self.printHelp()
        sys.exit(0)
    self._tarfile = 'cpgrid.tar.gz'  # tarball of user code shipped to the grid
    self._isFirstRun = True          # gates one-time log messages across samples
    self._tarballRecreated = False   # set once the tarball is rebuilt this session
    self._inputList = None           # lazy cache for the inputList property
    self._errorCollector = {} # Delay the error collection until the end of the script for better user experience
    self._yamlPath = None            # YAML config path extracted from the exec string
    self.cmd = {} # sample name -> command
27
def _initRunscript(self):
    """Lazily create and cache the environment-appropriate CPRun runscript object."""
    if self._runscript is None:
        if isAthena:
            from AnalysisAlgorithmsConfig.AthenaCPRunScript import AthenaCPRunScript
            self._runscript = AthenaCPRunScript()
        else:
            from AnalysisAlgorithmsConfig.EventLoopCPRunScript import EventLoopCPRunScript
            self._runscript = EventLoopCPRunScript()
    return self._runscript
38
40 parser = argparse.ArgumentParser(description='CPGrid runscript to submit CPRun.py jobs to the grid. '
41 'This script will submit a job to the grid using files in the input text one by one.'
42 'CPRun.py can handle multiple sources of input and create one output; but not this script',
43 add_help=False,
44 formatter_class=argparse.RawTextHelpFormatter)
45 parser.add_argument('-h', '--help', dest='help', action='store_true', help='Show this help message and continue')
46
47 ioGroup = parser.add_argument_group('Input/Output file configuration')
48 ioGroup.add_argument('-i','--input-list', dest='input_list', help='Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
49 ioGroup.add_argument('--output-files', dest='output_files', nargs='+', default=['output.root'],
50 help='The output files of the grid job. Example: --output-files A.root B.txt B.root results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
51 ioGroup.add_argument('--destSE', dest='destSE', default='', type=str, help='Destination storage element (PanDA)')
52 ioGroup.add_argument('--mergeType', dest='mergeType', default='Default', type=str, help='Output merging type, [None, Default, xAOD]')
53
54 pandaGroup = parser.add_argument_group('Input/Output naming configuration')
55 pandaGroup.add_argument('--gridUsername', dest='gridUsername', default=os.getenv('USER', ''), type=str, help='Grid username, or the groupname. Default is the current user. Only affect file naming')
56 pandaGroup.add_argument('--prefix', dest='prefix', default='', type=str, help='Prefix for the output directory. Dynamically set with input container if not provided')
57 pandaGroup.add_argument('--suffix', dest='suffix', default='',type=str, help='Suffix for the output directory')
58 pandaGroup.add_argument('--outDS', dest='outDS', default='', type=str,
59 help='Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
60
61 cpgridGroup = parser.add_argument_group('CPGrid configuration')
62 cpgridGroup.add_argument('--groupProduction', dest='groupProduction', action='store_true', help='Only use for official production')
63
64 cpgridGroup.add_argument('--exec', dest='exec', type=str,
65 help='Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
66 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "CPRun.py -t config.yaml --no-systematics".\n'
67 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
68 )
69
70 submissionGroup = parser.add_argument_group('Submission configuration')
71 submissionGroup.add_argument('-y', '--agreeAll', dest='agreeAll', action='store_true', help='Agree to all the submission details without asking for confirmation. Use with caution!')
72 submissionGroup.add_argument('--noSubmit', dest='noSubmit', action='store_true', help='Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
73 submissionGroup.add_argument('--testRun', dest='testRun', action='store_true', help='Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
74 submissionGroup.add_argument('--checkInputDS', dest='checkInputDS', action='store_true', help='Check if the input datasets are available on the AMI.')
75 submissionGroup.add_argument('--recreateTar', dest='recreateTar', action='store_true', help='Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
76 submissionGroup.add_argument('--useCentralPackage', dest='useCentralPackage', action='store_true', help='Use central package instead of custom packages')
77 self.args, self.unknown_args = parser.parse_known_args()
79 return parser
80
def _createPrunArgsDict(self) -> dict:
    '''
    Convert unrecognised CLI arguments into a dict of prun pass-through options.

    When prun is available the keys are validated against prun's own parser
    (raises ValueError on unknown names); otherwise only a warning is emitted.
    Fixes: removed the dead ``else: pass`` branch and the "availablility" typo
    in the warning message.

    Returns the (possibly empty) {option: value} dict.
    '''
    unknownArgsDict = self._unknownArgsDict()
    if unknownArgsDict and self.hasPrun():
        self._checkPrunArgs(unknownArgsDict)
        logCPGridRun.info(f"Adding prun exclusive arguments: {unknownArgsDict.keys()}")
    elif unknownArgsDict:
        logCPGridRun.warning(f"Unknown arguments detected: {unknownArgsDict}. Cannot check the availability in Prun because Prun is not available / noSubmit is on.")
    return unknownArgsDict
94
@property
def inputList(self):
    """Lazily resolve --input-list into a list of input container names.

    Accepts a .txt list file, a single ATLAS-production-style container name,
    or (not yet implemented) a .json file; raises ValueError otherwise.
    """
    if self._inputList is None:
        # NOTE(review): if --input-list was omitted, self.args.input_list is None
        # and endswith() raises AttributeError before the ValueError below -- confirm.
        if self.args.input_list.endswith('.txt'):
            self._inputList = CPGridRun._parseInputFileList(self.args.input_list)
        elif self.args.input_list.endswith('.json'):
            raise NotImplementedError('JSON input list parsing is not implemented')
        elif CPGridRun.isAtlasProductionFormat(self.args.input_list):
            # A bare container name is accepted as a one-element list.
            self._inputList = [self.args.input_list]
        else:
            raise ValueError(
                'use --input-list to specify input containers')
    return self._inputList
108
# NOTE(review): the enclosing `def` line for this block was lost in extraction
# (its name does not appear in the generated index).  Judging by the body it
# normalises self.args.output_files by splitting any comma-separated entries
# into individual file names -- confirm the original signature against VCS.
output_files = []
for output in self.args.output_files:
    if ',' in output:
        output_files.extend(output.split(','))
    else:
        output_files.append(output)
self.output_files = output_files
117
def printHelp(self):
    """Print the grid parser's help, then the CPRun.py runscript's help."""
    self.gridParser.print_help()
    logCPGridRun.info("\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
    runscript_parser = self._runscript.parser
    # Suppress the second usage banner so the two help blocks read as one page.
    runscript_parser.usage = argparse.SUPPRESS
    runscript_parser.print_help()
123
def getParser(self):
    """Accessor for the CPGrid argument parser built at construction time."""
    return self.gridParser
126
# This function does all the checking, cleaning and preparing of the command to
# be submitted to the grid; it is separated so clients can change the behavior.
def configureSumbission(self):
    """Build and cache the prun command for every input container.

    Header reconstructed: the original ``def`` line was lost in extraction;
    the name (including the 'Sumbission' typo) is taken from the __main__
    call site and kept for backward compatibility.
    """
    for input in self.inputList:
        cmd = self.configureSubmissionSingleSample(input)
        self.cmd[input] = cmd
    # After the first sample, suppress the one-time informational log messages.
    self._isFirstRun = False
134
def configureSubmissionSingleSample(self, input):
    """Assemble the multi-line ``prun`` command string for one input container.

    Header reconstructed: the original ``def`` line was lost in extraction
    (signature taken from the generated index).
    """
    config = {
        'inDS': input,
        'outDS': self.args.outDS if self.args.outDS else self.outputDSFormatter(input) ,
        'cmtConfig': os.environ["CMTCONFIG"],
        'writeInputToTxt': 'IN:in.txt',
        'outputs': self.outputsFormatter(),
        'exec': self.execFormatter(),
        'memory': "2000", # MB
        'addNthFieldOfInDSToLFN': '2,3,6',
    }
    if self.args.noSubmit:
        config['noSubmit'] = True

    if self.args.mergeType == 'xAOD':
        config['mergeScript'] = 'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'

    if self.args.mergeType != 'None':
        config['mergeOutput'] = True

    # Three ways of shipping files to the grid:
    if self.args.useCentralPackage: # 1. Using central package and have a yaml file only
        config['extFile'] = self._yamlPath
        config['noBuild'] = True
        config['noCompile'] = True
        config['athenaTag'] = f"AnalysisBase,{os.environ['AnalysisBase_VERSION']}"
    elif self._filesChangedOrTarballNotCreated(): # 2. Using custom packages and haven't compressed the tarball since the last changes
        config['outTarBall'] = self._tarfile
        config['useAthenaPackages'] = True
        self._tarballRecreated = True
    elif self._hasCompressedTarball(): # 3. Using custom packages and have compressed the tarball
        config['inTarBall'] = self._tarfile
        config['useAthenaPackages'] = True

    if self.args.groupProduction:
        config['official'] = True
        config['voms'] = f'atlas:/atlas/{self.args.gridUsername}/Role=production'

    if self.args.destSE:
        config['destSE'] = self.args.destSE

    if self.args.testRun:
        # NOTE(review): the --testRun help text promises 10 files / 300 events,
        # but 5 files / 100 events are set here -- confirm which is intended.
        config['nEventsPerFile'] = 100
        config['nFiles'] = 5
    # NOTE(review): self.prunArgsDict is presumably a cached property wrapping
    # _createPrunArgsDict(); its definition is not visible in this chunk.
    config.update(self.prunArgsDict)
    # Render the dict as a multi-line prun invocation: True values become bare
    # flags, empty/None values are dropped entirely.
    cmd = 'prun \\\n'
    for k, v in config.items():
        if isinstance(v, bool) and v:
            cmd += f'--{k} \\\n'
        elif v is not None and v != '':
            cmd += f'--{k} {v} \\\n'
    return cmd.rstrip(' \\\n')
187
188 def _unknownArgsDict(self)->dict:
189 '''
190 Cleans the unknown args by removing leading dashes and ensuring they are in key-value pairs
191 '''
192 unknown_args_dict = {}
193 idx = 0
194 while idx < len(self.unknown_args):
195 if self.unknown_args[idx].startswith('-'):
196 if idx + 1 < len(self.unknown_args) and not self.unknown_args[idx + 1].startswith('-'):
197 unknown_args_dict[self.unknown_args[idx].lstrip('-')] = self.unknown_args[idx + 1]
198 idx += 2
199 else:
200 unknown_args_dict[self.unknown_args[idx].lstrip('-')] = True
201 idx += 1
202 return unknown_args_dict
203
def _checkPrunArgs(self, argDict):
    '''
    Validate pass-through arguments against prun's own option parser.
    See https://github.com/PanDAWMS/panda-client/blob/master/pandaclient/PrunScript.py

    Raises ValueError listing any argument unknown to both CPGridRun and prun.
    Fix: sys.argv is now restored in a ``finally`` block, so an exception from
    PrunScript.main no longer leaves a clobbered sys.argv behind.
    '''
    import pandaclient.PrunScript
    # prun's parser reads sys.argv; clear it temporarily so it cannot choke on
    # our own command line.
    original_argv = sys.argv
    sys.argv = ['prun']  # Reset sys.argv to only contain the script name
    try:
        prunArgsDict = pandaclient.PrunScript.main(get_options=True)
    finally:
        sys.argv = original_argv  # Restore the original sys.argv
    nonPrunOrCPGridArgs = [arg for arg in argDict if arg not in prunArgsDict]
    if nonPrunOrCPGridArgs:
        logCPGridRun.error(f"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
        raise ValueError(f"Unknown arguments detected: {nonPrunOrCPGridArgs}. They do not belong to CPGridRun or Panda.")
223
def printInputDetails(self):
    """Log the parsed dataset fields and the prepared prun command per sample.

    Header reconstructed: the original ``def`` line was lost in extraction;
    the name is taken from the __main__ call site.
    """
    for key, cmd in self.cmd.items():
        parsed_name = CPGridRun.atlasProductionNameParser(key)
        logCPGridRun.info("\n"
                          f"Input: {key}\n" +
                          "\n".join([f" {k.replace('_', ' ').title()}: {v}" for k, v in parsed_name.items()]))
        logCPGridRun.info(f"Command: \n{cmd}")
        print("-" * 70)
    # Add your submission logic here
233
def hasPyami(self):
    """Check pyAMI availability; on failure record a setup hint and return False."""
    try:
        global pyAMI
        import pyAMI.client
        import pyAMI.atlas.api
        return True
    except ModuleNotFoundError:
        self._errorCollector['no AMI'] = (
            "Cannot import pyAMI, please run the following commands:\n\n"
            "```\n"
            "lsetup pyami\n"
            "voms-proxy-init -voms atlas\n"
            "```\n"
            "and make sure you have a valid certificate.")
        return False
249
def checkInputInPyami(self) -> bool:
    """Query AMI for each input dataset.

    Returns False when pyAMI or a valid certificate is missing, or when any
    requested dataset is absent from AMI; True otherwise.
    """
    if not self.hasPyami():
        return False

    client = pyAMI.client.Client('atlas')
    pyAMI.atlas.api.init()

    # Query patterns use _p% wildcards so newer p-tags are also reported.
    queries, datasetPtag = self._prepareAmiQueryFromInputList()
    try:
        results = pyAMI.atlas.api.list_datasets(client, patterns=queries)
    except pyAMI.exception.Error:
        # Typically an expired/missing VOMS proxy rather than a bad query.
        self._errorCollector['no valid certificate'] = (
            "Cannot query AMI, please run 'voms-proxy-init -voms atlas' and ensure your certificate is valid.")
        return False

    return self._analyzeAmiResults(results, datasetPtag)
266
def _prepareAmiQueryFromInputList(self):
    '''
    Prepare a list of AMI query patterns from the input list.

    Header reconstructed: the original ``def`` line was lost in extraction
    (signature taken from the generated index).  Each _p### tag is replaced
    with the _p% wildcard so the query also matches newer p-tags.

    Returns (queries, {datasetName: ptag-or-None}).
    '''
    import re
    regex = re.compile("_p[0-9]+")
    queries = []
    datasetPtag = {}
    for datasetName in self.cmd:
        parsed = CPGridRun.atlasProductionNameParser(datasetName)
        datasetPtag[datasetName] = parsed.get('ptag')
        queries.append(regex.sub("_p%", datasetName))
    return queries, datasetPtag
281
def _analyzeAmiResults(self, results, datasetPtag) -> bool:
    """Compare AMI query results against the requested datasets.

    Logs any dataset for which a newer p-tag exists in AMI, and returns
    False when any requested dataset is missing from AMI entirely.
    """
    import re
    regex = re.compile("_p[0-9]+")
    results = [r['ldn'] for r in results]  # AMI rows are dicts; keep logical dataset names
    notFound = []
    latestPtag = {}

    for datasetName in self.cmd:
        if datasetName not in results:
            notFound.append(datasetName)

        # Match any result sharing the dataset name up to (but excluding) the p-tag.
        base = regex.sub("_p%", datasetName)
        matching = [r for r in results if r.startswith(base.replace("_p%", ""))]
        for m in matching:
            mParsed = CPGridRun.atlasProductionNameParser(m)
            try:
                mPtagInt = int(mParsed.get('ptag', 'p0')[1:])
                currentPtagInt = int(datasetPtag.get(datasetName, 'p0')[1:])
                if mPtagInt > currentPtagInt:
                    # NOTE(review): f"p{mPtagInt}" would drop leading zeros from the
                    # tag string -- confirm p-tags are never zero-padded.
                    latestPtag[datasetName] = f"p{mPtagInt}"
            except (ValueError, TypeError):
                continue  # malformed or absent p-tag; skip this candidate

    if latestPtag:
        logCPGridRun.info("Newer version of datasets found in AMI:")
        for name, ptag in latestPtag.items():
            logCPGridRun.info(f"{name} -> ptag: {ptag}")

    if notFound:
        logCPGridRun.error("Some input datasets are not available in AMI, missing datasets are likely to fail on the grid:")
        logCPGridRun.error(", ".join(notFound))
        return False

    return True
316
318 return not self._tarballRecreated and (self.args.recreateTar or not os.path.exists(self._tarfile) or self._filesChanged())
319
321 return os.path.exists(self._tarfile) or self._tarballRecreated
322
def outputDSFormatter(self, name):
    """Dispatch output-dataset naming: ATLAS-production names vs custom user names."""
    formatter = (self._outputDSFormatter
                 if CPGridRun.isAtlasProductionFormat(name)
                 else self._customOutputDSFormatter)
    return formatter(name)
328
def _outputDSFormatter(self, name):
    '''
    Build an output dataset name from an ATLAS production input name:
    {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
    Empty components are dropped by filter(None, ...).

    Fix: ``base`` was previously assigned twice with the identical
    expression; the duplicate assignment is removed.
    '''
    nameParser = CPGridRun.atlasProductionNameParser(name)
    base = 'group' if self.args.groupProduction else 'user'
    username = self.args.gridUsername
    dsid = nameParser['DSID']
    tags = '_'.join(nameParser['tags'])
    fileFormat = nameParser['format']
    # Dynamically set the prefix, likely to be something like PhPy8Eg
    prefix = self.args.prefix if self.args.prefix else nameParser['main'].split('_')[0]
    suffix = self._suffixFormatter()

    result = [base, username, prefix, dsid, fileFormat, tags, suffix]
    return ".".join(filter(None, result))
345
347 '''
348 {group/user}.{username}.{main}.outputDS.{suffix}
349 '''
350 parts = name.split('.')
351 base = 'group' if self.args.groupProduction else 'user'
352 username = self.args.gridUsername
353 main = parts[2]
354 outputDS = 'outputDS'
355 suffix = parts[-1]
356
357 result = [base, username,main, outputDS, suffix]
358 return ".".join(filter(None, result))
359
361 if self.args.suffix:
362 return self.args.suffix
363 if self.args.testRun:
364 import uuid
365 return f"test_{uuid.uuid4().hex[:6]}"
366 else:
367 ''
368
def _filesChanged(self):
    """True if any file under the build or source tree is newer than the tarball.

    Refactor: the original duplicated an identical os.walk loop for the build
    and source directories; both now share one helper.  Behavior is unchanged,
    including the order (build tree first, then the source tree).
    """
    def _tree_has_newer_file(directory, reference_mtime):
        # One pass over a tree; FileNotFoundError covers files deleted mid-walk.
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                try:
                    if os.path.getmtime(file_path) > reference_mtime:
                        logCPGridRun.info(f"File {file_path} is newer than the tarball.")
                        return True
                except FileNotFoundError:
                    continue
        return False

    # A missing tarball compares as older than everything (mtime 0).
    tarball_mtime = os.path.getmtime(self._tarfile) if os.path.exists(self._tarfile) else 0

    # Check for changes in the build directory.
    if _tree_has_newer_file(self._buildDir(), tarball_mtime):
        return True

    # Check for changes in the source directory.
    sourceDir = self._sourceDir()
    if sourceDir is None:
        logCPGridRun.warning("Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
        return False
    return _tree_has_newer_file(sourceDir, tarball_mtime)
399
400 def _buildDir(self):
401 buildDir = os.environ["CMAKE_PREFIX_PATH"]
402 buildDir = os.path.dirname(buildDir.split(":")[0])
403 return buildDir
404
405 def _sourceDir(self):
406 cmakeCachePath = os.path.join(self._buildDir(), 'CMakeCache.txt')
407 sourceDir = None
408 if not os.path.exists(cmakeCachePath):
409 return sourceDir
410 with open(cmakeCachePath, 'r') as cmakeCache:
411 for line in cmakeCache:
412 if '_SOURCE_DIR:STATIC=' in line:
413 sourceDir = line.split('=')[1].strip()
414 break
415 return sourceDir
416
def execFormatter(self):
    """Return the quoted --exec string for prun.

    CPRun.py-style exec strings (starting with 'CPRun.py' or '-') are parsed
    with the runscript's parser, validated, given grid defaults (in.txt input
    list, merged outputs) where the user kept the parser default, and
    re-emitted.  Anything else is passed through verbatim in quotes.
    """
    # Check if the execution command starts with 'CPRun.py' or '-'
    isCPRunDefault = self.args.exec.startswith('-') or self.args.exec.startswith('CPRun.py')
    # Grid-side defaults, applied only where the user kept the parser default.
    formatingClause = {
        'input_list': 'in.txt',
        'merge_output_files': True,
    }
    if not isCPRunDefault:
        if self._isFirstRun: logCPGridRun.warning("Non-CPRun.py is detected, please ensure the exec string is formatted correctly. Exec string will not be automatically formatted.")
        return f'"{self.args.exec}"'

    # Parse the exec string using the parser to validate and extract known arguments
    self._initRunscript()
    runscriptArgs, unknownArgs = self._runscript.parser.parse_known_args(self.args.exec.split(' '))

    # Throw error if unknownArgs contains any --args
    unknown_flags = [arg for arg in unknownArgs if arg.startswith('--')]
    if unknown_flags:
        logCPGridRun.error(f"Unknown flags detected in the exec string: {unknown_flags}. Please check the exec string.")
        raise ValueError(f"Unknown arguments detected: {unknown_flags}")

    # Only override if value is None or the parser default
    for key, value in formatingClause.items():
        if hasattr(runscriptArgs, key):
            old_value = getattr(runscriptArgs, key)
            if old_value is None or old_value == self._runscript.parser.get_default(key):
                setattr(runscriptArgs, key, value)
                if self._isFirstRun: logCPGridRun.info(f"Setting '{key}' to '{value}' (CPRun.py default is: '{old_value}')")
            else:
                if self._isFirstRun: logCPGridRun.warning(f"Preserving user-defined '{key}': '{old_value}', default formatting '{value}' will not be applied.")
        else:
            # A clause key missing from the runscript parser means CPGridRun and CPRun.py drifted apart.
            logCPGridRun.error(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
            raise ValueError(f"Formatting clause '{key}' is not recognized in the CPRun.py script. Check CPGridRun.py")
    self._checkYamlExists(runscriptArgs)
    # Return the formatted arguments as a string: True booleans become bare
    # flags, None/False entries are dropped, underscores become dashes.
    arg_string = ' '.join(
        f'--{k.replace("_", "-")}' if isinstance(v, bool) and v else
        f'--{k.replace("_", "-")} {v}' for k, v in vars(runscriptArgs).items() if v not in [None, False]
    )
    return f'"CPRun.py {arg_string}"'
457
def _checkYamlExists(self, runscriptArgs):
    """Validate the --text-config YAML referenced in the exec string.

    Records delayed errors when the YAML is missing, ambiguous within the
    analysis repository, or only available locally (not shippable to the
    grid), and stores the path in self._yamlPath for later use as an extFile.
    """
    from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
    if not hasattr(runscriptArgs, 'text_config'):
        self._errorCollector['no yaml'] = "No YAML configuration file is specified in the exec string. Please provide one using --text-config"
        return
    yamlPath = getattr(runscriptArgs, 'text_config')
    self._yamlPath = yamlPath
    haveLocalYaml = CPBaseRunner.findLocalPathYamlConfig(yamlPath)
    if haveLocalYaml:
        # Warn early: a file on the local disk is not necessarily shipped to the grid.
        logCPGridRun.warning("A path to a local YAML configuration file is found, but it may not be grid-usable.")

    repoYamls, _ = CPBaseRunner.findRepoPathYamlConfig(yamlPath)
    if repoYamls and len(repoYamls) > 1:
        # Ambiguity is fatal for submission: we cannot guess which config the user means.
        self._errorCollector['ambiguous yamls'] = f'Multiple files named \"{yamlPath}\" found in the analysis repository. Please provide a more specific path to the config file.\nMatches found:\n' + '\n'.join(repoYamls)
        return
    elif repoYamls and len(repoYamls) == 1:
        logCPGridRun.info(f"Found a grid-usable YAML configuration file in the analysis repository: {repoYamls[0]}")
        return

    if haveLocalYaml and self.args.useCentralPackage:
        logCPGridRun.warning("A path to a local YAML configuration file is found, no custom packages are found, proceed with /cvmfs packages only.")

    if not repoYamls and not self.args.useCentralPackage:
        self._errorCollector['no usable yaml'] = f"Grid usable YAML configuration file not found: {yamlPath}"
        if haveLocalYaml:
            self._errorCollector['have local yaml'] = f"Only a local YAML configuration file is found: {yamlPath}, not usable in the grid.\n" \
                f"Make sure the YAML file is in build/x86_64-el9-gcc14-opt/data/package_name/config.yaml. You can install the YAML file through CMakeList.txt with `atlas_install_data( data/* )`; use `-t package_name/config.yaml` in the --exec\n"\
                f"Or if you are only using central packages, please use the `--useCentralPackage` flag."
486
def outputsFormatter(self):
    """Map each output file to prun's 'dataset:filename' form, comma-joined.

    Header reconstructed: the original ``def`` line was lost in extraction;
    the name is taken from the call in configureSubmissionSingleSample.
    Example: 'A.root' -> 'A:A.root'.
    """
    outputs = [f'{output.split(".")[0]}:{output}' for output in self.args.output_files]
    return ','.join(outputs)
490
def hasPrun(self) -> bool:
    """Return True if the 'prun' executable is on PATH; otherwise record a setup hint."""
    import shutil
    if shutil.which("prun") is not None:
        return True
    self._errorCollector['no prun'] = (
        "The 'prun' command is not found. If you are on lxplus, please run the following commands:\n\n"
        "```\n"
        "lsetup panda\n"
        "voms-proxy-init -voms atlas\n"
        "```\n"
        "Make sure you have a valid certificate."
    )
    return False
505
def submit(self):
    """Run every prepared prun command in a shell, streaming output to our stdout/stderr."""
    import subprocess
    for sample, command in self.cmd.items():
        proc = subprocess.Popen(command, shell=True, stdout=sys.stdout, stderr=sys.stderr)
        proc.communicate()
511
512 @staticmethod
514 if name.startswith('mc') or name.startswith('data'):
515 return True
516 logCPGridRun.warning("Name is not in the Atlas production format, assuming it is a user production")
517 return False
518
519 @staticmethod
521 '''
522 The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
523 '''
524 result = {}
525 parts = filename.split('.')
526 result['userType'] = parts[0]
527 result['username'] = parts[1]
528 result['main'] = parts[2]
529 result['suffix'] = parts[-1]
530 return result
531
532 @staticmethod
534 '''
535 Parsing file name into a dictionary, an example is given here
536 mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
537 For the first part
538 datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
539 projectName: mc20_13TeV
540 campaign: mc20
541 energy: 13 #(TeV)
542 DSID: 410470
543 main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
544 TODO generator: PhPy8Eg
545 TODO tune: A14 # For Pythia8
546 TODO process: ttbar
547 TODO hdamp: 258p75 # For Powheg
548 TODO decayType: nonallhad
549 step: deriv
550 format: DAOD_PHYS
551 tags: e###_s###_r###_p###_a###_t###_b#
552 etag: e6337 # EVNT (EVGEN) production and merging
553 stag: s3681 # Geant4 simulation to produce HITS and merging!
554 rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
555 ptag: p5855 # Production of NTUP_PILEUP format and merging
556 atag: aXXX: atlfast configuration (both simulation and digit/recon)
557 ttag: tXXX: tag production configuration
558 btag: bXXX: bytestream production configuration
559
560 For the second part
561 JeditaskID: 34865530
562 fileNumber: 000740
563 version: 1
564
565 '''
566 result = {}
567 #split the / in case
568 # mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
569 # /DAOD_PHYS.34865530._000740.pool.root.1
570 if '/' in filename:
571 datasetPart, filePart = filename.split('/')
572 else:
573 datasetPart = filename
574 filePart = None
575
576 # Split the dataset part by dots
577 datasetParts = datasetPart.split('.')
578 result['datasetName'] = datasetPart
579 # Extract the first part
580 result['projectName'] = datasetParts[0] # is positional
581 # Extract the campaign and energy
582 campaign_energy = result['projectName'].split('_')
583 result['campaign'] = campaign_energy[0]
584 result['energy'] = campaign_energy[1]
585
586 # Extract the DSID, positional
587 result['DSID'] = datasetParts[1]
588 result['main'] = datasetParts[2]
589 result['step'] = datasetParts[3]
590 result['format'] = datasetParts[4]
591
592 # Extract the tags (etag, stag, rtag, ptag)
593 tags = datasetParts[5].split('_')
594 result['tags'] = tags
595 for tag in tags:
596 if tag.startswith('e'):
597 result['etag'] = tag
598 elif tag.startswith('s'):
599 result['stag'] = tag
600 elif tag.startswith('r'):
601 result['rtag'] = tag
602 elif tag.startswith('p'):
603 result['ptag'] = tag
604 elif tag.startswith('a'):
605 result['atag'] = tag
606 elif tag.startswith('t'):
607 result['ttag'] = tag
608 elif tag.startswith('b'):
609 result['btag'] = tag
610
611 # Extract the file part if it exists
612 if filePart:
613 fileParts = filePart.split('.')
614 result['jediTaskID'] = fileParts[1]
615 result['fileNumber'] = fileParts[2]
616 result['version'] = fileParts[-1]
617 return result
618
619 @staticmethod
621 files = []
622 with open(path, 'r') as inputText:
623 for line in inputText.readlines():
624 # skip comments and empty lines
625 if line.startswith('#') or not line.strip():
626 continue
627 files += line.split(',')
628 # remove leading/trailing whitespaces, and \n
629 files = [file.strip() for file in files]
630 return files
631
def printDelayedErrorCollection(self):
    """Report all errors collected during the run and exit(1) if there were any.

    Header reconstructed: the original ``def`` line was lost in extraction
    (name taken from the generated index).  Errors are accumulated in
    self._errorCollector throughout the run so the user sees them all at once.
    """
    if self._errorCollector:
        logCPGridRun.error("Errors were collected during the script execution:")
        for key, value in self._errorCollector.items():
            logCPGridRun.error(f"{key}: {value}")
        logCPGridRun.error("Please fix the errors and try again.")
        sys.exit(1)
640
def checkExternalTools(self):
    """Verify grid tooling is available unless --noSubmit was requested.

    Header reconstructed: the original ``def`` line was lost in extraction
    (name taken from the __main__ call site).  Checks prun availability and,
    with --checkInputDS, queries AMI for the input datasets; failures are
    recorded in the delayed error collector rather than raised here.
    """
    if self.args.noSubmit:
        return
    self.hasPrun()
    if self.args.checkInputDS:
        self.checkInputInPyami()
647
def askSubmission(self):
    """Final interactive gate: confirm (or auto-confirm via -y/--agreeAll) and submit."""
    if self.args.noSubmit:
        return
    if self.args.agreeAll:
        logCPGridRun.info("You have agreed to all the submission details. Jobs will be submitted without confirmation.")
        self.submit()
        return
    answer = input("Please confirm ALL the submission details are correct before submitting [y/n]: ")
    if answer.lower() == 'y':
        self.submit()
    elif answer.lower() == 'n':
        logCPGridRun.info("Feel free to report any unexpected behavior to the CPAlgorithms team!")
    else:
        # Anything other than y/n is treated as a decline, with an explicit error.
        logCPGridRun.error("Invalid input. Please enter 'y' or 'n'. Jobs are not submitted.")
662
if __name__ == '__main__':
    # Script entry point: build the prun commands, show the per-sample details,
    # run environment checks, surface all collected errors, then ask for
    # confirmation before submitting.
    cpgrid = CPGridRun()
    cpgrid.configureSumbission()  # NOTE(review): 'Sumbission' typo is the actual method name
    cpgrid.printInputDetails()
    cpgrid.checkExternalTools()
    cpgrid.printDelayedErrorCollection()
    cpgrid.askSubmission()
void printHelp()
void print(char *figname, TCanvas *c1)
outputDSFormatter(self, name)
Definition CPGridRun.py:323
dict _createPrunArgsDict(self)
Definition CPGridRun.py:81
rucioCustomNameParser(filename)
Definition CPGridRun.py:520
bool hasPrun(self)
Definition CPGridRun.py:491
_checkYamlExists(self, runscriptArgs)
Definition CPGridRun.py:458
_parseGridArguments(self)
Definition CPGridRun.py:39
bool checkInputInPyami(self)
Definition CPGridRun.py:250
bool _analyzeAmiResults(self, results, datasetPtag)
Definition CPGridRun.py:282
isAtlasProductionFormat(name)
Definition CPGridRun.py:513
_customOutputDSFormatter(self, name)
Definition CPGridRun.py:346
_prepareAmiQueryFromInputList(self)
Definition CPGridRun.py:267
_filesChangedOrTarballNotCreated(self)
Definition CPGridRun.py:317
_hasCompressedTarball(self)
Definition CPGridRun.py:320
atlasProductionNameParser(filename)
Definition CPGridRun.py:533
_outputDSFormatter(self, name)
Definition CPGridRun.py:329
_checkPrunArgs(self, argDict)
Definition CPGridRun.py:204
printDelayedErrorCollection(self)
Definition CPGridRun.py:632
dict _unknownArgsDict(self)
Definition CPGridRun.py:188
configureSubmissionSingleSample(self, input)
Definition CPGridRun.py:135
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177