2 from AnalysisAlgorithmsConfig.CPBaseRunner
import CPBaseRunner
9 self.logger.
info(
"EventLoopCPRunScript initialized")
15 derivedGroup = self.parser.add_argument_group(
'EventLoop specific arguments')
16 derivedGroup.add_argument(
'--direct-driver', dest=
'direct_driver',
17 action=
'store_true', help=
'Run the job with the direct driver')
18 derivedGroup.add_argument(
'--work-dir', dest=
'work_dir', nargs=
'?', const=
'workDir', default=
None,
19 help=
'The work directory for the EL job. defaults to "workDir".')
20 derivedGroup.add_argument(
'--merge-output-files', dest=
'merge_output_files', action=
'store_true', help=
'Merge the output histogram and n-tuple files into a single file.')
22 expertGroup = self.parser.add_argument_group(
'Experts arguments')
23 expertGroup.add_argument(
'--run-perf-stat', dest=
'run_perf_stat', action=
'store_true', help=
'Run xAOD::PerfStats to get input branch access data. This is mostly useful for AMG experts wanting to understand branch access patterns.')
24 expertGroup.add_argument(
'--algorithm-timers', dest=
'algorithm_timers', action=
'store_true', help=
'Enable algorithm timers. This is mostly useful for AMG experts wanting to understand tool performance.')
25 expertGroup.add_argument(
'--algorithm-memory-monitoring', dest=
'algorithm_memory_monitoring', action=
'store_true', help=
'Enable algorithm memory monitoring. This is mostly useful for AMG experts wanting to understand tool memory usage. Note that this is imperfect and may in cases assign memory to the wrong algorithm.')
29 from AnaAlgorithm.AlgSequence
import AlgSequence
30 from AnalysisAlgorithmsConfig.ConfigAccumulator
import ConfigAccumulator
32 self.logger.
info(
"Configuring algorithms based on YAML file")
34 self.logger.
info(
"Configuring common services")
35 configAccumulator = ConfigAccumulator(flags=self.flags,
37 noSystematics=self.args.no_systematics)
38 self.logger.
info(
"Configuring algorithms")
39 configSeq.fullConfigure(configAccumulator)
45 sampleFiles = ROOT.SH.SampleLocal(f
"{self.outputName}")
46 self.logger.
info(
"Adding files to the sample handler")
47 for file
in self.inputList:
52 from pathlib
import Path
54 self.logger.
info(
"Moving the analysis root file and the hist file to the top level.")
55 workDir = Path(self.args.work_dir)
if self.args.work_dir
else Path(
'workDir')
56 rootfileSymlink = (workDir /
'data-ANALYSIS' / f
'{self.outputName}.root')
57 rootfilePath = rootfileSymlink.resolve()
58 histfileSymlink = (workDir / f
'hist-{self.outputName}.root')
59 histfilePath = histfileSymlink.resolve()
60 currentDir = Path.cwd()
62 if rootfilePath.exists():
63 self.logger.
info(f
"Moving {rootfilePath} to {currentDir / f'{self.outputName}.root'}")
64 if rootfileSymlink.is_symlink():
65 rootfileSymlink.unlink()
66 shutil.move(
str(rootfilePath),
str(currentDir / f
"{self.outputName}.root"))
68 self.logger.warning(f
"Root file {rootfilePath} does not exist or merging is enabled, skipping move.")
70 if histfilePath.exists():
71 self.logger.
info(f
"Moving {histfilePath} to {currentDir / f'hist-{self.outputName}.root'}")
72 if histfileSymlink.is_symlink():
73 histfileSymlink.unlink()
74 shutil.move(
str(histfilePath),
str(currentDir / f
"hist-{self.outputName}.root"))
76 self.logger.warning(f
"Histogram file {histfilePath} does not exist or merging, skipping move.")
78 newHistFile = currentDir / f
"hist-{self.outputName}.root"
80 if self.args.merge_output_files
and newHistFile.exists():
81 self.logger.
info(f
"renaming the hist-{self.outputName}.root to {self.outputName}.root")
82 newHistFile.rename(currentDir / f
"{self.outputName}.root")
86 Important if you want to run code after submitting the job, with external driver e.g., ExecDriver.
87 Assistant function to call driver submit. Move the submission to a child process to avoid the main process being terminated.
88 Directly calling external driver submission will not return controls to the main process, the main thread will be terminated.
90 if (pid := os.fork()) == 0:
91 name = self.args.work_dir
if self.args.work_dir
else 'workDir'
92 driver.submit(self.
job, name)
100 statusCode = ROOT.EL.Driver.retrieve(self.args.work_dir
if self.args.work_dir
else 'workDir')
116 self.
job.
options().setDouble(ROOT.EL.Job.optFilesPerWorker, 100)
117 self.
job.
options().setDouble(ROOT.EL.Job.optMaxEvents, self.flags.Exec.MaxEvents)
118 self.
job.
options().setString(ROOT.EL.Job.optSubmitDirMode,
'unique-link')
119 self.
job.
options().setDouble(ROOT.EL.Job.optSkipEvents, self.flags.Exec.SkipEvents)
122 self.
job.algsAdd(alg)
123 if self.args.merge_output_files:
124 self.
job.
options().setString(ROOT.EL.Job.optStreamAliases,
"ANALYSIS=" + ROOT.EL.Job.histogramStreamName)
126 self.
job.outputAdd(ROOT.EL.OutputStream(
'ANALYSIS'))
128 if self.args.run_perf_stat:
129 self.
job.
options().setBool(ROOT.EL.Job.optXAODPerfStats, 1)
130 if self.args.algorithm_timers:
131 self.
job.
options().setBool(ROOT.EL.Job.optAlgorithmTimer, 1)
132 if self.args.algorithm_memory_monitoring:
133 self.
job.
options().setBool(ROOT.EL.Job.optAlgorithmMemoryMonitor, 1)
135 driver = ROOT.EL.DirectDriver()
if self.args.direct_driver
else ROOT.EL.ExecDriver()
139 if self.args.work_dir
is None: