4 from AnaAlgorithm.DualUseConfig
import isAthena
5 from AnaAlgorithm.Logging
import logging
# Module-level logger used throughout this script for grid-submission messages.
10 logCPGridRun = logging.getLogger(
'CPGridRun')
# Select the per-framework runscript implementation.
# NOTE(review): the original numbering jumps 26 -> 28, so a line is elided
# here — presumably an if/else on the Athena-vs-EventLoop environment
# (isAthena is imported above) chooses which branch runs; confirm upstream.
25 from AnalysisAlgorithmsConfig.AthenaCPRunScript
import AthenaCPRunScript
26 runscript = AthenaCPRunScript()
28 from AnalysisAlgorithmsConfig.EventLoopCPRunScript
import EventLoopCPRunScript
29 runscript = EventLoopCPRunScript()
# Build the command-line interface for the grid-submission tool.
# NOTE(review): this chunk is a line-mangled extraction; code tokens below are
# preserved byte-for-byte, only comment lines are added.
33 parser = argparse.ArgumentParser(description=
'CPGrid runscript to submit CPRun.py jobs to the grid. '
34 'This script will submit a job to the grid using files in the input text one by one.'
35 'CPRun.py can handle multiple sources of input and create one output; but not this script',
37 formatter_class=argparse.RawTextHelpFormatter)
# Custom -h/--help that only records the request and continues, instead of
# argparse's default exit-on-help (presumably add_help=False was passed in the
# elided line 36 — confirm; otherwise this would clash with the built-in -h).
38 parser.add_argument(
'-h',
'--help', dest=
'help', action=
'store_true', help=
'Show this help message and continue')
# --- Input/Output file options ---
40 ioGroup = parser.add_argument_group(
'Input/Output file configuration')
41 ioGroup.add_argument(
'-i',
'--input-list', dest=
'input_list', help=
'Path to the text file containing list of containers on the panda grid. Each container will be passed to prun as --inDS and is run individually')
42 ioGroup.add_argument(
'--output-files', dest=
'output_files', default=
'output.root',
43 help=
'The output files of the grid job. Example: --output-files "A.root,B.txt,B.root" results in A/A.root, B/B.txt, B/B.root in the output directory. No need to specify if using CPRun.py')
44 ioGroup.add_argument(
'--destSE', dest=
'destSE', default=
'', type=str, help=
'Destination storage element (PanDA)')
45 ioGroup.add_argument(
'--mergeType', dest=
'mergeType', default=
'Default', type=str, help=
'Output merging type, [None, Default, xAOD]')
# --- PanDA output-dataset naming options ---
47 pandaGroup = parser.add_argument_group(
'Input/Output naming configuration')
48 pandaGroup.add_argument(
'--gridUsername', dest=
'gridUsername', default=os.getenv(
'USER',
''), type=str, help=
'Grid username, or the groupname. Default is the current user. Only affect file naming')
49 pandaGroup.add_argument(
'--prefix', dest=
'prefix', default=
'', type=str, help=
'Prefix for the output directory. Dynamically set with input container if not provided')
50 pandaGroup.add_argument(
'--suffix', dest=
'suffix', default=
'',type=str, help=
'Suffix for the output directory')
51 pandaGroup.add_argument(
'--outDS', dest=
'outDS', default=
'', type=str,
52 help=
'Name of an output dataset. outDS will contain all output files (PanDA). If not provided, support dynamic naming if input name is in the Atlas production format or typical user production format')
# --- CPGrid job options ---
54 cpgridGroup = parser.add_argument_group(
'CPGrid configuration')
55 cpgridGroup.add_argument(
'--groupProduction', dest=
'groupProduction', action=
'store_true', help=
'Only use for official production')
# --exec: the payload command line to run on the grid, quoted as one string.
57 cpgridGroup.add_argument(
'--exec', dest=
'exec', type=str,
58 help=
'Executable line for the CPRun.py or custom script to run on the grid encapsulated in a double quote (PanDA)\n'
59 'Run CPRun.py with preset behavior including streamlined file i/o. E.g, "-t config.yaml --no-systematics".\n'
60 'CPRun.py but overriding the preset behavior (for future and experts): "CPRun.py --input-list ourExpert.txt -t config.yaml --flagB"\n'
61 'Run custom script: "customRun.py -i inputs -o output --text-config config.yaml --flagA --flagB"\n'
# --- Submission-control options ---
64 submissionGroup = parser.add_argument_group(
'Submission configuration')
65 submissionGroup.add_argument(
'--noSubmit', dest=
'noSubmit', action=
'store_true', help=
'Do not submit the job to the grid (PanDA). Useful to inspect the prun command')
66 submissionGroup.add_argument(
'--testRun', dest=
'testRun', action=
'store_true', help=
'Will submit job to the grid but greatly limit the number of files per job (10) and number of events (300)')
67 submissionGroup.add_argument(
'--recreateTar', dest=
'recreateTar', action=
'store_true', help=
'Re-compress the source code. Source code are compressed by default in submission, this is useful when the source code is updated')
# Parse the CLI and resolve the input container list.
69 self.
args = parser.parse_args()
# Dispatch on the --input-list argument's form:
#   *.txt  -> parsed into a container list by _parseInputFileList
#   *.json -> explicitly unsupported
#   bare Atlas-production-format name -> rejected (elided lines 80-82
#   presumably raise/log; only the message tail at orig line 83 is visible).
75 if self.
args.input_list.endswith(
'.txt'):
76 self.
_inputList = CPGridRun._parseInputFileList(self.
args.input_list)
77 elif self.
args.input_list.endswith(
'.json'):
78 raise NotImplementedError(
'JSON input list parsing is not implemented')
79 elif CPGridRun.isAtlasProductionFormat(self.
args.input_list):
83 'use --input-list to specify input containers')
# Green ANSI banner clarifying that the remaining flags belong to the
# embedded CPRun.py runscript, whose usage header is then suppressed.
88 logCPGridRun.info(
"\033[92m\n If you are using CPRun.py, the following flags are for the CPRun.py in this framework\033[0m")
89 self.
_runscript.parser.usage = argparse.SUPPRESS
# Cache the built command keyed by input container.
# NOTE(review): `input` here shadows the builtin — presumably a loop
# variable from an elided line; confirm against the full source.
102 self.
cmd[input] = cmd
# Assemble the prun option dictionary, then flatten it into a command string.
# Fixed options: use Athena packages, pin the build's CMTCONFIG platform,
# write the per-job input file names into in.txt on the worker node, and
# derive the output LFN from input-dataset name fields 2,3,6.
109 'useAthenaPackages':
True,
110 'cmtConfig': os.environ[
"CMTCONFIG"],
111 'writeInputToTxt':
'IN:in.txt',
115 'addNthFieldOfInDSToLFN':
'2,3,6',
# --noSubmit passes straight through to prun.
117 if self.
args.noSubmit:
118 config[
'noSubmit'] =
True
# xAOD merging needs a custom merge script; %IN is comma-separated, so the
# embedded sed turns it into space-separated arguments for xAODMerge.
120 if self.
args.mergeType ==
'xAOD':
121 config[
'mergeScript'] =
'xAODMerge %OUT `echo %IN | sed \'s/,/ /g\'`'
# Any mergeType other than the literal string 'None' enables output merging.
123 if self.
args.mergeType !=
'None':
124 config[
'mergeOutput'] =
True
# Ship the pre-built source tarball instead of re-uploading sources.
127 config[
'outTarBall'] = self.
_tarfile
# Official group production: set the production VOMS role for the group name.
132 if self.
args.groupProduction:
133 config[
'official'] =
True
134 config[
'voms'] = f
'atlas:/atlas/{self.args.gridUsername}/Role=production'
137 config[
'destSE'] = self.
args.destSE
# Test runs are capped to keep grid load small (300 events/file, 10 files).
139 if self.
args.testRun:
140 config[
'nEventsPerFile'] = 300
141 config[
'nFiles'] = 10
# Render config into "--key value \"-continued prun arguments. Boolean True
# options become bare flags (elided orig line 146, presumably
# cmd += f'--{k} \\\n'); empty/None values are skipped.
144 for k, v
in config.items():
145 if isinstance(v, bool)
and v:
147 elif v
is not None and v !=
'':
148 cmd += f
'--{k} {v} \\\n'
# Drop the trailing continuation so the command ends cleanly.
149 return cmd.rstrip(
' \\\n')
# Log a human-readable summary of the parsed dataset name (one indented
# "Key: value" line per field) followed by the full prun command.
153 parsed_name = CPGridRun.atlasProductionNameParser(key)
154 logCPGridRun.info(
"\n"
156 "\n".
join([f
" {k.replace('_', ' ').title()}: {v}" for k, v
in parsed_name.items()]))
157 logCPGridRun.info(f
"Command: \n{cmd}")
# Build the output-dataset (outDS) name from the input container name.
# Production-format inputs follow:
#   {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
163 if CPGridRun.isAtlasProductionFormat(name):
170 {group/user}.{username}.{prefix}.{DSID}.{format}.{tags}.{suffix}
172 nameParser = CPGridRun.atlasProductionNameParser(name)
173 base =
'group' if self.
args.groupProduction
else 'user'
174 username = self.
args.gridUsername
175 dsid = nameParser[
'DSID']
176 tags =
'_'.
join(nameParser[
'tags'])
177 fileFormat = nameParser[
'format']
# NOTE(review): `base` is recomputed identically to orig line 173 —
# redundant; harmless but could be dropped in a full rewrite.
178 base =
'group' if self.
args.groupProduction
else 'user'
# Prefix defaults to the first '_'-separated token of the parsed 'main'
# field when --prefix is not supplied.
179 prefix = self.
args.prefix
if self.
args.prefix
else nameParser[
'main'].
split(
'_')[0]
182 result = [base, username, prefix, dsid, fileFormat, tags, suffix]
# User-format inputs follow: {group/user}.{username}.{main}.outputDS.{suffix}
187 {group/user}.{username}.{main}.outputDS.{suffix}
189 parts = name.split(
'.')
190 base =
'group' if self.
args.groupProduction
else 'user'
191 username = self.
args.gridUsername
193 outputDS =
'outputDS'
196 result = [base, username,main, outputDS, suffix]
# Suffix resolution: explicit --suffix wins; test runs get a random
# "test_<6 hex chars>" tag so repeated submissions do not collide.
201 return self.
args.suffix
202 if self.
args.testRun:
204 return f
"test_{uuid.uuid4().hex[:6]}"
# Decide whether the source tarball is stale: compare its mtime against every
# file under the build and source trees. A missing tarball counts as mtime 0,
# i.e. everything is "newer" and a rebuild is needed.
209 tarball_mtime = os.path.getmtime(self.
_tarfile)
if os.path.exists(self.
_tarfile)
else 0
# Scan the build directory.
214 for root, _, files
in os.walk(buildDir):
216 file_path = os.path.join(root, file)
218 if os.path.getmtime(file_path) > tarball_mtime:
219 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
# Files can vanish between walk() and getmtime(); tolerated.
221 except FileNotFoundError:
# Without a detected source dir we cannot auto-refresh; user must opt in.
225 if sourceDir
is None:
226 logCPGridRun.warning(
"Source directory is not detected, auto-compression is not performed. Use --recreateTar to update the submission")
# Scan the source directory the same way.
228 for root, _, files
in os.walk(sourceDir):
230 file_path = os.path.join(root, file)
232 if os.path.getmtime(file_path) > tarball_mtime:
233 logCPGridRun.info(f
"File {file_path} is newer than the tarball.")
235 except FileNotFoundError:
# Build dir = parent of the first CMAKE_PREFIX_PATH entry.
240 buildDir = os.environ[
"CMAKE_PREFIX_PATH"]
241 buildDir = os.path.dirname(buildDir.split(
":")[0])
# Recover the checked-out source directory from CMakeCache.txt in the build
# area: the first '_SOURCE_DIR:STATIC=' cache entry's value.
245 cmakeCachePath = os.path.join(self.
_buildDir(),
'CMakeCache.txt')
247 if not os.path.exists(cmakeCachePath):
249 with open(cmakeCachePath,
'r')
as cmakeCache:
250 for line
in cmakeCache:
251 if '_SOURCE_DIR:STATIC=' in line:
252 sourceDir = line.split(
'=')[1].strip()
# Build the quoted --exec payload. If --exec starts with '-', it is treated as
# bare CPRun.py flags (the "preset" mode): the input list and, outside
# Athena, '--strip' are injected ahead of the user's flags.
258 isCPRunDefault = self.
args.exec.startswith(
'-')
260 clause = self.
args.exec.split(
' ')
262 inputClause = [
'--input-list in.txt']
263 strip = [
'--strip']
if not isAthena
else []
264 return f
'"{ " ".join(base + inputClause + strip + clause) }"'
# Otherwise pass the user's full command line through verbatim, quoted.
266 return f
'"{self.args.exec}"'
# Format --output-files for prun as "stem:filename" pairs, comma-joined,
# e.g. "A.root,B.txt" -> "A:A.root,B:B.txt".
269 outputs = self.
args.output_files.split(
',')
270 outputs = [f
'{output.split(".")[0]}:{output}' for output
in outputs]
271 return ','.
join(outputs)
# Verify the PanDA client is on PATH before attempting submission.
# NOTE(review): the error string contains a grammar slip ("If you use are on
# lxplus") — a candidate fix, but runtime strings are left untouched here.
276 prun_path = shutil.which(
"prun")
277 if prun_path
is None:
278 logCPGridRun.error(
"The 'prun' command is not found. If you use are on lxplus, please run `setupATLAS` and `lsetup panda`")
# Run the assembled prun command through the shell, streaming output to the
# caller's stdout/stderr and blocking until it finishes.
# NOTE(review): shell=True with an interpolated command — acceptable for a
# user-driven CLI tool, but worth keeping in mind if inputs ever become
# untrusted.
281 process = subprocess.Popen(cmd, shell=
True, stdout=sys.stdout, stderr=sys.stderr)
282 process.communicate()
# A name counts as Atlas production format when it starts with 'mc' or 'data'.
# NOTE(review): could be name.startswith(('mc', 'data')) — single call.
286 if name.startswith(
'mc')
or name.startswith(
'data'):
288 logCPGridRun.warning(
"Name is not in the Atlas production format, assuming it is a user production")
294 The custom name has many variations, but most of them follow user/group.username.datasetname.suffix
# Split a user/group-style name into its dotted fields:
#   parts[0]=user|group, parts[1]=username, parts[2]=dataset name,
#   parts[-1]=suffix. Middle fields beyond parts[2] are ignored.
297 parts = filename.split(
'.')
298 result[
'userType'] = parts[0]
299 result[
'username'] = parts[1]
300 result[
'main'] = parts[2]
301 result[
'suffix'] = parts[-1]
307 Parses a file name into a dictionary; an example is given here
308 mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855/DAOD_PHYS.34865530._000740.pool.root.1
310 datasetName: mc20_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.deriv.DAOD_PHYS.e6337_s3681_r13167_p5855
311 projectName: mc20_13TeV
315 main: PhPy8EG_A14_ttbar_hdamp258p75_nonallhad
316 TODO generator: PhPy8EG
317 TODO tune: A14 # For Pythia8
319 TODO hdamp: 258p75 # For Powheg
320 TODO decayType: nonallhad
323 tags: e###_s###_r###_p###_a###_t###_b#
324 etag: e6337 # EVNT (EVGEN) production and merging
325 stag: s3681 # Geant4 simulation to produce HITS and merging!
326 rtag: r13167 # Digitisation and reconstruction, as well as AOD merging
327 ptag: p5855 # Production of NTUP_PILEUP format and merging
328 atag: aXXX: atlfast configuration (both simulation and digit/recon)
329 ttag: tXXX: tag production configuration
330 btag: bXXX: bytestream production configuration
# Split "<dataset>/<file>" when a file part is present; otherwise the whole
# string is the dataset name (the '/' split raising/failing is presumably
# handled by elided try/except lines — confirm against the full source).
343 datasetPart, filePart = filename.split(
'/')
345 datasetPart = filename
# Dataset fields are dot-separated:
#   [0]=projectName (e.g. mc20_13TeV), [1]=DSID, [2]=main (physics short
#   name), [3]=step (e.g. deriv), [4]=format (e.g. DAOD_PHYS), [5]=tags.
349 datasetParts = datasetPart.split(
'.')
350 result[
'datasetName'] = datasetPart
352 result[
'projectName'] = datasetParts[0]
# projectName splits as "<campaign>_<energy>", e.g. mc20 / 13TeV.
354 campaign_energy = result[
'projectName'].
split(
'_')
355 result[
'campaign'] = campaign_energy[0]
356 result[
'energy'] = campaign_energy[1]
359 result[
'DSID'] = datasetParts[1]
360 result[
'main'] = datasetParts[2]
361 result[
'step'] = datasetParts[3]
362 result[
'format'] = datasetParts[4]
# Tags field is '_'-separated, e.g. e6337_s3681_r13167_p5855.
365 tags = datasetParts[5].
split(
'_')
366 result[
'tags'] = tags
# Classify each tag by its leading letter (e/s/r/p/a/t/b); the per-branch
# assignments live on elided lines (orig 369, 371, 373, ...).
368 if tag.startswith(
'e'):
370 elif tag.startswith(
's'):
372 elif tag.startswith(
'r'):
374 elif tag.startswith(
'p'):
376 elif tag.startswith(
'a'):
378 elif tag.startswith(
't'):
380 elif tag.startswith(
'b'):
# File fields, e.g. DAOD_PHYS.34865530._000740.pool.root.1:
#   [1]=JEDI task ID, [2]=file number, [-1]=version.
385 fileParts = filePart.split(
'.')
386 result[
'jediTaskID'] = fileParts[1]
387 result[
'fileNumber'] = fileParts[2]
388 result[
'version'] = fileParts[-1]
# Read the input-list text file: skip '#' comment lines and blank lines
# (the skip action itself, presumably `continue`, is on an elided line),
# split remaining lines on commas, and strip whitespace from each entry.
394 with open(path,
'r')
as inputText:
395 for line
in inputText.readlines():
397 if line.startswith(
'#')
or not line.strip():
399 files += line.split(
',')
401 files = [file.strip()
for file
in files]
# Interactive y/n confirmation gate before submitting. The actual actions for
# 'y' (submit) and the fall-through else live on elided lines (orig 407, 410).
# NOTE(review): "Tenative" in the prompt is a typo for "Tentative" — a
# runtime string, so left untouched in this documentation pass.
405 answer = input(
"[Tenative asking] Please confirm ALL the submission details are correct to submit [y/n]: ")
406 if answer.lower() ==
'y':
408 elif answer.lower() ==
'n':
409 logCPGridRun.info(
"Feel free to report any unexpected behavior to CPAlgorithms team!")
# Any other answer is rejected and nothing is submitted.
411 logCPGridRun.error(
"Invalid input. Please enter 'y' or 'n'. Jobs are not submitted")
# Script entry point: configure, summarize, then ask for submission
# confirmation. The construction of `cpgrid` is on an elided line (orig 414).
# NOTE(review): "configureSumbission" looks like a typo for
# "configureSubmission", but the method may genuinely be defined with this
# spelling elsewhere in the file — do not rename without checking the class.
413 if __name__ ==
'__main__':
415 cpgrid.configureSumbission()
416 cpgrid.printInputDetails()
417 cpgrid._askSubmission()