ATLAS Offline Software
Loading...
Searching...
No Matches
Tools/WorkflowTestRunner/python/Test.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2from enum import Enum
3from logging import Logger
4from os import environ
5from pathlib import Path
6from typing import List, Optional
7from uuid import uuid4
8import subprocess
9
10from .Helpers import get_release_setup, list_changed_packages
11from .Inputs import references_CVMFS_path, references_override_url
12from .References import references_map
13
14
class TestSetup:
    """Test setup.

    Holds the configuration shared by the workflow tests of one run: the
    logger, the run directories, the mode flags and the release setup
    strings used to run the reference and the validation.
    """

    def __init__(self, logger: Logger) -> None:
        """Initialise the configuration with its default values.

        Args:
            logger: Logger used for every message emitted through this setup.
        """
        self.logger = logger
        self.validation_run_path = Path.cwd()
        self.reference_run_path = Path("/tmp")
        self.diff_rules_path = None
        self.unique_ID = str(uuid4())
        # Read by setup_release() and WorkflowTest.run_validation(), so it has
        # to exist before either is called.
        # NOTE(review): default reconstructed - confirm against the repository.
        self.disable_release_setup = False
        self.validation_only = False
        self.run_only = False
        self.checks_only = False
        # Overwritten by setup_release(); initialised so that reading them
        # before that call does not raise AttributeError.
        self.release_reference = ""
        self.release_validation = ""
        self.release_ID = "main"  # The following is not flexible enough, can probably be hardcoded: environ["AtlasVersion"][0:4]
        self.parallel_execution = False
        # Read by WorkflowTest.run_checks().
        # NOTE(review): default reconstructed - confirm against the repository.
        self.disable_output_checks = False
        self.custom_threads = None

    def setup_release(self, reference=None, validation=None) -> None:
        """Select the releases used for the reference and the validation run.

        Args:
            reference: Release setup string for the reference run. When falsy,
                the release is obtained from ``get_release_setup``.
            validation: Release setup string for the validation run. Only
                honoured together with ``reference``; a validation value
                given without a reference is ignored.
        """
        if reference and validation:
            self.release_reference = reference
            self.release_validation = validation
            self.logger.info(f"WARNING: You have specified a dedicated release as reference {reference} and as validation {validation} release.")
            self.logger.info("Your local setup area will not be considered!!!")
            self.logger.info("this option is mainly designed for comparing release versions!!")
        elif reference:
            self.release_reference = reference
            self.release_validation = ''
            self.logger.info(f"You have specified a dedicated release as reference {reference}.")
        else:
            self.release_reference = get_release_setup(self.logger, self.disable_release_setup)
            # NOTE(review): reconstructed - without it release_validation is
            # never set on this path although run_validation() reads it.
            self.release_validation = self.release_reference
            try:
                list_changed_packages(self.logger, self.disable_release_setup)
            except Exception:
                # Listing the changed packages is informational only, so a
                # failure must not abort the setup.
                self.logger.warning("Cannot list changed packages...\n")
54
55
class WorkflowRun(Enum):
    """LHC run a workflow test belongs to."""

    Run2 = "Run2"
    Run3 = "Run3"
    Run4 = "Run4"

    def __str__(self) -> str:
        """Return the bare member name, without the class prefix."""
        return self.name
63
64
class WorkflowType(Enum):
    """Kind of workflow exercised by a test."""

    Generation = "Generation"
    FullSim = "FullSim"
    AF3 = "AF3"
    HitsMerge = "HitsMerge"
    HitsFilter = "HitsFilter"
    MCOverlay = "MCOverlay"
    DataOverlay = "DataOverlay"
    DataOverlayChain = "DataOverlayChain"
    DataOverlayReco = "DataOverlayReco"
    MCReco = "MCReco"
    MCPileUpReco = "MCPileUpReco"
    DataReco = "DataReco"
    PileUpPresampling = "PileUpPresampling"
    MinbiasPreprocessing = "MinbiasPreprocessing"
    Derivation = "Derivation"

    def __str__(self) -> str:
        """Return the bare member name, without the class prefix."""
        return self.name
