ATLAS Offline Software
EventLoopCPRunScript.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
3 import os
4 import sys
5 
class EventLoopCPRunScript(CPBaseRunner):
    """Run a CP algorithm configuration as an EventLoop job.

    Builds on ``CPBaseRunner``: registers EventLoop-specific command line
    options, builds the algorithm sequence, submits the job through an
    EventLoop driver and, when ``--work-dir`` is not given, moves the
    produced ROOT files into the current working directory.
    """

    def __init__(self):
        """Initialise the base runner and register the EventLoop options."""
        super().__init__()
        self.logger.info("EventLoopCPRunScript initialized")
        self.addCustomArguments()
        # Set by makeAlgSequence() and read back by _dumpFullConfig().
        self.algSeq = None
        # Avoid putting call to parse_args() here! Otherwise it is hard to retrieve the parser infos

    def addCustomArguments(self):
        """Add the EventLoop-specific and expert options to ``self.parser``."""
        # add arguments here
        derivedGroup = self.parser.add_argument_group('EventLoop specific arguments')
        derivedGroup.add_argument('--direct-driver', dest='direct_driver',
                                  action='store_true', help='Run the job with the direct driver')
        # Option absent -> None, bare option -> 'workDir', otherwise the given value.
        derivedGroup.add_argument('--work-dir', dest='work_dir', nargs='?', const='workDir', default=None,
                                  help='The work directory for the EL job. defaults to "workDir".')
        derivedGroup.add_argument('--merge-output-files', dest='merge_output_files', action='store_true',
                                  help='Merge the output histogram and n-tuple files into a single file.')
        derivedGroup.add_argument('--dump-full-config', dest='dump_full_config', action='store_true',
                                  help='Save the full CP configuration log to a json file. This can be useful for debugging purposes.')

        expertGroup = self.parser.add_argument_group('Experts arguments')
        expertGroup.add_argument('--run-perf-stat', dest='run_perf_stat', action='store_true',
                                 help='Run xAOD::PerfStats to get input branch access data. This is mostly useful for AMG experts wanting to understand branch access patterns.')
        expertGroup.add_argument('--algorithm-timers', dest='algorithm_timers', action='store_true',
                                 help='Enable algorithm timers. This is mostly useful for AMG experts wanting to understand tool performance.')
        expertGroup.add_argument('--algorithm-memory-monitoring', dest='algorithm_memory_monitoring', action='store_true',
                                 help='Enable algorithm memory monitoring. This is mostly useful for AMG experts wanting to understand tool memory usage. Note that this is imperfect and may in cases assign memory to the wrong algorithm.')

    def makeAlgSequence(self):
        """Build the algorithm sequence, store it in ``self.algSeq`` and return it."""
        from AnaAlgorithm.AlgSequence import AlgSequence
        from AnalysisAlgorithmsConfig.ConfigAccumulator import ConfigAccumulator
        algSeq = AlgSequence()
        self.logger.info("Configuring algorithms based on YAML file")
        configSeq = self.config.configure()
        self.logger.info("Configuring common services")
        configAccumulator = ConfigAccumulator(flags=self.flags,
                                              algSeq=algSeq,
                                              noSystematics=self.args.no_systematics)
        self.logger.info("Configuring algorithms")
        configSeq.fullConfigure(configAccumulator)
        self.algSeq = algSeq
        return algSeq

    def readSamples(self):
        """Create ``self.sampleHandler`` with one local sample holding every entry of ``self.inputList``."""
        import ROOT
        self.sampleHandler = ROOT.SH.SampleHandler()
        # The sample is named after the output.
        sampleFiles = ROOT.SH.SampleLocal(f"{self.outputName}")
        self.logger.info("Adding files to the sample handler")
        for inputFile in self.inputList:
            sampleFiles.add(inputFile)
        self.sampleHandler.add(sampleFiles)

    # This functionality should not be in the runscript, instead should be put into PrintConfiguration alg.
    # This is a temporary solution to dump the full config until PrintConfiguration alg is completely ready.
    def _dumpFullConfig(self):
        """Write the full configuration to ``full_config.json`` (best effort).

        The algorithm sequence is first dumped to a temporary
        ``_alg_sequence.json`` in the current directory, which is then handed
        to ``combine_tools_and_algorithms_ELjob``. Failures of either step are
        reported as warnings; the temporary file is always cleaned up.
        """
        from AnalysisAlgorithmsConfig.SaveConfigUtils import save_algs_from_sequence_ELjob, combine_tools_and_algorithms_ELjob
        import json
        algFile = "_alg_sequence.json"
        with open(algFile, 'w', encoding='utf-8') as seq_out_file:
            output_dict = {}
            try:
                save_algs_from_sequence_ELjob(self.algSeq, output_dict)
                json.dump(output_dict, seq_out_file, ensure_ascii=False, indent=4)
            except Exception as e:
                self.logger.warning(f'Dumping full config failed with: {e}')
                self.logger.warning('Please also check if "PrintConfiguration" is enabled in the text config.')
        # The temporary file is closed (hence flushed) at this point, before its
        # name is passed on to the combination step.
        try:
            combine_tools_and_algorithms_ELjob(combine_dictionaries=False, alg_file=algFile, output_file="full_config.json")
            self.logger.info("Combining full config to full_config.json succeeded")
        except Exception as e:
            self.logger.warning(f'Combining full config failed with: {e}')
            self.logger.warning('Please also check if "PrintConfiguration" is enabled in the text config.')
        finally:
            try:
                os.remove(algFile)
            except FileNotFoundError:
                # Nothing left to clean up; do not mask the outcome above.
                pass

    def _workDirName(self):
        """Return the submission directory: ``--work-dir`` if set, else ``'workDir'``."""
        return self.args.work_dir if self.args.work_dir else 'workDir'

    def _moveOutputFile(self, linkPath, realPath, destination):
        """Move ``realPath`` to ``destination`` and drop ``linkPath`` if it is a symlink."""
        import shutil
        self.logger.info(f"Moving {realPath} to {destination}")
        if linkPath.is_symlink():  # The check is needed to avoid FileNotFoundError if using direct driver
            linkPath.unlink()
        shutil.move(str(realPath), str(destination))

    def moveOutputFiles(self):
        """Move the n-tuple and histogram files from the work directory to the current directory.

        With ``--merge-output-files`` the moved ``hist-<output>.root`` is
        finally renamed to ``<output>.root``.
        """
        from pathlib import Path
        self.logger.info("Moving the analysis root file and the hist file to the top level.")
        workDir = Path(self._workDirName())
        rootfileSymlink = workDir / 'data-ANALYSIS' / f'{self.outputName}.root'
        histfileSymlink = workDir / f'hist-{self.outputName}.root'
        # Resolve both locations before anything is unlinked or moved.
        rootfilePath = rootfileSymlink.resolve()
        histfilePath = histfileSymlink.resolve()
        currentDir = Path.cwd()
        newRootFile = currentDir / f"{self.outputName}.root"
        newHistFile = currentDir / f"hist-{self.outputName}.root"

        # move ntuple file if it exists
        if rootfilePath.exists():
            self._moveOutputFile(rootfileSymlink, rootfilePath, newRootFile)
        else:
            self.logger.warning(f"Root file {rootfilePath} does not exist or merging is enabled, skipping move.")

        # move histogram file if it exists
        if histfilePath.exists():
            self._moveOutputFile(histfileSymlink, histfilePath, newHistFile)
        else:
            self.logger.warning(f"Histogram file {histfilePath} does not exist, skipping move.")

        # rename merged hist-ntuple to output_name.root
        if self.args.merge_output_files and newHistFile.exists():
            self.logger.info(f"renaming the hist-{self.outputName}.root to {self.outputName}.root")
            newHistFile.rename(newRootFile)

    def driverSubmit(self, driver):
        '''
        Submit ``self.job`` with ``driver`` from a forked child process.

        Important if you want to run code after submitting the job with an external driver, e.g. ExecDriver.
        Calling an external driver's submit directly does not return control to the main process (it is
        terminated), so the submission runs in a child while the parent waits for it to finish.
        '''
        if (pid := os.fork()) == 0:  # child process
            driver.submit(self.job, self._workDirName())
            # sys.exit rather than the site-provided exit(), which is meant for
            # interactive sessions and is not guaranteed to be defined.
            sys.exit(0)
        os.waitpid(pid, 0)  # parent waits for child process to finish

    def getExitCode(self):
        """Return 0 if ``ROOT.EL.Driver.retrieve`` gives a truthy status for the work directory, else 1."""
        import ROOT
        retrieved = ROOT.EL.Driver.retrieve(self._workDirName())
        return 0 if retrieved else 1

    def run(self):
        """Configure, submit and post-process the EventLoop job, then exit.

        Always ends in ``sys.exit`` with the code from ``getExitCode()``.
        """
        self.setup()
        # importing ROOT has a long upfront time, so we do it here
        import ROOT
        ROOT.xAOD.Init().ignore()
        self.readSamples()
        self.flags.lock()
        self.printFlags()

        self.job = ROOT.EL.Job()
        self.job.sampleHandler(self.sampleHandler)
        jobOptions = self.job.options()
        jobOptions.setDouble(ROOT.EL.Job.optFilesPerWorker, 100)
        jobOptions.setDouble(ROOT.EL.Job.optMaxEvents, self.flags.Exec.MaxEvents)
        jobOptions.setString(ROOT.EL.Job.optSubmitDirMode, 'unique-link')
        jobOptions.setDouble(ROOT.EL.Job.optSkipEvents, self.flags.Exec.SkipEvents)

        for alg in self.makeAlgSequence():
            self.job.algsAdd(alg)

        if self.args.merge_output_files:
            # Alias the ANALYSIS stream to the histogram stream instead of
            # declaring a separate output.
            jobOptions.setString(ROOT.EL.Job.optStreamAliases, "ANALYSIS=" + ROOT.EL.Job.histogramStreamName)
        else:
            self.job.outputAdd(ROOT.EL.OutputStream('ANALYSIS'))

        if self.args.run_perf_stat:
            jobOptions.setBool(ROOT.EL.Job.optXAODPerfStats, True)
        if self.args.algorithm_timers:
            jobOptions.setBool(ROOT.EL.Job.optAlgorithmTimer, True)
        if self.args.algorithm_memory_monitoring:
            jobOptions.setBool(ROOT.EL.Job.optAlgorithmMemoryMonitor, True)

        driver = ROOT.EL.DirectDriver() if self.args.direct_driver else ROOT.EL.ExecDriver()
        self.driverSubmit(driver)

        if self.args.dump_full_config:
            self._dumpFullConfig()
        exitCode = self.getExitCode()

        if self.args.work_dir is None:  # move output if work_dir is not used
            self.moveOutputFiles()

        sys.exit(exitCode)
