ATLAS Offline Software
ScriptUtils.py
# Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
import logging
import threading
from argparse import ArgumentParser, Namespace
from os import environ
from pathlib import Path
from sys import exit, stdout
from typing import List

from .Checks import FailedOrPassedCheck, FPECheck, SimpleCheck, WarningsComparisonCheck
from .Inputs import references_CVMFS_path
from .Test import TestSetup, WorkflowCheck, WorkflowTest, WorkflowType

def setup_logger(name: str) -> logging.Logger:
    # Add a custom level for plain printing, slightly above INFO
    printLevel = logging.INFO + 5
    printName = 'PRINT'
    printMethodName = 'print'

    def logForLevel(self, message, *args, **kwargs):
        if self.isEnabledFor(printLevel):
            self._log(printLevel, message, args, **kwargs)

    def logToRoot(message, *args, **kwargs):
        logging.log(printLevel, message, *args, **kwargs)

    logging.addLevelName(printLevel, printName)
    setattr(logging, printName, printLevel)
    setattr(logging.getLoggerClass(), printMethodName, logForLevel)
    setattr(logging, printMethodName, logToRoot)

    # Setup global logging
    class CustomFormatter(logging.Formatter):
        """Custom formatter that strips the decoration from PRINT-level records."""
        def __init__(self, fmt):
            self._default_fmt = fmt
            super().__init__(fmt, datefmt="%m-%d %H:%M")

        def format(self, record):
            if record.levelno == logging.PRINT:
                self._style._fmt = "%(message)s"
            else:
                self._style._fmt = self._default_fmt
            return super().format(record)

    fileFormatter = CustomFormatter("%(asctime)s %(levelname)-8s %(message)s")
    fileHandler = logging.FileHandler(f"./{name}.log", mode="w")
    fileHandler.setFormatter(fileFormatter)

    streamFormatter = CustomFormatter("%(levelname)-8s %(message)s")
    streamHandler = logging.StreamHandler(stdout)
    streamHandler.setFormatter(streamFormatter)

    logger = logging.getLogger()
    logger.addHandler(fileHandler)
    logger.addHandler(streamHandler)
    logger.setLevel(logging.INFO)

    return logger

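# Illustrative use of the custom PRINT level installed above (a sketch, not part
# of the module; the script name "RunWorkflowTests" is hypothetical):
#
#   log = setup_logger("RunWorkflowTests")
#   log.info("rendered with the level prefix (and a timestamp in the log file)")
#   log.print("rendered as a bare message by CustomFormatter")
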
def setup_parser() -> ArgumentParser:
    parser = ArgumentParser()
    common = parser.add_argument_group("common")
    common.add_argument("-e", "--extra", type=str, dest="extra_args", default="",
                        help="Define additional args to pass, e.g. --preExec 'r2e':'...'")
    common.add_argument("-f", "--fast", action="store_true", dest="fast_mode", default=False,
                        help="""Fast option will run all q tests simultaneously,
                        such that it will run faster if you have 4 cpu core slots on which to run. Be
                        warned! Only recommended when running on a high-performance machine, not
                        lxplus!""")
    common.add_argument("-v", "--validation", action="store_true", dest="validation_only", default=False,
                        help=f"""Run validation only.
                        File output comparisons will only be performed against pre-defined
                        reference files stored in the directory
                        {references_CVMFS_path}
                        and performance comparison tests will not be run.""")
    common.add_argument("--run-only", action="store_true", dest="run_only", default=False,
                        help="Run only the main command(s).")
    common.add_argument("--checks-only", type=str, dest="unique_ID", nargs="?", default=None, const='local',
                        help="Re-run only the checks.")

    advanced = parser.add_argument_group("advanced")
    advanced.add_argument("--CI", action="store_true", dest="ci_mode", default=False,
                          help="Will not set up Athena - only for CI tests!")
    advanced.add_argument("--threads", type=int, dest="threads", default=None,
                          help="Override the number of threads to run the test with.")
    advanced.add_argument("--ref", type=str, dest="reference_release", default=None,
                          help="Define a particular reference release.")
    advanced.add_argument("--val", type=str, dest="validation_release", default=None,
                          help="Define a particular validation release.")
    advanced.add_argument("--output-path", type=str, dest="validation_run_path", default="",
                          help="Specify the head directory for running the validation tests. The default is ${PWD}.")
    advanced.add_argument("--reference-path", type=str, dest="reference_run_path", default="",
                          help="Specify the head directory for running the reference tests. The default is /tmp/${USER}.")
    advanced.add_argument("-z", "--exclusion-lists", "--interest-lists", type=str, dest="diff_rules_path", default=None,
                          help="""Specify the directory that contains the lists of variables that will be omitted
                          while comparing the outputs. The default is ./ and the format of the files is
                          ${test}_${format}_diff-exclusion-list.txt, e.g. q445_AOD_diff-exclusion-list.txt, or
                          ${test}_${format}_diff-interest-list.txt, e.g. q445_AOD_diff-interest-list.txt.
                          The file should contain one regexp per line.""")
    advanced.add_argument("--no-output-checks", action="store_true", dest="disable_output_checks", default=False,
                          help="Disable output checks.")
    advanced.add_argument("--detailed-comparison", action="store_true", dest="detailed_comparison", default=False,
                          help="Detailed output comparison.")

    tests = parser.add_argument_group("tests")
    tests.add_argument("-t", "--test", type=str, dest="test", default=None,
                       help="Specify a test to run. Supported options are: sim, overlay, pileup, reco.")
    tests.add_argument("-a", "--tag", type=str, dest="ami_tag", default=None,
                       help="Override the AMI tag of the test.")
    tests.add_argument("-w", "--workflow", type=WorkflowType, dest="workflow", choices=list(WorkflowType), default=None,
                       help="Specify the workflow that is being run (required for AMI tags or if you want to run only one workflow).")
    tests.add_argument("--dsid", type=str, dest="dsid", default=None,
                       help="Override the DSID of the test (only for generation).")

    # shortcuts
    tests.add_argument("-g", "--gen", action="store_true", dest="generation", default=False,
                       help="Run generation test using Gen_tf.py.")
    tests.add_argument("-s", "--sim", action="store_true", dest="simulation", default=False,
                       help="Run simulation test using Sim_tf.py.")
    tests.add_argument("-o", "--overlay", action="store_true", dest="overlay", default=False,
                       help="Run overlay test using Overlay_tf.py.")
    tests.add_argument("-p", "--pileup", action="store_true", dest="pileup", default=False,
                       help="Run MC reconstruction chain with pile-up.")
    tests.add_argument("-r", "--reco", action="store_true", dest="reco", default=False,
                       help="Run MC reconstruction (in case the default execution also runs simulation).")
    tests.add_argument("-d", "--derivation", action="store_true", dest="derivation", default=False,
                       help="Run derivation test using Derivation_tf.py.")

    return parser

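# Illustrative parse of the command line (a sketch; the option values are hypothetical):
#
#   options = setup_parser().parse_args(["-s", "--threads", "8"])
#   assert options.simulation and options.threads == 8
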
def get_test_setup(name: str, options: Namespace, log: logging.Logger) -> TestSetup:
    # define test setup
    setup = TestSetup(log)
    setup.validation_run_path = Path(options.validation_run_path) if options.validation_run_path else Path.cwd()
    setup.reference_run_path = Path(options.reference_run_path) if options.reference_run_path else Path(f"/tmp/{environ['USER']}")
    setup.diff_rules_path = Path(options.diff_rules_path) if options.diff_rules_path is not None else None
    setup.disable_release_setup = options.ci_mode
    setup.validation_only = options.validation_only
    setup.run_only = options.run_only
    if options.unique_ID:
        setup.checks_only = True
        setup.unique_ID = options.unique_ID
    setup.parallel_execution = options.fast_mode
    setup.disable_output_checks = options.disable_output_checks
    setup.custom_threads = options.threads
    setup.detailed_comparison = options.detailed_comparison
    # not in global setup:
    # options.extra_args

    if options.ami_tag and not options.workflow:
        log.error("Custom AMI tags are supported only with specific workflows!")
        exit(1)

    # Are we running in CI?
    if setup.disable_release_setup:
        log.info("You're running in CI mode.")
        log.info("This mode assumes athena is set up with the necessary changes and only runs validation tests.")
        log.info("Then results are checked against reference files and no performance test is run.")
        log.info("If you don't know what this mode does, you shouldn't be using it.\n")
        setup.validation_only = True

    # Does the clean-run head directory exist?
    if setup.validation_only:
        log.info("You are running in validation-only mode whereby only tests against your build are being run.")
        log.info("In this mode ESD and AOD outputs are compared with pre-defined reference files found in the directory")
        log.info(f"{references_CVMFS_path}\n")
        if not Path(references_CVMFS_path).exists():
            log.error(f"Exit. Validation-only mode can only be run on nodes with access to {references_CVMFS_path}")
            exit(2)
    elif setup.reference_run_path.exists():
        log.info(f"The job unique ID is '{setup.unique_ID}' (can be used to re-run the checks)\n")
    else:
        log.error("Exit. Please specify a directory that exists for the argument of the --reference-path option\n")
        log.error(f"{name}.py --reference-path <ExistingDirectory>")
        exit(1)

    # Is an ATLAS release set up?
    if "AtlasPatchVersion" not in environ and "AtlasArea" not in environ and "AtlasBaseDir" not in environ and "AtlasVersion" not in environ:
        log.warning("Not running in a standard ATLAS release setup.\n")

    # setup reference path
    setup.reference_run_path /= f"reference_test_{setup.unique_ID}"

    # Release setup & list the packages in the local InstallArea
    setup.setup_release(options.reference_release, options.validation_release)

    # Parse test string if needed
    parse_test_string(setup, options)

    return setup

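# Hedged usage sketch: get_test_setup expects the Namespace produced by
# setup_parser() and the logger from setup_logger(); "RunWorkflowTests" is a
# hypothetical script name:
#
#   log = setup_logger("RunWorkflowTests")
#   setup = get_test_setup("RunWorkflowTests", setup_parser().parse_args(), log)
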
def parse_test_string(setup: TestSetup, options: Namespace) -> None:
    if not options.test:
        return

    test_string = options.test.lower()

    # simulation
    if test_string in ["s", "sim", "simulation"]:
        options.simulation = True
        return

    # overlay
    if test_string in ["o", "overlay"]:
        options.overlay = True
        return

    # pile-up
    if test_string in ["p", "pileup", "pile-up"]:
        options.pileup = True
        return

    # reco
    if test_string in ["r", "reco", "reconstruction"]:
        options.reco = True
        return

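# For example (illustrative): "--test Sim" is lowercased to "sim" here and ends
# up equivalent to the "-s"/"--sim" shortcut, i.e. options.simulation becomes True.
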
def get_standard_performance_checks(setup: TestSetup) -> List[WorkflowCheck]:
    return [
        SimpleCheck(setup, "CPU Time"       , "evtloop_time",     "msec/event",   4, 0.4),
        SimpleCheck(setup, "Physical Memory", "VmRSS",            "kBytes",       4, 0.2),
        SimpleCheck(setup, "Virtual Memory" , "VmSize",           "kBytes",       4, 0.2),
        SimpleCheck(setup, "Memory Leak"    , "leakperevt_evt11", "kBytes/event", 7, 0.05),
        WarningsComparisonCheck(setup),
    ]

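# A hedged reading of the SimpleCheck arguments (Checks.py holds the authoritative
# signature): a display name, the log quantity to extract (e.g. "VmRSS"), its unit,
# and two numbers that plausibly set the parsed field and the tolerated relative
# difference between the reference and validation runs.
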
def run_tests(setup: TestSetup, tests: List[WorkflowTest]) -> None:
    if setup.checks_only:
        return

    threads = {}
    setup.logger.info("------------------ Run Athena workflow test jobs ---------------")
    if setup.parallel_execution and not setup.validation_only:
        # Fast mode: launch reference and validation jobs for all tests at once.
        # The test=test default argument binds the current test to each lambda.
        for test in tests:
            threads[f"{test.ID}_reference"] = threading.Thread(target=lambda test=test: test.run_reference())
            threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
            threads[f"{test.ID}_reference"].start()
            threads[f"{test.ID}_validation"].start()

        for thread in threads:
            threads[thread].join()
    elif setup.validation_only:
        for test in tests:
            threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
            threads[f"{test.ID}_validation"].start()
            if not setup.parallel_execution:
                threads[f"{test.ID}_validation"].join()

        if setup.parallel_execution:
            for thread in threads:
                threads[thread].join()
    else:
        # Sequential mode: still run the reference and validation jobs of a
        # single test in parallel, but wait for both before moving on.
        for test in tests:
            threads[f"{test.ID}_reference"] = threading.Thread(target=lambda test=test: test.run_reference())
            threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
            threads[f"{test.ID}_reference"].start()
            threads[f"{test.ID}_validation"].start()
            threads[f"{test.ID}_reference"].join()
            threads[f"{test.ID}_validation"].join()

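# Note on the "test=test" default argument used above: without it, every thread
# would close over the loop variable and run the *last* test. A minimal sketch of
# the pitfall, independent of this module:
#
#   fns = [lambda: i for i in range(3)]        # all three return 2
#   fns = [lambda i=i: i for i in range(3)]    # return 0, 1, 2 as intended
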
def run_checks(setup: TestSetup, tests: List[WorkflowTest], performance_checks: List[WorkflowCheck]) -> bool:
    all_passed = True
    # define common checks
    main_check = FailedOrPassedCheck(setup)
    fpe_check = FPECheck(setup)
    # run checks
    for test in tests:
        test_succeeded = main_check.run(test)
        test_succeeded = fpe_check.run(test) and test_succeeded
        all_passed = test_succeeded and all_passed
        if test_succeeded and not setup.run_only:
            all_passed = test.run_checks(performance_checks) and all_passed
    return all_passed

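# Note the operand order "check.run(test) and test_succeeded" above: it keeps every
# check running even after an earlier failure, instead of short-circuiting on the
# first False and skipping the remaining diagnostics.
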
def run_summary(setup: TestSetup, tests: List[WorkflowTest], status: bool) -> None:
    setup.logger.info("-----------------------------------------------------")
    setup.logger.info("---------------------- Summary ----------------------")
    if status:
        setup.logger.info("ALL TESTS: PASSED (0)")
    else:
        setup.logger.error("ALL TESTS: FAILED (10)")
        exit(10)
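
# How a runner script typically composes these helpers (an illustrative sketch;
# the name "RunWorkflowTests" and the WorkflowTest construction are hypothetical,
# see the actual runner scripts in this package for the real wiring):
#
#   log = setup_logger("RunWorkflowTests")
#   options = setup_parser().parse_args()
#   setup = get_test_setup("RunWorkflowTests", options, log)
#   tests = [...]  # build WorkflowTest instances from the parsed options
#   run_tests(setup, tests)
#   status = run_checks(setup, tests, get_standard_performance_checks(setup))
#   run_summary(setup, tests, status)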