3 from logging
import Logger
5 from pathlib
import Path
6 from typing
import List, Optional
10 from .Helpers
import get_release_setup, list_changed_packages
11 from .Inputs
import references_CVMFS_path, references_override_url
12 from .References
import references_map
37 if reference
and validation:
40 self.
logger.
info(f
"WARNING: You have specified a dedicated release as reference {reference} and as validation {validation} release.")
41 self.
logger.
info(
"Your local setup area will not be considered!!!")
42 self.
logger.
info(
"this option is mainly designed for comparing release versions!!")
46 self.
logger.
info(f
"You have specified a dedicated release as reference {reference}.")
53 self.
logger.warning(
"Cannot list changed packages...\n")
66 Generation =
"Generation"
69 HitsMerge =
"HitsMerge"
70 HitsFilter =
"HitsFilter"
71 MCOverlay =
"MCOverlay"
72 DataOverlay =
"DataOverlay"
74 MCPileUpReco =
"MCPileUpReco"
76 PileUpPresampling =
"PileUpPresampling"
77 Derivation =
"Derivation"
84 """Workflow check base class."""
90 def reference_file(self, test:
"WorkflowTest", file_name: str) -> Optional[Path]:
91 reference_path: Path = test.reference_path
92 reference_file = reference_path / file_name
95 if self.
setup.validation_only:
97 reference_revision = references_map[f
"{test.ID}"]
98 cvmfs_path = Path(references_CVMFS_path)
99 rel_path = Path(self.
setup.release_ID) / test.ID / reference_revision
100 reference_path = cvmfs_path / rel_path
101 reference_file = reference_path / file_name
103 if not reference_path.exists():
104 self.
logger.
error(f
"CVMFS reference location {reference_path} does not exist!")
107 if references_override_url
is not None:
110 url = references_override_url
111 if not url.endswith(
"/"): url +=
"/"
112 url +=
str(rel_path / file_name)
113 self.
logger.
info(
"Checking for reference override at %s", url)
114 if requests.head(url).ok:
115 reference_file = Path.cwd() / f
"reference_{file_name}"
116 self.
logger.
info(
"Downloading reference from %s to %s", url, reference_file)
117 r = requests.get(url, stream=
True)
118 with reference_file.open(
'wb')
as f:
119 for chunk
in r.iter_content(chunk_size=1024):
123 self.
logger.
info(
"No reference override found")
125 return reference_file
129 """Workflow test base class."""
# Initialiser taking the test ID, the run, the workflow type, the list of
# steps and the test setup.
131 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup) ->
None:
# Each guard below first tests whether the instance already has the named
# attribute ("ID", "tag", "steps", "output_checks", "digest_checks",
# "skip_performance_checks").
132 if not hasattr(self,
"ID"):
135 if not hasattr(self,
"tag"):
138 if not hasattr(self,
"steps"):
# Reported as NotImplementedError with the message that a command is required.
142 raise NotImplementedError(
"Command needs to be defined")
144 if not hasattr(self,
"output_checks"):
147 if not hasattr(self,
"digest_checks"):
150 if not hasattr(self,
"skip_performance_checks"):
# Directories of this test below the validation and the reference run paths
# of the setup; both are named run_<ID>.
157 self.validation_path: Path = self.
setup.validation_run_path / f
"run_{self.ID}"
158 self.reference_path: Path = self.
setup.reference_run_path / f
"run_{self.ID}"
161 self.
logger.
info(f
"Running reference in rel {self.setup.release_reference}")
164 self.reference_path.
mkdir(parents=
True, exist_ok=
True)
166 cmd = (f
"cd {self.reference_path};"
167 f
"source $AtlasSetup/scripts/asetup.sh {self.setup.release_reference} >& /dev/null;")
168 cmd += f
"TRF_NOECHO=1 {self.command} > {self.ID}.log 2>&1"
170 subprocess.call(cmd, shell=
True)
172 self.
logger.
info(f
"Finished clean in rel {self.setup.release_reference}")
176 self.
logger.
info(f
"Running validation in rel {self.setup.release_validation}")
179 self.validation_path.
mkdir(parents=
True, exist_ok=
True)
181 cmd = f
"cd {self.validation_path};"
182 if self.
setup.disable_release_setup
or not self.
setup.release_validation:
184 elif "WorkDir_DIR" in environ:
185 cmake_build_dir = environ[
"WorkDir_DIR"]
186 cmd += (f
"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
187 f
"source {cmake_build_dir}/setup.sh;")
189 cmd += f
"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
190 cmd += f
"TRF_NOECHO=1 {self.command} > {self.ID}.log 2>&1"
192 subprocess.call(cmd, shell=
True)
194 self.
logger.
info(f
"Finished validation in rel {self.setup.release_validation}")
197 def run_checks(self, performance_checks: List[WorkflowCheck]) -> bool:
198 self.
logger.
info(
"-----------------------------------------------------")
199 self.
logger.
info(f
"----------- Post-processing of {self.ID} Test -----------")
204 result = check.run(self)
and result
207 if not self.
setup.disable_output_checks:
209 result = check.run(self)
and result
215 for check
in performance_checks:
216 result = check.run(self)
and result