ATLAS Offline Software
EventLoopCPRunScript.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 from AnalysisAlgorithmsConfig.CPBaseRunner import CPBaseRunner
3 import os
4 import sys
5 
class EventLoopCPRunScript(CPBaseRunner):
    """Run a CP algorithm sequence, configured from a YAML file, with EventLoop.

    Extends ``CPBaseRunner`` with EventLoop-specific command-line options,
    builds an ``EL::Job`` from the configured algorithm sequence, submits it
    with either the direct driver or the exec driver, and, when no explicit
    work directory was requested, moves the produced files to the current
    working directory.
    """

    def __init__(self):
        super().__init__()
        self.logger.info("EventLoopCPRunScript initialized")
        self.addCustomArguments()
        # Do not call parse_args() here: doing so makes it hard for callers to
        # retrieve the parser information afterwards.

    def addCustomArguments(self):
        """Register the EventLoop-specific and expert command-line options on ``self.parser``."""
        derivedGroup = self.parser.add_argument_group('EventLoop specific arguments')
        derivedGroup.add_argument('--direct-driver', dest='direct_driver',
                                  action='store_true', help='Run the job with the direct driver')
        # nargs='?' + const: "--work-dir" alone selects "workDir"; omitting the
        # option leaves None, which run() uses to decide whether to move outputs.
        derivedGroup.add_argument('--work-dir', dest='work_dir', nargs='?', const='workDir', default=None,
                                  help='The work directory for the EL job. defaults to "workDir".')
        derivedGroup.add_argument('--merge-output-files', dest='merge_output_files', action='store_true', help='Merge the output histogram and n-tuple files into a single file.')

        expertGroup = self.parser.add_argument_group('Experts arguments')
        expertGroup.add_argument('--run-perf-stat', dest='run_perf_stat', action='store_true', help='Run xAOD::PerfStats to get input branch access data. This is mostly useful for AMG experts wanting to understand branch access patterns.')
        expertGroup.add_argument('--algorithm-timers', dest='algorithm_timers', action='store_true', help='Enable algorithm timers. This is mostly useful for AMG experts wanting to understand tool performance.')
        expertGroup.add_argument('--algorithm-memory-monitoring', dest='algorithm_memory_monitoring', action='store_true', help='Enable algorithm memory monitoring. This is mostly useful for AMG experts wanting to understand tool memory usage. Note that this is imperfect and may in cases assign memory to the wrong algorithm.')

    def _workDirName(self):
        """Return the EventLoop submit directory name: ``--work-dir`` if given, else ``'workDir'``."""
        return self.args.work_dir or 'workDir'

    def makeAlgSequence(self):
        """Build and return an ``AlgSequence`` filled from ``self.config`` via a ``ConfigAccumulator``."""
        from AnaAlgorithm.AlgSequence import AlgSequence
        from AnalysisAlgorithmsConfig.ConfigAccumulator import ConfigAccumulator
        algSeq = AlgSequence()
        self.logger.info("Configuring algorithms based on YAML file")
        configSeq = self.config.configure()
        self.logger.info("Configuring common services")
        configAccumulator = ConfigAccumulator(flags=self.flags,
                                              algSeq=algSeq,
                                              noSystematics=self.args.no_systematics)
        self.logger.info("Configuring algorithms")
        configSeq.fullConfigure(configAccumulator)
        return algSeq

    def readSamples(self):
        """Create ``self.sampleHandler`` holding one local sample with every input file.

        The sample is named after ``self.outputName`` and contains each path
        listed in ``self.inputList``.
        """
        import ROOT
        self.sampleHandler = ROOT.SH.SampleHandler()
        sampleFiles = ROOT.SH.SampleLocal(f"{self.outputName}")
        self.logger.info("Adding files to the sample handler")
        for inputFile in self.inputList:
            sampleFiles.add(inputFile)
        self.sampleHandler.add(sampleFiles)

    def moveOutputFiles(self):
        """Move the job outputs from the work directory into the current directory.

        Moves ``<workDir>/data-ANALYSIS/<outputName>.root`` (n-tuple) and
        ``<workDir>/hist-<outputName>.root`` (histograms) into the current
        working directory. With ``--merge-output-files`` all outputs end up in
        the histogram file, which is then renamed to ``<outputName>.root``.
        """
        from pathlib import Path
        import shutil
        self.logger.info("Moving the analysis root file and the hist file to the top level.")
        workDir = Path(self._workDirName())
        currentDir = Path.cwd()
        mergeOutputs = self.args.merge_output_files

        # n-tuple file
        rootfileSymlink = workDir / 'data-ANALYSIS' / f'{self.outputName}.root'
        rootfilePath = rootfileSymlink.resolve()
        newRootFile = currentDir / f"{self.outputName}.root"
        if rootfilePath.exists():
            self.logger.info(f"Moving {rootfilePath} to {newRootFile}")
            if rootfileSymlink.is_symlink():  # The check is needed to avoid FileNotFoundError if using direct driver
                rootfileSymlink.unlink()
            shutil.move(str(rootfilePath), str(newRootFile))
        elif mergeOutputs:
            # With merging the ANALYSIS stream is aliased to the histogram
            # stream, so no separate n-tuple file is expected.
            self.logger.info(f"Output merging is enabled, no separate n-tuple file {rootfilePath} to move.")
        else:
            self.logger.warning(f"Root file {rootfilePath} does not exist, skipping move.")

        # histogram file
        histfileSymlink = workDir / f'hist-{self.outputName}.root'
        histfilePath = histfileSymlink.resolve()
        newHistFile = currentDir / f"hist-{self.outputName}.root"
        histMoved = False
        if histfilePath.exists():
            self.logger.info(f"Moving {histfilePath} to {newHistFile}")
            if histfileSymlink.is_symlink():  # The check is needed to avoid FileNotFoundError if using direct driver
                histfileSymlink.unlink()
            shutil.move(str(histfilePath), str(newHistFile))
            histMoved = True
        else:
            self.logger.warning(f"Histogram file {histfilePath} does not exist, skipping move.")

        # Rename the merged hist+n-tuple file to <outputName>.root. Only rename
        # the file moved just now, never a stale one left by an earlier run.
        if mergeOutputs and histMoved:
            self.logger.info(f"renaming the hist-{self.outputName}.root to {self.outputName}.root")
            newHistFile.rename(newRootFile)

    def driverSubmit(self, driver):
        '''
        Important if you want to run code after submitting the job, with external driver e.g., ExecDriver.
        Assistant function to call driver submit. Move the submission to a child process to avoid the main process being terminated.
        Directly calling external driver submission will not return controls to the main process, the main thread will be terminated.

        The parent waits for the child and logs a warning if it exits with a
        non-zero status or is killed by a signal; the job result itself is
        checked separately by getExitCode().
        '''
        # Flush Python-level buffers so pending output is not written twice
        # (once by the parent and once by the forked child).
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid == 0:  # child process
            status = 0
            try:
                driver.submit(self.job, self._workDirName())
            except Exception:
                # Never let an exception unwind into the caller's code inside
                # the child: that would leave a duplicate of the parent running.
                import traceback
                traceback.print_exc()
                status = 1
            sys.exit(status)

        # parent waits for child process to finish
        _, waitStatus = os.waitpid(pid, 0)
        if os.WIFSIGNALED(waitStatus):
            self.logger.warning(f"Job submission process was terminated by signal {os.WTERMSIG(waitStatus)}")
        elif os.WIFEXITED(waitStatus) and os.WEXITSTATUS(waitStatus) != 0:
            self.logger.warning(f"Job submission process exited with status {os.WEXITSTATUS(waitStatus)}")

    def getExitCode(self):
        """Return 0 if ``EL::Driver::retrieve`` reports success for the work directory, else 1."""
        import ROOT
        return 0 if ROOT.EL.Driver.retrieve(self._workDirName()) else 1

    def run(self):
        """Configure, submit and retrieve the EventLoop job, then exit with its status.

        Output files are moved to the current directory only when
        ``--work-dir`` was not given. Always terminates via ``sys.exit``.
        """
        self.setup()
        # importing ROOT has a long upfront time, so we do it here
        import ROOT
        ROOT.xAOD.Init().ignore()
        self.readSamples()
        self.flags.lock()
        self.printFlags()

        self.job = ROOT.EL.Job()
        self.job.sampleHandler(self.sampleHandler)
        options = self.job.options()
        options.setDouble(ROOT.EL.Job.optFilesPerWorker, 100)
        options.setDouble(ROOT.EL.Job.optMaxEvents, self.flags.Exec.MaxEvents)
        options.setString(ROOT.EL.Job.optSubmitDirMode, 'unique-link')
        options.setDouble(ROOT.EL.Job.optSkipEvents, self.flags.Exec.SkipEvents)

        for alg in self.makeAlgSequence():
            self.job.algsAdd(alg)
        if self.args.merge_output_files:
            # Route the ANALYSIS stream into the histogram output file.
            options.setString(ROOT.EL.Job.optStreamAliases, "ANALYSIS=" + ROOT.EL.Job.histogramStreamName)
        else:
            self.job.outputAdd(ROOT.EL.OutputStream('ANALYSIS'))

        if self.args.run_perf_stat:
            options.setBool(ROOT.EL.Job.optXAODPerfStats, True)
        if self.args.algorithm_timers:
            options.setBool(ROOT.EL.Job.optAlgorithmTimer, True)
        if self.args.algorithm_memory_monitoring:
            options.setBool(ROOT.EL.Job.optAlgorithmMemoryMonitor, True)

        driver = ROOT.EL.DirectDriver() if self.args.direct_driver else ROOT.EL.ExecDriver()
        self.driverSubmit(driver)
        exitCode = self.getExitCode()

        if self.args.work_dir is None:  # move output if work_dir is not used
            self.moveOutputFiles()

        sys.exit(exitCode)
python.EventLoopCPRunScript.EventLoopCPRunScript.__init__
def __init__(self)
Definition: EventLoopCPRunScript.py:7
configure
bool configure(asg::AnaToolHandle< ITrigGlobalEfficiencyCorrectionTool > &tool, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronEffToolsHandles, ToolHandleArray< IAsgElectronEfficiencyCorrectionTool > &electronSFToolsHandles, ToolHandleArray< CP::IMuonTriggerScaleFactors > &muonToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonEffToolsHandles, ToolHandleArray< IAsgPhotonEfficiencyCorrectionTool > &photonSFToolsHandles, const std::string &triggers, const std::map< std::string, std::string > &legsPerTool, unsigned long nToys, bool debug)
Definition: TrigGlobEffCorrValidation.cxx:514
python.AlgSequence.AlgSequence
AlgSequence
Definition: PhysicsAnalysis/D3PDTools/AnaAlgorithm/python/AlgSequence.py:7
python.EventLoopCPRunScript.EventLoopCPRunScript.moveOutputFiles
def moveOutputFiles(self)
Definition: EventLoopCPRunScript.py:51
python.RatesEmulationExample.lock
lock
Definition: RatesEmulationExample.py:148
DiTauMassTools::ignore
void ignore(T &&)
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:58
python.EventLoopCPRunScript.EventLoopCPRunScript.readSamples
def readSamples(self)
Definition: EventLoopCPRunScript.py:42
python.EventLoopCPRunScript.EventLoopCPRunScript.getExitCode
def getExitCode(self)
Definition: EventLoopCPRunScript.py:98
python.EventLoopCPRunScript.EventLoopCPRunScript.driverSubmit
def driverSubmit(self, driver)
Definition: EventLoopCPRunScript.py:84
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.AtlRunQueryLib.options
options
Definition: AtlRunQueryLib.py:378
python.EventLoopCPRunScript.EventLoopCPRunScript.run
def run(self)
Definition: EventLoopCPRunScript.py:105
calibdata.exit
exit
Definition: calibdata.py:235
python.EventLoopCPRunScript.EventLoopCPRunScript
Definition: EventLoopCPRunScript.py:6
python.EventLoopCPRunScript.EventLoopCPRunScript.sampleHandler
sampleHandler
Definition: EventLoopCPRunScript.py:44
python.EventLoopCPRunScript.EventLoopCPRunScript.makeAlgSequence
def makeAlgSequence(self)
Definition: EventLoopCPRunScript.py:28
python.EventLoopCPRunScript.EventLoopCPRunScript.addCustomArguments
def addCustomArguments(self)
Definition: EventLoopCPRunScript.py:13
str
Definition: BTagTrackIpAccessor.cxx:11
python.ParticleTypeUtil.info
def info
Definition: ParticleTypeUtil.py:87
python.EventLoopCPRunScript.EventLoopCPRunScript.job
job
Definition: EventLoopCPRunScript.py:114