3 from logging
import Logger
5 from pathlib
import Path
6 from typing
import List, Optional
10 from .Helpers
import get_release_setup, list_changed_packages
11 from .Inputs
import references_CVMFS_path, references_override_url
12 from .References
import references_map
37 if reference
and validation:
40 self.
logger.
info(f
"WARNING: You have specified a dedicated release as reference {reference} and as validation {validation} release.")
41 self.
logger.
info(
"Your local setup area will not be considered!!!")
42 self.
logger.
info(
"this option is mainly designed for comparing release versions!!")
46 self.
logger.
info(f
"You have specified a dedicated release as reference {reference}.")
53 self.
logger.warning(
"Cannot list changed packages...\n")
66 Generation =
"Generation"
69 HitsMerge =
"HitsMerge"
70 HitsFilter =
"HitsFilter"
71 MCOverlay =
"MCOverlay"
72 DataOverlay =
"DataOverlay"
74 MCPileUpReco =
"MCPileUpReco"
76 PileUpPresampling =
"PileUpPresampling"
77 MinbiasPreprocessing =
"MinbiasPreprocessing"
78 Derivation =
"Derivation"
85 """Workflow check base class."""
91 def reference_file(self, test:
"WorkflowTest", file_name: str) -> Optional[Path]:
92 reference_path: Path = test.reference_path
93 reference_file = reference_path / file_name
96 if self.
setup.validation_only:
98 reference_revision = references_map[f
"{test.ID}"]
99 cvmfs_path = Path(references_CVMFS_path)
100 rel_path = Path(self.
setup.release_ID) / test.ID / reference_revision
101 reference_path = cvmfs_path / rel_path
102 reference_file = reference_path / file_name
104 if not reference_path.exists():
105 self.
logger.
error(f
"CVMFS reference location {reference_path} does not exist!")
108 if references_override_url
is not None:
111 url = references_override_url
112 if not url.endswith(
"/"): url +=
"/"
113 url +=
str(rel_path / file_name)
114 self.
logger.
info(
"Checking for reference override at %s", url)
115 if requests.head(url).ok:
116 reference_file = Path.cwd() / f
"reference_{file_name}"
117 self.
logger.
info(
"Downloading reference from %s to %s", url, reference_file)
118 r = requests.get(url, stream=
True)
119 with reference_file.open(
'wb')
as f:
120 for chunk
in r.iter_content(chunk_size=1024):
124 self.
logger.
info(
"No reference override found")
126 return reference_file
130 """Workflow test base class."""
132 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup) ->
None:
133 if not hasattr(self,
"ID"):
136 if not hasattr(self,
"tag"):
139 if not hasattr(self,
"steps"):
143 raise NotImplementedError(
"Command needs to be defined")
145 if not hasattr(self,
"output_checks"):
148 if not hasattr(self,
"digest_checks"):
151 if not hasattr(self,
"skip_performance_checks"):
158 self.validation_path: Path = self.
setup.validation_run_path / f
"run_{self.ID}"
159 self.reference_path: Path = self.
setup.reference_run_path / f
"run_{self.ID}"
162 self.
logger.
info(f
"Running reference in rel {self.setup.release_reference}")
165 self.reference_path.
mkdir(parents=
True, exist_ok=
True)
167 cmd = (f
"cd {self.reference_path};"
168 f
"source $AtlasSetup/scripts/asetup.sh {self.setup.release_reference} >& /dev/null;")
169 cmd += f
"TRF_NOECHO=1 {self.command} > {self.ID}.log 2>&1"
171 subprocess.call(cmd, shell=
True)
173 self.
logger.
info(f
"Finished clean in rel {self.setup.release_reference}")
177 self.
logger.
info(f
"Running validation in rel {self.setup.release_validation}")
180 self.validation_path.
mkdir(parents=
True, exist_ok=
True)
182 cmd = f
"cd {self.validation_path};"
183 if self.
setup.disable_release_setup
or not self.
setup.release_validation:
185 elif "WorkDir_DIR" in environ:
186 cmake_build_dir = environ[
"WorkDir_DIR"]
187 cmd += (f
"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
188 f
"source {cmake_build_dir}/setup.sh;")
190 cmd += f
"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
191 cmd += f
"TRF_NOECHO=1 {self.command} > {self.ID}.log 2>&1"
193 subprocess.call(cmd, shell=
True)
195 self.
logger.
info(f
"Finished validation in rel {self.setup.release_validation}")
198 def run_checks(self, performance_checks: List[WorkflowCheck]) -> bool:
199 self.
logger.
info(
"-----------------------------------------------------")
200 self.
logger.
info(f
"----------- Post-processing of {self.ID} Test -----------")
205 result = check.run(self)
and result
208 if not self.
setup.disable_output_checks:
210 result = check.run(self)
and result
216 for check
in performance_checks:
217 result = check.run(self)
and result