python.EventLoopCPRunScript.EventLoopCPRunScript.__init__
def __init__(self)
Definition: EventLoopCPRunScript.py:7
configure
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)
Definition: TrigGlobEffCorrValidation.cxx:514
python.EventLoopCPRunScript.EventLoopCPRunScript._dumpFullConfig
def _dumpFullConfig(self)
Definition: EventLoopCPRunScript.py:56
python.AlgSequence.AlgSequence
AlgSequence
Definition: PhysicsAnalysis/D3PDTools/AnaAlgorithm/python/AlgSequence.py:7
python.SaveConfigUtils.combine_tools_and_algorithms_ELjob
def combine_tools_and_algorithms_ELjob(combine_dictionaries=True, text_file='tool_config.txt', alg_file='alg_sequence.json', output_file='my_analysis_config.json')
Definition: SaveConfigUtils.py:10
python.EventLoopCPRunScript.EventLoopCPRunScript.moveOutputFiles
def moveOutputFiles(self)
Definition: EventLoopCPRunScript.py:77
python.RatesEmulationExample.lock
lock
Definition: RatesEmulationExample.py:148
DiTauMassTools::ignore
void ignore(T &&)
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:58
python.EventLoopCPRunScript.EventLoopCPRunScript.readSamples
def readSamples(self)
Definition: EventLoopCPRunScript.py:45
python.EventLoopCPRunScript.EventLoopCPRunScript.getExitCode
def getExitCode(self)
Definition: EventLoopCPRunScript.py:124
python.EventLoopCPRunScript.EventLoopCPRunScript.driverSubmit
def driverSubmit(self, driver)
Definition: EventLoopCPRunScript.py:110
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:378
python.EventLoopCPRunScript.EventLoopCPRunScript.run
def run(self)
Definition: EventLoopCPRunScript.py:131
python.SaveConfigUtils.save_algs_from_sequence_ELjob
def save_algs_from_sequence_ELjob(sequence, output_dict)
Definition: SaveConfigUtils.py:131
calibdata.exit
exit
Definition: calibdata.py:235
python.EventLoopCPRunScript.EventLoopCPRunScript
Definition: EventLoopCPRunScript.py:6
python.EventLoopCPRunScript.EventLoopCPRunScript.sampleHandler
sampleHandler
Definition: EventLoopCPRunScript.py:47
python.EventLoopCPRunScript.EventLoopCPRunScript.makeAlgSequence
def makeAlgSequence(self)
Definition: EventLoopCPRunScript.py:30
Trk::open
@ open
Definition: BinningType.h:40
python.EventLoopCPRunScript.EventLoopCPRunScript.algSeq
algSeq
Definition: EventLoopCPRunScript.py:11
python.EventLoopCPRunScript.EventLoopCPRunScript.addCustomArguments
def addCustomArguments(self)
Definition: EventLoopCPRunScript.py:14
str
Definition: BTagTrackIpAccessor.cxx:11
python.ParticleTypeUtil.info
def info
Definition: ParticleTypeUtil.py:87
python.EventLoopCPRunScript.EventLoopCPRunScript.job
job
Definition: EventLoopCPRunScript.py:140