84
85
class WorkflowCheck:
    """Workflow check base class."""

    # Seconds to wait for the override server before giving up; without a
    # timeout a stalled connection would block the check indefinitely.
    _REQUEST_TIMEOUT = 30

    def __init__(self, setup: TestSetup) -> None:
        """Store the shared test setup and reuse its logger.

        Args:
            setup: Test configuration this check runs with.
        """
        self.setup = setup
        self.logger = setup.logger

    def reference_file(self, test: "WorkflowTest", file_name: str) -> Optional[Path]:
        """Locate the reference file a validation output is compared against.

        Outside validation-only mode the file is taken from the reference run
        directory of ``test``. In validation-only mode it is taken from CVMFS,
        unless an override URL is configured and serves the file, in which
        case the override is downloaded into the current working directory.

        Args:
            test: Test whose reference is requested.
            file_name: Name of the file inside the reference directory.

        Returns:
            Path of the reference file, or ``None`` when the CVMFS reference
            location does not exist or the override download fails.
        """
        if not self.setup.validation_only:
            return test.reference_path / file_name

        # Read references from CVMFS
        # Resolve the subfolder first. Results are stored like: main_folder/branch/test/version/.
        reference_revision = references_map[f"{test.ID}"]
        rel_path = Path(self.setup.release_ID) / test.ID / reference_revision
        reference_path = Path(references_CVMFS_path) / rel_path
        reference_file = reference_path / file_name

        if not reference_path.exists():
            self.logger.error(f"CVMFS reference location {reference_path} does not exist!")
            return None

        if references_override_url is None:
            return reference_file

        import requests

        url = references_override_url
        if not url.endswith("/"):
            url += "/"
        # as_posix() keeps forward slashes in the URL on every platform.
        url += (rel_path / file_name).as_posix()
        self.logger.info("Checking for reference override at %s", url)
        if not requests.head(url, timeout=self._REQUEST_TIMEOUT).ok:
            self.logger.info("No reference override found")
            return reference_file

        # file exists at url
        override_file = Path.cwd() / f"reference_{file_name}"
        self.logger.info("Downloading reference from %s to %s", url, override_file)
        # The context manager releases the streamed connection when done.
        with requests.get(url, stream=True, timeout=self._REQUEST_TIMEOUT) as response:
            if not response.ok:
                # Never write an HTTP error body out as if it were a reference.
                self.logger.error("Downloading reference override from %s failed with status %s",
                                  url, response.status_code)
                return None
            with override_file.open('wb') as f:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)

        return override_file
