ATLAS Offline Software
Loading...
Searching...
No Matches
EventLoopCPRunScript.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
3import os
4import sys
5
class EventLoopCPRunScript(CPBaseRunner):
    """Run a CP-algorithm text configuration as an EventLoop job.

    On top of the options provided by ``CPBaseRunner`` this adds
    EventLoop-specific command-line arguments (driver choice, work directory,
    output merging, full-config dump) and expert monitoring switches.
    ``run()`` builds the EventLoop job, submits it, and - when no
    ``--work-dir`` was requested - moves the output files to the current
    working directory before exiting with the job status.
    """

    # Submission directory used when --work-dir is absent (or passed without a value).
    _DEFAULT_WORK_DIR = 'workDir'

    def __init__(self):
        super().__init__()
        self.logger.info("EventLoopCPRunScript initialized")
        # Filled by makeAlgSequence(); read later by _dumpFullConfig().
        self.algSeq = None
        # Avoid putting call to parse_args() here! Otherwise it is hard to retrieve the parser infos

        # add arguments here
        derivedGroup = self.parser.add_argument_group('EventLoop specific arguments')
        derivedGroup.add_argument('--direct-driver', dest='direct_driver',
                                  action='store_true', help='Run the job with the direct driver')
        derivedGroup.add_argument('--work-dir', dest='work_dir', nargs='?', const=self._DEFAULT_WORK_DIR, default=None,
                                  help='The work directory for the EL job. defaults to "workDir".')
        derivedGroup.add_argument('--merge-output-files', dest='merge_output_files', action='store_true', help='Merge the output histogram and n-tuple files into a single file.')
        derivedGroup.add_argument('--dump-full-config', dest='dump_full_config', action='store_true', help='Save the full CP configuration log to a json file. This can be useful for debugging purposes.')

        expertGroup = self.parser.add_argument_group('Experts arguments')
        expertGroup.add_argument('--run-perf-stat', dest='run_perf_stat', action='store_true', help='Run xAOD::PerfStats to get input branch access data. This is mostly useful for AMG experts wanting to understand branch access patterns.')
        expertGroup.add_argument('--algorithm-timers', dest='algorithm_timers', action='store_true', help='Enable algorithm timers. This is mostly useful for AMG experts wanting to understand tool performance.')
        expertGroup.add_argument('--algorithm-memory-monitoring', dest='algorithm_memory_monitoring', action='store_true', help='Enable algorithm memory monitoring. This is mostly useful for AMG experts wanting to understand tool memory usage. Note that this is imperfect and may in cases assign memory to the wrong algorithm.')

    def _submitDirName(self):
        """Return the EventLoop submission directory: ``--work-dir`` if set, else the default."""
        return self.args.work_dir if self.args.work_dir else self._DEFAULT_WORK_DIR

    def makeAlgSequence(self):
        """Build the algorithm sequence from the text configuration.

        The sequence is stored on ``self.algSeq``, passed through
        ``modifyAlgSequence()``, and returned.
        """
        from AnaAlgorithm.AlgSequence import AlgSequence
        from AnalysisAlgorithmsConfig.ConfigAccumulator import ConfigAccumulator
        algSeq = AlgSequence()
        self.logger.info("Configuring algorithms based on YAML file")
        configSeq = self.config.configure()
        self.logger.info("Configuring common services")
        configAccumulator = ConfigAccumulator(flags=self.flags,
                                              algSeq=algSeq,
                                              noSystematics=self.args.no_systematics)
        self.logger.info("Configuring algorithms")
        configSeq.fullConfigure(configAccumulator)
        self.algSeq = algSeq
        self.modifyAlgSequence()
        return algSeq

    def readSamples(self):
        """Create ``self.sampleHandler`` with one local sample named after the output.

        The sample contains every file in ``self.inputList``.
        """
        import ROOT
        self.sampleHandler = ROOT.SH.SampleHandler()
        sampleFiles = ROOT.SH.SampleLocal(f"{self.outputName}")
        self.logger.info("Adding files to the sample handler")
        for file in self.inputList:
            sampleFiles.add(file)
        self.sampleHandler.add(sampleFiles)

    # This functionality should not be in the runscript, instead should be put into PrintConfiguration alg.
    # This is a temporary solution to dump the full config until PrintConfiguration alg is completely ready.
    def _dumpFullConfig(self):
        """Best-effort dump of the full configuration to ``full_config.json``.

        The algorithm sequence is first serialised to an intermediate
        ``_alg_sequence.json``, which is then combined with the tool
        information and always removed afterwards. Failures are logged as
        warnings and never abort the run.
        """
        from AnalysisAlgorithmsConfig.SaveConfigUtils import save_algs_from_sequence_ELjob, combine_tools_and_algorithms_ELjob
        import contextlib
        import json
        algFile = "_alg_sequence.json"
        try:
            try:
                output_dict = {}
                # Serialise first, so a failure does not leave a truncated file behind.
                save_algs_from_sequence_ELjob(self.algSeq, output_dict)
                with open(algFile, 'w', encoding='utf-8') as seq_out_file:
                    json.dump(output_dict, seq_out_file, ensure_ascii=False, indent=4)
            except Exception as e:
                self.logger.warning(f'Dumping full config failed with: {e}')
                self.logger.warning('Please also check if "PrintConfiguration" is enabled in the text config.')
                # Combining would only fail again on a missing/partial file.
                return
            try:
                combine_tools_and_algorithms_ELjob(combine_dictionaries=False, alg_file=algFile, output_file="full_config.json")
                self.logger.info("Combining full config to full_config.json succeeded")
            except Exception as e:
                self.logger.warning(f'Combining full config failed with: {e}')
                self.logger.warning('Please also check if "PrintConfiguration" is enabled in the text config.')
        finally:
            # The intermediate file may not exist if serialisation or open() failed.
            with contextlib.suppress(FileNotFoundError):
                os.remove(algFile)

    def _moveToCurrentDir(self, link, destination, missingWarning):
        """Move the file behind ``link`` (a ``pathlib.Path``) to ``destination``.

        ``link`` may be a symlink into the unique submission directory or the
        file itself (direct driver). If the resolved file does not exist,
        ``missingWarning`` (formatted with ``path=``) is logged and ``False``
        is returned; otherwise the file is moved and ``True`` is returned.
        """
        import shutil
        source = link.resolve()
        if not source.exists():
            self.logger.warning(missingWarning.format(path=source))
            return False
        self.logger.info(f"Moving {source} to {destination}")
        if link.is_symlink():  # The check is needed to avoid FileNotFoundError if using direct driver
            link.unlink()
        shutil.move(str(source), str(destination))
        return True

    def moveOutputFiles(self):
        """Move the n-tuple and histogram outputs into the current directory.

        With ``--merge-output-files`` the moved ``hist-<output>.root`` is then
        renamed to ``<output>.root``.
        """
        from pathlib import Path
        self.logger.info("Moving the analysis root file and the hist file to the top level.")
        workDir = Path(self._submitDirName())
        currentDir = Path.cwd()
        ntupleName = f"{self.outputName}.root"
        histName = f"hist-{self.outputName}.root"
        # move ntuple file if it exists; when merging, the ANALYSIS stream is
        # aliased to the histogram stream (see run()), so it is expected to be absent
        self._moveToCurrentDir(workDir / 'data-ANALYSIS' / ntupleName, currentDir / ntupleName,
                               "Root file {path} does not exist or merging is enabled, skipping move.")
        # move histogram file if it exists
        self._moveToCurrentDir(workDir / histName, currentDir / histName,
                               "Histogram file {path} does not exist, skipping move.")

        newHistFile = currentDir / histName
        # rename merged hist-ntuple to output_name.root
        if self.args.merge_output_files and newHistFile.exists():
            self.logger.info(f"renaming the hist-{self.outputName}.root to {self.outputName}.root")
            newHistFile.rename(currentDir / ntupleName)

    def driverSubmit(self, driver):
        '''
        Important if you want to run code after submitting the job, with external driver e.g., ExecDriver.
        Assistant function to call driver submit. The submission runs in a forked child process, because
        directly calling external driver submission does not return control to the main process.
        The parent waits for the child and then continues with the rest of run().
        '''
        # Flush Python-level buffers so pending output is not emitted twice (by parent and child).
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid == 0:  # child process
            childExitCode = 0
            try:
                driver.submit(self.job, self._submitDirName())
            except Exception as e:
                import traceback
                traceback.print_exc()
                self.logger.error(f'Job submission failed with: {e}')
                childExitCode = 1
            # Always terminate the child here so it never continues with the parent's logic.
            sys.exit(childExitCode)
        # parent waits for child process to finish
        _, status = os.waitpid(pid, 0)
        if not (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0):
            self.logger.warning(f'Job submission process ended with non-zero wait status {status}')
        return

    def getExitCode(self):
        """Return 0 if ``EL.Driver.retrieve`` reports success for the submission directory, else 1."""
        import ROOT
        return 0 if ROOT.EL.Driver.retrieve(self._submitDirName()) else 1

    def run(self):
        """Configure, submit and finalise the EventLoop job, then exit the process.

        This never returns: it ends with ``sys.exit`` using the code from
        ``getExitCode()``.
        """
        self.setup()
        # importing ROOT has a long upfront time, so we do it here
        import ROOT
        ROOT.xAOD.Init().ignore()
        self.readSamples()
        self.flags.lock()
        self.printFlags()

        self.job = ROOT.EL.Job()
        self.job.options().setDouble(ROOT.EL.Job.optFilesPerWorker, 100)
        self.job.options().setDouble(ROOT.EL.Job.optMaxEvents, self.flags.Exec.MaxEvents)
        self.job.options().setString(ROOT.EL.Job.optSubmitDirMode, 'unique-link')
        self.job.options().setDouble(ROOT.EL.Job.optSkipEvents, self.flags.Exec.SkipEvents)

        for alg in self.makeAlgSequence():
            self.job.algsAdd(alg)
        if self.args.merge_output_files:
            # Write the ANALYSIS n-tuple stream into the histogram output file.
            self.job.options().setString(ROOT.EL.Job.optStreamAliases, "ANALYSIS=" + ROOT.EL.Job.histogramStreamName)
        else:
            self.job.outputAdd(ROOT.EL.OutputStream('ANALYSIS'))

        if self.args.run_perf_stat:
            self.job.options().setBool(ROOT.EL.Job.optXAODPerfStats, True)
        if self.args.algorithm_timers:
            self.job.options().setBool(ROOT.EL.Job.optAlgorithmTimer, True)
        if self.args.algorithm_memory_monitoring:
            self.job.options().setBool(ROOT.EL.Job.optAlgorithmMemoryMonitor, True)

        driver = ROOT.EL.DirectDriver() if self.args.direct_driver else ROOT.EL.ExecDriver()
        self.driverSubmit(driver)

        if self.args.dump_full_config:
            self._dumpFullConfig()
        exitCode = self.getExitCode()

        # Move outputs only when the default submission directory was used.
        # Test truthiness, as _submitDirName() does, so that an empty
        # --work-dir value is treated the same way in both places.
        if not self.args.work_dir:
            self.moveOutputFiles()

        sys.exit(exitCode)
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55