import logging
import threading

from argparse import ArgumentParser, Namespace
from os import environ
from pathlib import Path
from sys import exit, stdout
from typing import List

from .Checks import FailedOrPassedCheck, FPECheck, SimpleCheck, WarningsComparisonCheck
from .Inputs import references_CVMFS_path
from .Test import TestSetup, WorkflowCheck, WorkflowTest, WorkflowType
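
# Register a custom "PRINT" logging level slightly above INFO so that plain
# messages can be emitted without the usual level/timestamp decoration.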
printLevel = logging.INFO + 5
printName = "PRINT"
printMethodName = "print"


def logForLevel(self, message, *args, **kwargs):
    if self.isEnabledFor(logging.PRINT):
        self._log(logging.PRINT, message, args, **kwargs)


def logToRoot(message, *args, **kwargs):
    logging.log(logging.PRINT, message, *args, **kwargs)


logging.addLevelName(printLevel, printName)
setattr(logging, printName, printLevel)
setattr(logging.getLoggerClass(), printMethodName, logForLevel)
setattr(logging, printMethodName, logToRoot)
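
# Formatter that strips the timestamp/level prefix for PRINT-level records
# and keeps the default layout for everything else.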
class CustomFormatter(logging.Formatter):
    """Custom formatter."""

    def __init__(self, fmt):
        self._default_fmt = fmt
        super().__init__(fmt, datefmt="%m-%d %H:%M")

    def format(self, record):
        if record.levelno == logging.PRINT:
            self._style._fmt = "%(message)s"
        else:
            self._style._fmt = self._default_fmt
        return super().format(record)
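
# Log both to a file named after the running script and to stdout,
# attaching the custom formatter to each handler.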
fileFormatter = CustomFormatter("%(asctime)s %(levelname)-8s %(message)s")
fileHandler = logging.FileHandler(f"./{name}.log", mode="w")
fileHandler.setFormatter(fileFormatter)

streamFormatter = CustomFormatter("%(levelname)-8s %(message)s")
streamHandler = logging.StreamHandler(stdout)
streamHandler.setFormatter(streamFormatter)

logger = logging.getLogger()
logger.addHandler(fileHandler)
logger.addHandler(streamHandler)
logger.setLevel(logging.INFO)
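
# Command-line interface: the options are split into "common", "advanced"
# and "tests" argument groups.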
parser = ArgumentParser()

common = parser.add_argument_group("common")
common.add_argument("-e", "--extra", type=str, dest="extra_args", default="",
                    help="Define additional args to pass, e.g. --preExec 'r2e':'...'")
common.add_argument("-f", "--fast", action="store_true", dest="fast_mode", default=False,
                    help="""Fast option will run all q tests simultaneously,
                    so it will run faster if you have 4 CPU core slots on which to run.
                    Be warned! Only recommended when running on a high-performance machine.""")
common.add_argument("-v", "--validation", action="store_true", dest="validation_only", default=False,
                    help=f"""Run validation only.
                    File output comparisons will only be performed against pre-defined
                    reference files stored in the directory
                    {references_CVMFS_path}
                    and performance comparison tests will not be run.""")
common.add_argument("--run-only", action="store_true", dest="run_only", default=False,
                    help="Run only the main command(s).")
common.add_argument("--checks-only", type=str, dest="unique_ID", nargs="?", default=None, const="local",
                    help="Re-run only the checks.")
advanced = parser.add_argument_group("advanced")
advanced.add_argument("--CI", action="store_true", dest="ci_mode", default=False,
                      help="Will not set up Athena - only for CI tests!")
advanced.add_argument("--threads", type=int, dest="threads", default=None,
                      help="Override the number of threads to run the test with.")
advanced.add_argument("--ref", type=str, dest="reference_release", default=None,
                      help="Define a particular reference release.")
advanced.add_argument("--val", type=str, dest="validation_release", default=None,
                      help="Define a particular validation release.")
advanced.add_argument("--output-path", type=str, dest="validation_run_path", default="",
                      help="Specify the head directory for running the validation tests. The default is ${PWD}.")
advanced.add_argument("--reference-path", type=str, dest="reference_run_path", default="",
                      help="Specify the head directory for running the reference tests. The default is /tmp/${USER}.")
advanced.add_argument("-z", "--exclusion-lists", "--interest-lists", type=str, dest="diff_rules_path", default=None,
                      help="""Specify the directory that contains the lists of variables that will be omitted
                      while comparing the outputs. The default is ./ and the format of the files is
                      ${test}_${format}_diff-exclusion-list.txt, e.g. q445_AOD_diff-exclusion-list.txt, or
                      ${test}_${format}_diff-interest-list.txt, e.g. q445_AOD_diff-interest-list.txt.
                      The files should contain one regexp per line.""")
advanced.add_argument("--no-output-checks", action="store_true", dest="disable_output_checks", default=False,
                      help="Disable output checks.")
advanced.add_argument("--detailed-comparison", action="store_true", dest="detailed_comparison", default=False,
                      help="Perform a detailed output comparison.")
tests = parser.add_argument_group("tests")
tests.add_argument("-t", "--test", type=str, dest="test", default=None,
                   help="Specify a test to run. Supported options are: sim, overlay, pileup, reco")
tests.add_argument("-a", "--tag", type=str, dest="ami_tag", default=None,
                   help="Override the AMI tag of the test.")
tests.add_argument("-w", "--workflow", type=WorkflowType, dest="workflow", choices=list(WorkflowType), default=None,
                   help="Specify the workflow that is being run (required for AMI tags or if you want to run only one workflow)")
tests.add_argument("--dsid", type=str, dest="dsid", default=None,
                   help="Override the DSID of the test (only for generation).")
tests.add_argument("-g", "--gen", action="store_true", dest="generation", default=False,
                   help="Run generation test using Gen_tf.py")
tests.add_argument("-s", "--sim", action="store_true", dest="simulation", default=False,
                   help="Run simulation test using Sim_tf.py")
tests.add_argument("-o", "--overlay", action="store_true", dest="overlay", default=False,
                   help="Run overlay test using Overlay_tf.py")
tests.add_argument("-p", "--pileup", action="store_true", dest="pileup", default=False,
                   help="Run MC reconstruction chain with pile-up")
tests.add_argument("-r", "--reco", action="store_true", dest="reco", default=False,
                   help="Run MC reconstruction (in case the default execution also runs simulation)")
tests.add_argument("-d", "--derivation", action="store_true", dest="derivation", default=False,
                   help="Run derivation test using Derivation_tf.py")
def get_test_setup(name: str, options: Namespace, log: logging.Logger) -> TestSetup:
    setup = TestSetup(log)
    setup.validation_run_path = Path(options.validation_run_path) if options.validation_run_path else Path.cwd()
    setup.reference_run_path = Path(options.reference_run_path) if options.reference_run_path else Path(f"/tmp/{environ['USER']}")
    setup.diff_rules_path = Path(options.diff_rules_path) if options.diff_rules_path is not None else None
    setup.disable_release_setup = options.ci_mode
    setup.validation_only = options.validation_only
    setup.run_only = options.run_only
    if options.unique_ID:
        setup.checks_only = True
        setup.unique_ID = options.unique_ID
    setup.parallel_execution = options.fast_mode
    setup.disable_output_checks = options.disable_output_checks
    setup.custom_threads = options.threads
    setup.detailed_comparison = options.detailed_comparison
    if options.ami_tag and not options.workflow:
        log.error("Custom AMI tags supported only with specific workflows!")
    # CI mode implies validation-only running
    if setup.disable_release_setup:
        log.info("You're running in CI mode.")
        log.info("This mode assumes Athena is set up with the necessary changes and only runs validation tests.")
        log.info("The results are checked against reference files and no performance test is run.")
        log.info("If you don't know what this mode does, you shouldn't be using it.\n")
        setup.validation_only = True
    if setup.validation_only:
        log.info("You are running in validation-only mode whereby only tests against your build are being run.")
        log.info("In this mode ESD and AOD outputs are compared with pre-defined reference files found in the directory")
        log.info(f"{references_CVMFS_path}\n")
        if not Path(references_CVMFS_path).exists():
            log.error(f"Exit. Validation-only mode can only be run on nodes with access to {references_CVMFS_path}")
    elif setup.reference_run_path.exists():
        log.info(f"The job unique ID is '{setup.unique_ID}' (can be used to re-run the checks)\n")
    else:
        log.error("Exit. Please specify a directory that exists for the argument of the --reference-path option\n")
        log.error(f"{name}.py --reference-path <ExistingDirectory>")
182 if "AtlasPatchVersion" not in environ
and "AtlasArea" not in environ
and "AtlasBaseDir" not in environ
and "AtlasVersion" not in environ:
183 log.warning(
"Not running in a standard ATLAS release setup.\n")
    setup.reference_run_path /= f"reference_test_{setup.unique_ID}"
    setup.setup_release(options.reference_release, options.validation_release)

    return setup
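
# Map the free-form --test string onto the individual test flags.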
test_string = options.test.lower()

if test_string in ["s", "sim", "simulation"]:
    options.simulation = True

if test_string in ["o", "overlay"]:
    options.overlay = True

if test_string in ["p", "pileup", "pile-up"]:
    options.pileup = True
if test_string in ["r", "reco", "reconstruction"]:
    options.reco = True
performance_checks = [
    SimpleCheck(setup, "CPU Time", "evtloop_time", "msec/event", 4, 0.4),
    SimpleCheck(setup, "Physical Memory", "VmRSS", "kBytes", 4, 0.2),
    SimpleCheck(setup, "Virtual Memory", "VmSize", "kBytes", 4, 0.2),
    SimpleCheck(setup, "Memory Leak", "leakperevt_evt11", "kBytes/event", 7, 0.05),
]
def run_tests(setup: TestSetup, tests: List[WorkflowTest]) -> None:
    if setup.checks_only:
        return

    setup.logger.info("------------------ Run Athena workflow test jobs ---------------")
    if setup.parallel_execution and not setup.validation_only:
        # Fast mode: run the reference and validation jobs of all tests at once
        threads = {}
        for test in tests:
            threads[f"{test.ID}_reference"] = threading.Thread(target=lambda test=test: test.run_reference())
            threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
            threads[f"{test.ID}_reference"].start()
            threads[f"{test.ID}_validation"].start()

        for thread in threads:
            threads[thread].join()
    elif setup.validation_only:
        # Only validation jobs, in parallel or one after the other
        threads = {}
        for test in tests:
            threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
            threads[f"{test.ID}_validation"].start()
            if not setup.parallel_execution:
                threads[f"{test.ID}_validation"].join()

        if setup.parallel_execution:
            for thread in threads:
                threads[thread].join()
    else:
        # Default mode: run the reference and validation jobs of each test in
        # parallel, but process the tests one at a time
        for test in tests:
            threads = {}
            threads[f"{test.ID}_reference"] = threading.Thread(target=lambda test=test: test.run_reference())
            threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
            threads[f"{test.ID}_reference"].start()
            threads[f"{test.ID}_validation"].start()
            threads[f"{test.ID}_reference"].join()
            threads[f"{test.ID}_validation"].join()
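
# Evaluate the checks for each test and report whether everything passed.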
def run_checks(setup: TestSetup, tests: List[WorkflowTest], performance_checks: List[WorkflowCheck]) -> bool:
    all_passed = True
    # Checks that every test job must pass
    main_check = FailedOrPassedCheck(setup)
    fpe_check = FPECheck(setup)
    for test in tests:
        test_succeeded = main_check.run(test)
        test_succeeded = fpe_check.run(test) and test_succeeded
        all_passed = test_succeeded and all_passed
        if test_succeeded and not setup.run_only:
            all_passed = test.run_checks(performance_checks) and all_passed
    return all_passed
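
# Print the overall summary and flag failures.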
def run_summary(setup: TestSetup, tests: List[WorkflowTest], status: bool) -> None:
    setup.logger.info("-----------------------------------------------------")
    setup.logger.info("---------------------- Summary ----------------------")
    if status:
        setup.logger.info("ALL TESTS: PASSED (0)")
    else:
        setup.logger.error("ALL TESTS: FAILED (10)")
        exit(10)