129
130
class WorkflowTest:
    """Workflow test base class.

    A subclass has to provide a non-empty ``command`` attribute before this
    constructor runs. Attributes a subclass has already set (``ID``, ``tag``,
    ``steps``, ``output_checks``, ``digest_checks``,
    ``skip_performance_checks``) are left untouched.
    """

    def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup) -> None:
        """Fill in every attribute the subclass has not set and derive the run paths.

        Args:
            ID: Test identifier; also names the run directories and the log file.
            run: Run the test belongs to.
            type: Workflow type of the test.
            steps: Names of the steps of the workflow.
            setup: Shared test configuration.

        Raises:
            NotImplementedError: If no non-empty ``command`` is defined.
        """
        if not hasattr(self, "ID"):
            self.ID = ID

        if not hasattr(self, "tag"):
            self.tag = ID

        if not hasattr(self, "steps"):
            self.steps = steps

        # getattr() so that a missing attribute gives the intended error
        # instead of an AttributeError.
        if not getattr(self, "command", None):
            raise NotImplementedError("Command needs to be defined")

        # NOTE(review): the three defaults below are reconstructed from how
        # run_checks() uses the attributes - confirm against the repository.
        if not hasattr(self, "output_checks"):
            self.output_checks = []

        if not hasattr(self, "digest_checks"):
            self.digest_checks = []

        if not hasattr(self, "skip_performance_checks"):
            self.skip_performance_checks = False

        self.run = run
        self.type = type
        self.setup = setup
        self.logger = setup.logger
        self.validation_path: Path = self.setup.validation_run_path / f"run_{self.ID}"
        self.reference_path: Path = self.setup.reference_run_path / f"run_{self.ID}"

    @staticmethod
    def _cd(path: Path) -> str:
        """Return the shell fragment that changes into ``path``, safely quoted."""
        from shlex import quote

        return f"cd {quote(str(path))};"

    def _execute(self, cmd: str, label: str) -> int:
        """Run ``cmd`` through the shell and report a non-zero exit status."""
        return_code = subprocess.call(cmd, shell=True)
        if return_code != 0:
            self.logger.warning(f"{label} run of {self.ID} exited with code {return_code}, see {self.ID}.log")
        return return_code

    def run_reference(self) -> None:
        """Run the test command in the reference release and directory."""
        self.logger.info(f"Running reference in rel {self.setup.release_reference}")
        self.logger.info(f"\"{self.command}\"")

        self.reference_path.mkdir(parents=True, exist_ok=True)

        cmd = (self._cd(self.reference_path)
               + f"source $AtlasSetup/scripts/asetup.sh {self.setup.release_reference} >& /dev/null;")
        cmd += f"TRF_NOECHO=1 {self.command} > {self.ID}.log 2>&1"

        self._execute(cmd, "Reference")

        self.logger.info(f"Finished reference in rel {self.setup.release_reference}")
        self.logger.info(f"\"{self.command}\"")

    def run_validation(self) -> None:
        """Run the test command in the validation release and directory."""
        self.logger.info(f"Running validation in rel {self.setup.release_validation}")
        self.logger.info(f"\"{self.command}\"")

        self.validation_path.mkdir(parents=True, exist_ok=True)

        cmd = self._cd(self.validation_path)
        if self.setup.disable_release_setup or not self.setup.release_validation:
            # Run in the environment that is already set up.
            pass
        elif "WorkDir_DIR" in environ:
            # Layer the local build on top of the release.
            cmake_build_dir = environ["WorkDir_DIR"]
            cmd += (f"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
                    f"source {cmake_build_dir}/setup.sh;")
        else:
            cmd += f"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
        cmd += f"TRF_NOECHO=1 {self.command} > {self.ID}.log 2>&1"

        self._execute(cmd, "Validation")

        self.logger.info(f"Finished validation in rel {self.setup.release_validation}")
        self.logger.info(f"\"{self.command}\"")

    def run_checks(self, performance_checks: List[WorkflowCheck]) -> bool:
        """Run the digest, output and performance checks of this test.

        Every check is executed even after an earlier one has failed.

        Args:
            performance_checks: Checks run last; skipped in validation-only
                mode or when ``skip_performance_checks`` is set.

        Returns:
            ``True`` only if every executed check returned a truthy result.
        """
        self.logger.info("-----------------------------------------------------")
        self.logger.info(f"----------- Post-processing of {self.ID} Test -----------")
        result = True

        # digest checks
        for check in self.digest_checks:
            # check.run() comes first so it is never short-circuited away.
            result = check.run(self) and result

        # output checks
        if not self.setup.disable_output_checks:
            for check in self.output_checks:
                result = check.run(self) and result

        if self.setup.validation_only or self.skip_performance_checks:
            return result  # Performance checks against static references not possible

        # performance checks
        for check in performance_checks:
            result = check.run(self) and result

        return result
None setup_release(self, reference=None, validation=None)
Optional[Path] reference_file(self, "WorkflowTest" test, str file_name)
bool run_checks(self, List[WorkflowCheck] performance_checks)
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup)