ATLAS Offline Software
Loading...
Searching...
No Matches
EventLoopCPRunScript.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
3import os
4import sys
5
class EventLoopCPRunScript(CPBaseRunner):
    """Run script that configures the CP algorithms and runs them as an EventLoop job.

    On top of the parser provided by ``CPBaseRunner`` this class registers
    EventLoop-specific and expert command-line options, builds the algorithm
    sequence, submits the job through an EventLoop driver and finally exits
    the interpreter with a status derived from ``ROOT.EL.Driver.retrieve``.
    """

    # Submission directory used whenever --work-dir is not given (or is empty).
    _DEFAULT_WORK_DIR = 'workDir'

    def __init__(self):
        """Initialise the base runner and register the EventLoop command-line options."""
        super().__init__()
        self.logger.info("EventLoopCPRunScript initialized")
        # Filled by makeAlgSequence(); read back by _dumpFullConfig().
        self.algSeq = None
        # Avoid putting call to parse_args() here! Otherwise it is hard to retrieve the parser infos

        # add arguments here
        derivedGroup = self.parser.add_argument_group('EventLoop specific arguments')
        derivedGroup.add_argument('--direct-driver', dest='direct_driver',
                                  action='store_true', help='Run the job with the direct driver')
        derivedGroup.add_argument('--work-dir', dest='work_dir', nargs='?', const=self._DEFAULT_WORK_DIR, default=None,
                                  help='The work directory for the EL job. defaults to "workDir".')
        derivedGroup.add_argument('--merge-output-files', dest='merge_output_files', action='store_true', help='Merge the output histogram and n-tuple files into a single file.')
        derivedGroup.add_argument('--dump-full-config', dest='dump_full_config', action='store_true', help='Save the full CP configuration log to a json file. This can be useful for debugging purposes.')

        expertGroup = self.parser.add_argument_group('Experts arguments')
        expertGroup.add_argument('--run-perf-stat', dest='run_perf_stat', action='store_true', help='Run xAOD::PerfStats to get input branch access data. This is mostly useful for AMG experts wanting to understand branch access patterns.')
        expertGroup.add_argument('--algorithm-timers', dest='algorithm_timers', action='store_true', help='Enable algorithm timers. This is mostly useful for AMG experts wanting to understand tool performance.')
        expertGroup.add_argument('--algorithm-memory-monitoring', dest='algorithm_memory_monitoring', action='store_true', help='Enable algorithm memory monitoring. This is mostly useful for AMG experts wanting to understand tool memory usage. Note that this is imperfect and may in cases assign memory to the wrong algorithm.')

    def _workDirName(self):
        """Return the submission directory name: ``--work-dir`` if set, else the default."""
        return self.args.work_dir or self._DEFAULT_WORK_DIR

    def makeAlgSequence(self):
        """Build, store and return the algorithm sequence for the job.

        The configuration sequence obtained from ``self.config.configure()``
        is applied to a fresh ``AlgSequence`` through a ``ConfigAccumulator``.
        The resulting sequence is also kept in ``self.algSeq``.
        """
        from AnaAlgorithm.AlgSequence import AlgSequence
        from AnalysisAlgorithmsConfig.ConfigAccumulator import ConfigAccumulator
        algSeq = AlgSequence()
        self.logger.info("Configuring algorithms based on YAML file")
        configSeq = self.config.configure()
        self.logger.info("Configuring common services")
        configAccumulator = ConfigAccumulator(flags=self.flags,
                                              algSeq=algSeq,
                                              noSystematics=self.args.no_systematics)
        self.logger.info("Configuring algorithms")
        configSeq.fullConfigure(configAccumulator)
        self.algSeq = algSeq
        return algSeq

    def readSamples(self):
        """Create ``self.sampleHandler`` holding one local sample with every file of ``self.inputList``."""
        import ROOT
        self.sampleHandler = ROOT.SH.SampleHandler()
        # The sample is named after the output name.
        sampleFiles = ROOT.SH.SampleLocal(f"{self.outputName}")
        self.logger.info("Adding files to the sample handler")
        for inputFile in self.inputList:
            sampleFiles.add(inputFile)
        self.sampleHandler.add(sampleFiles)

    # This functionality should not be in the runscript, instead should be put into PrintConfiguration alg.
    # This is a temporary solution to dump the full config until PrintConfiguration alg is completely ready.
    def _dumpFullConfig(self):
        """Write the full configuration to ``full_config.json``.

        The algorithm sequence is first dumped to the intermediate file
        ``_alg_sequence.json``, which is then combined into
        ``full_config.json`` and removed again. Failures in either step are
        logged as warnings and do not abort the job.
        """
        from AnalysisAlgorithmsConfig.SaveConfigUtils import save_algs_from_sequence_ELjob, combine_tools_and_algorithms_ELjob
        import json
        algFile = "_alg_sequence.json"
        with open(algFile, 'w', encoding='utf-8') as seq_out_file:
            output_dict = {}
            try:
                save_algs_from_sequence_ELjob(self.algSeq, output_dict)
                json.dump(output_dict, seq_out_file, ensure_ascii=False, indent=4)
            except Exception as e:
                self.logger.warning(f'Dumping full config failed with: {e}')
                self.logger.warning('Please also check if "PrintConfiguration" is enabled in the text config.')
        # The intermediate file is closed (and therefore flushed) at this
        # point, so the combine step reads its complete content.
        try:
            combine_tools_and_algorithms_ELjob(combine_dictionaries=False, alg_file=algFile, output_file="full_config.json")
            self.logger.info("Combining full config to full_config.json succeeded")
        except Exception as e:
            self.logger.warning(f'Combining full config failed with: {e}')
            self.logger.warning('Please also check if "PrintConfiguration" is enabled in the text config.')
        finally:
            os.remove(algFile)

    def moveOutputFiles(self):
        """Move the n-tuple and histogram files from the work directory to the current directory.

        A file that does not exist is skipped with a warning. When
        ``--merge-output-files`` is set, the moved histogram file is renamed
        to ``<outputName>.root`` afterwards.
        """
        from pathlib import Path
        import shutil
        self.logger.info("Moving the analysis root file and the hist file to the top level.")
        workDir = Path(self._workDirName())
        currentDir = Path.cwd()
        ntupleName = f'{self.outputName}.root'
        histName = f'hist-{self.outputName}.root'

        # (entry inside the work directory, destination, warning if missing)
        outputs = (
            (workDir / 'data-ANALYSIS' / ntupleName, currentDir / ntupleName,
             "Root file {} does not exist or merging is enabled, skipping move."),
            (workDir / histName, currentDir / histName,
             "Histogram file {} does not exist or merging, skipping move."),
        )
        for link, destination, missingMessage in outputs:
            target = link.resolve()
            if not target.exists():
                self.logger.warning(missingMessage.format(target))
                continue
            self.logger.info(f"Moving {target} to {destination}")
            if link.is_symlink():  # The check is needed to avoid FileNotFoundError if using direct driver
                link.unlink()
            shutil.move(str(target), str(destination))

        newHistFile = currentDir / histName
        # rename merged hist-ntuple to output_name.root
        if self.args.merge_output_files and newHistFile.exists():
            self.logger.info(f"renaming the hist-{self.outputName}.root to {self.outputName}.root")
            newHistFile.rename(currentDir / ntupleName)

    def driverSubmit(self, driver):
        '''
        Important if you want to run code after submitting the job, with external driver e.g., ExecDriver.
        Assistant function to call driver submit. Move the submission to a child process to avoid the main process being terminated.
        Directly calling external driver submission will not return controls to the main process, the main thread will be terminated.
        '''
        pid = os.fork()
        if pid == 0:  # child process
            driver.submit(self.job, self._workDirName())
            # sys.exit instead of the bare exit(): the latter is only injected
            # by the site module and is not guaranteed to be available.
            sys.exit(0)
        os.waitpid(pid, 0)  # parent waits for child process to finish

    def getExitCode(self):
        """Return 0 if ``ROOT.EL.Driver.retrieve`` reports a truthy status for the work directory, else 1."""
        import ROOT
        succeeded = ROOT.EL.Driver.retrieve(self._workDirName())
        return 0 if succeeded else 1

    def run(self):
        """Set up, configure and submit the EventLoop job, then exit the interpreter.

        This method does not return: it always ends in ``sys.exit`` with the
        value obtained from ``getExitCode()``.
        """
        self.setup()
        # importing ROOT has a long upfront time, so we do it here
        import ROOT
        ROOT.xAOD.Init().ignore()
        self.readSamples()
        self.flags.lock()
        self.printFlags()

        self.job = ROOT.EL.Job()
        options = self.job.options()
        options.setDouble(ROOT.EL.Job.optFilesPerWorker, 100)
        options.setDouble(ROOT.EL.Job.optMaxEvents, self.flags.Exec.MaxEvents)
        options.setString(ROOT.EL.Job.optSubmitDirMode, 'unique-link')
        options.setDouble(ROOT.EL.Job.optSkipEvents, self.flags.Exec.SkipEvents)

        for alg in self.makeAlgSequence():
            self.job.algsAdd(alg)
        if self.args.merge_output_files:
            # Alias the ANALYSIS stream to the histogram stream so that both end up in one file.
            self.job.options().setString(ROOT.EL.Job.optStreamAliases, "ANALYSIS=" + ROOT.EL.Job.histogramStreamName)
        else:
            self.job.outputAdd(ROOT.EL.OutputStream('ANALYSIS'))

        if self.args.run_perf_stat:
            self.job.options().setBool(ROOT.EL.Job.optXAODPerfStats, 1)
        if self.args.algorithm_timers:
            self.job.options().setBool(ROOT.EL.Job.optAlgorithmTimer, 1)
        if self.args.algorithm_memory_monitoring:
            self.job.options().setBool(ROOT.EL.Job.optAlgorithmMemoryMonitor, 1)

        driver = ROOT.EL.DirectDriver() if self.args.direct_driver else ROOT.EL.ExecDriver()
        self.driverSubmit(driver)

        if self.args.dump_full_config:
            self._dumpFullConfig()
        exitCode = self.getExitCode()

        if self.args.work_dir is None:  # move output if work_dir is not used
            self.moveOutputFiles()

        sys.exit(exitCode)
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55
Definition run.py:1