3 from pathlib 
import Path
 
    6 from .Helpers 
import warnings_count
 
    7 from .Inputs 
import references_CVMFS_path
 
    8 from .Test 
import TestSetup, WorkflowCheck, WorkflowTest, WorkflowType
 
   12     """Was the q test successful? To check simply count the number of lines containing the string "successful run".""" 
   14     def run(self, test: WorkflowTest) -> bool:
 
   17         for step 
in test.steps:
 
   18             self.
logger.
info(
"-----------------------------------------------------")
 
   19             log = test.validation_path / f
"log.{step}" 
   24                 with log.open() 
as file:
 
   26                         if (
"ERROR" in line 
and "| ERROR |" not in line) 
or (
"FATAL" in line 
and "| FATAL |" not in line):
 
   27                             errors.append(line[9:].strip())
 
   28                         elif "WARNING" in line 
and "| WARNING |" not in line:
 
   29                             warnings.append(line[9:].strip())
 
   30                         elif '"successful run"' in line:
 
   32                         elif step == 
"DQHistogramMerge" and "Writing file: myHIST.root" in line:  
 
   36                 self.
logger.
info(f
"{step} validation test step WARNINGS")
 
   37                 warnings = 
list(dict.fromkeys(warnings))
 
   40                 self.
logger.
info(
"-----------------------------------------------------")
 
   43                 self.
logger.
info(f
"{step} validation test step ERRORS")
 
   44                 errors = 
list(dict.fromkeys(errors))
 
   47                 self.
logger.
info(
"-----------------------------------------------------")
 
   49             if counter 
and not errors:
 
   50                 self.
logger.
info(f
"{step} validation test step successful")
 
   52                 if step == 
"DQHistogramMerge":
 
   54                     with log.open() 
as file:
 
   57                     self.
logger.
info(
"-----------------------------------------------------")
 
   62                     self.
logger.
error(f
"{step} validation test step failed")
 
   64                     with log.open() 
as file:
 
   67                     self.
logger.
info(
"-----------------------------------------------------")
 
   69                     self.
logger.
error(f
"{step} validation test step did not run")
 
   73                         with (test.validation_path / f
"{test.ID}.log").
open() 
as file:
 
   76                         self.
logger.
info(
"-----------------------------------------------------")
 
   78             if self.
setup.validation_only:
 
   81             log = test.reference_path / f
"log.{step}" 
   83             with log.open() 
as file:
 
   85                     if '"successful run"' in line:
 
   87                     elif (step == 
"DQHistogramMerge"   
   88                           and "Writing file: myHIST.root" in line):
 
   92                 self.
logger.
info(f
"{step} reference test step successful")
 
   94                 self.
logger.
error(f
"{step} reference test step failed")
 
   98             self.
logger.
info(f
"All {test.ID} athena steps completed successfully\n")
 
  100             self.
logger.
error(f
"One or more {test.ID} Athena steps failed. Please investigate the cause.\n")
 
  106     """Run Frozen Tier0 Policy Check.""" 
  108     def __init__(self, setup: TestSetup, input_format: str, max_events: int) -> 
None:
 
  114     def run(self, test: WorkflowTest) -> bool:
 
  115         self.
logger.
info(
"---------------------------------------------------------------------------------------")
 
  116         self.
logger.
info(f
"Running {test.ID} Frozen Tier0 Policy Check on {self.format} for {self.max_events} events")
 
  118         diff_rules_path: Path = self.
setup.diff_rules_path
 
  119         diff_rules_exclusion_filename: str = f
"{test.ID}_{self.format}_diff-exclusion-list.txt" 
  120         diff_rules_interest_filename: str = f
"{test.ID}_{self.format}_diff-interest-list.txt" 
  121         diff_rules_file = 
None 
  123         file_name = f
"my{self.format}.pool.root" 
  124         if test.type == WorkflowType.Derivation:
 
  125             file_name = f
"{self.format}.myOutput.pool.root" 
  127         if reference_file 
is None:
 
  128             self.
logger.
error(f
"Reference file {file_name} not found")
 
  131         if self.
setup.validation_only:
 
  132             cvmfs_path = Path(references_CVMFS_path)
 
  133             diff_rules_path = cvmfs_path / self.
setup.release_ID / test.ID
 
  135         self.
logger.
info(f
"Reading the reference file from location {reference_file}")
 
  138         branches_of_interest = 
False 
  139         if self.
setup.diff_rules_path 
is None:
 
  140             diff_rules_exclusion_local_path = test.validation_path / diff_rules_exclusion_filename
 
  141             diff_rules_interest_local_path = test.validation_path / diff_rules_interest_filename
 
  142             subprocess.Popen([
"/bin/bash", 
"-c", f
"cd {test.validation_path}; get_files -remove -data {diff_rules_exclusion_filename}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  143             subprocess.Popen([
"/bin/bash", 
"-c", f
"cd {test.validation_path}; get_files -remove -data {diff_rules_interest_filename}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  144             if not diff_rules_exclusion_local_path.exists() 
and not diff_rules_interest_local_path.exists():
 
  145                 self.
logger.
info(f
"Neither '{diff_rules_exclusion_local_path}' nor '{diff_rules_interest_local_path}' files exist in the release.")
 
  146             elif diff_rules_exclusion_local_path.exists():
 
  147                 diff_rules_file = diff_rules_exclusion_local_path
 
  148             elif diff_rules_interest_local_path.exists():
 
  149                 diff_rules_file = diff_rules_interest_local_path
 
  150                 branches_of_interest = 
True 
  152         if diff_rules_file 
is None and diff_rules_path 
is not None:
 
  153             diff_rules_file = diff_rules_path / diff_rules_exclusion_filename
 
  154             if not diff_rules_file.exists():
 
  155                 diff_rules_file = diff_rules_path / diff_rules_interest_filename
 
  156                 if diff_rules_file.exists():
 
  157                     branches_of_interest = 
True 
  159         if diff_rules_file 
is not None and diff_rules_file.exists():
 
  160             self.
logger.
info(f
"Reading the diff rules file from location {diff_rules_file}")
 
  162             with diff_rules_file.open() 
as f:
 
  164                     stripped_line = line.rstrip()
 
  165                     if stripped_line 
and stripped_line[0] != 
'#':
 
  166                         diff_root_list.append(
r"'{}'".
format(stripped_line))
 
  168             self.
logger.
info(
"No diff rules file exists, using the default list")
 
  169             diff_root_list = [
r"'index_ref'", 
r"'(.*)_timings(.*)'", 
r"'(.*)_mems(.*)'"]
 
  171         validation_file = test.validation_path / file_name
 
  172         log_file = test.validation_path / f
"diff-root-{test.ID}.{self.format}.log" 
  173         diff_root_list = 
" ".
join(diff_root_list)
 
  174         diff_root_mode = 
"--branches-of-interest" if branches_of_interest 
else "--ignore-leaves" 
  177         comparison_command = f
"acmd.py diff-root {reference_file} {validation_file} --order-trees --nan-equal --exact-branches --mode {comparison_mode} --error-mode resilient {diff_root_mode} {diff_root_list} --entries {self.max_events} > {log_file} 2>&1" 
  178         output, error = subprocess.Popen([
"/bin/bash", 
"-c", comparison_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  179         output, error = output.decode(
"utf-8"), error.decode(
"utf-8")
 
  184         passed_frozen_tier0_test = 
True 
  186         with log_file.open() 
as file:
 
  188                 if "WARNING" in line:  
 
  190                     passed_frozen_tier0_test = 
False 
  191                 if "leaves differ" in line:  
 
  193                     passed_frozen_tier0_test = 
False 
  196                     passed_frozen_tier0_test = 
False 
  197                 if "INFO all good." in line:
 
  200         result = passed_frozen_tier0_test 
and all_good
 
  205             if self.
setup.disable_release_setup:
 
  206                 self.
logger.
print(f
"ATLAS-CI-ADD-LABEL: {test.run.value}-{test.type.value}-output-changed")
 
  210                 self.
logger.
error(f
"Your change breaks the frozen derivation policy in test {test.ID}.")
 
  211                 self.
logger.
error(
"Please make sure you explain the reason for the change and ask relevant experts for approval.")
 
  213                 self.
logger.
error(f
"Your change breaks the frozen tier0 policy in test {test.ID}.")
 
  214                 self.
logger.
error(
"Please make sure this has been discussed in the correct meeting (RIG or Simulation) meeting and approved by the relevant experts.")
 
  217             if self.
setup.disable_release_setup:
 
  218                 comparison_command = f
"CopyCIArtifact.sh {validation_file}" 
  219                 output, error = subprocess.Popen([
"/bin/bash", 
"-c", comparison_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  220                 output, error = output.decode(
"utf-8"), error.decode(
"utf-8")
 
  222                 if error 
or not output:
 
  223                     self.
logger.
error(f
"Tried copying '{validation_file}' to the CI artifacts area but it failed.")
 
  228             with log_file.open() 
as file:
 
  231                 self.
logger.
info(
"-----------------------------------------------------\n")
 
  237     """Run metadata check.""" 
  239     def __init__(self, setup: TestSetup, input_format: str) -> 
None:
 
  243     def run(self, test: WorkflowTest) -> bool:
 
  244         self.
logger.
info(
"---------------------------------------------------------------------------------------")
 
  245         self.
logger.
info(f
"Running {test.ID} metadata check on {self.format}")
 
  247         file_name = f
"my{self.format}.pool.root" 
  248         if test.type == WorkflowType.Derivation:
 
  249             file_name = f
"{self.format}.myOutput.pool.root" 
  252         if reference_file 
is None:
 
  253             self.
logger.
error(f
"Reference file {file_name} not found")
 
  256         self.
logger.
info(f
"Reading the reference file from location {reference_file}")
 
  258         exclusion_list = 
" ".
join([
"file_guid", 
"file_size", 
"/TagInfo/AtlasRelease", 
"FileMetaData/productionRelease", 
"StreamDAOD_PHYS/eventTypes", 
"StreamDAOD_PHYSLITE/eventTypes"])
 
  260         validation_file = test.validation_path / file_name
 
  261         log_file = test.validation_path / f
"meta-diff-{test.ID}.{self.format}.log" 
  263         comparison_command = f
"meta-diff --ordered -m full -x diff {reference_file} {validation_file} --drop {exclusion_list} --ignoreTrigger > {log_file} 2>&1" 
  264         output, error = subprocess.Popen([
"/bin/bash", 
"-c", comparison_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  265         output, error = output.decode(
"utf-8"), error.decode(
"utf-8")
 
  268         with log_file.open() 
as file:
 
  277             if self.
setup.disable_release_setup:
 
  278                 self.
logger.
print(f
"ATLAS-CI-ADD-LABEL: {test.run.value}-{test.type.value}-output-changed")
 
  281             self.
logger.
error(f
"Your change breaks the frozen tier0 policy in test {test.ID}.")
 
  282             self.
logger.
error(
"Please make sure this has been discussed in the correct meeting (RIG or Simulation) meeting and approved by the relevant experts.")
 
  285             if self.
setup.disable_release_setup:
 
  286                 comparison_command = f
"CopyCIArtifact.sh {validation_file}" 
  287                 output, error = subprocess.Popen([
"/bin/bash", 
"-c", comparison_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  288                 output, error = output.decode(
"utf-8"), error.decode(
"utf-8")
 
  290                 if error 
or not output:
 
  291                     self.
logger.
error(f
"Tried copying '{validation_file}' to the CI artifacts area but it failed.")
 
  296             with log_file.open() 
as file:
 
  299                 self.
logger.
info(
"-----------------------------------------------------\n")
 
  304     """Run AOD Content Check.""" 
  306     def run(self, test: WorkflowTest) -> bool:
 
  307         self.
logger.
info(
"---------------------------------------------------------------------------------------")
 
  308         self.
logger.
info(f
"Running {test.ID} AOD content check")
 
  310         file_name = 
"myAOD.pool.root" 
  311         output_name = f
"{test.ID}_AOD_content.txt" 
  313         validation_file = test.validation_path / file_name
 
  314         validation_output = test.validation_path / output_name
 
  315         validation_command = f
"acmd.py chk-file {validation_file} | awk '/---/{{flag=1;next}}/===/{{flag=0}}flag' | awk '{{print $10}}' | LC_ALL=C sort | uniq > {validation_output}" 
  317         output_val, error_val = subprocess.Popen([
"/bin/bash", 
"-c", validation_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  318         output_val, error_val = output_val.decode(
"utf-8"), error_val.decode(
"utf-8")
 
  320             self.
logger.
error(f
"Something went wrong with retrieving the content for test {test.ID}:")
 
  324         if self.
setup.validation_only:
 
  326             reference_path = test.validation_path
 
  327             reference_output_name = f
"{test.ID}_AOD_content.ref" 
  328             reference_output = reference_path / reference_output_name
 
  329             subprocess.Popen([
"/bin/bash", 
"-c", f
"cd {reference_path}; get_files -remove -data {reference_output_name}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  330             if not reference_output.exists():
 
  331                 self.
logger.
info(f
"No reference file '{reference_output_name}' to compare the content with.")
 
  334             reference_path = test.reference_path
 
  335             reference_output = reference_path / output_name
 
  336             reference_file = reference_path / file_name
 
  338             reference_command = f
"acmd.py chk-file {reference_file} | awk '/---/{{flag=1;next}}/===/{{flag=0}}flag' | awk '{{print $10}}' | LC_ALL=C sort | uniq > {reference_output}" 
  339             subprocess.Popen([
"/bin/bash", 
"-c", reference_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  343         if test.type == WorkflowType.MCReco 
or test.type == WorkflowType.MCPileUpReco:
 
  344             extra_diff_args = 
"-I '^HLT' -I '^LVL1' -I '^L1'" 
  347         diff_output, diff_error = subprocess.Popen([
"/bin/bash", 
"-c", f
"diff {extra_diff_args} {reference_output} {validation_output}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  348         diff_output, diff_error = diff_output.decode(
"utf-8"), diff_error.decode(
"utf-8")
 
  351         if not diff_output 
and not diff_error:
 
  356             if self.
setup.disable_release_setup:
 
  357                 self.
logger.
print(f
"ATLAS-CI-ADD-LABEL: {test.run.value}-{test.type.value}-output-changed")
 
  360             self.
logger.
error(f
"Your change modifies the output in test {test.ID}.")
 
  361             self.
logger.
error(
"Please make sure this has been discussed in the correct meeting (RIG or Simulation) meeting and approved by the relevant experts.")
 
  362             if self.
setup.validation_only:
 
  363                 self.
logger.
error(f
"The output '{output_name}' (>) differs from the reference '{reference_output_name}' (<):")
 
  365                 self.
logger.
error(f
"The output '{validation_output}' (>) differs from the reference '{reference_output}' (<):")
 
  371             self.
logger.
info(
"-----------------------------------------------------\n")
 
  377     """Run AOD Digest Check.""" 
  379     def __init__(self, setup: TestSetup, max_events: int = -1) -> 
None:
 
  383     def run(self, test: WorkflowTest) -> bool:
 
  384         self.
logger.
info(
"---------------------------------------------------------------------------------------")
 
  385         self.
logger.
info(f
"Running {test.ID} AOD digest")
 
  387         file_name = 
"myAOD.pool.root" 
  388         output_name = f
"{test.ID}_AOD_digest.txt" 
  390         validation_file = test.validation_path / file_name
 
  391         validation_output = test.validation_path / output_name
 
  392         validation_log_file = test.validation_path / f
"AODdigest-{test.ID}.log" 
  393         validation_command = f
"xAODDigest.py {validation_file} {validation_output} > {validation_log_file} 2>&1" 
  395         output_val, error_val = subprocess.Popen([
"/bin/bash", 
"-c", validation_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  396         output_val, error_val = output_val.decode(
"utf-8"), error_val.decode(
"utf-8")
 
  398             self.
logger.
error(f
"Something went wrong with the digest calculation for test {test.ID}:")
 
  402         if self.
setup.validation_only:
 
  404             reference_path = test.validation_path
 
  405             reference_output_name = f
"{test.ID}_AOD_digest.ref" 
  406             reference_output = reference_path / reference_output_name
 
  407             subprocess.Popen([
"/bin/bash", 
"-c", f
"cd {reference_path}; get_files -remove -data {reference_output_name}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  408             if not reference_output.exists():
 
  409                 self.
logger.
info(f
"No reference file '{reference_output_name}' to compare the digest with. Printing the full digest:")
 
  410                 with validation_output.open() 
as f:
 
  415             reference_path = test.reference_path
 
  416             reference_output = reference_path / output_name
 
  417             reference_file = reference_path / file_name
 
  418             reference_log_file = test.reference_path / f
"AODdigest-{test.ID}.log" 
  420             reference_command = f
"xAODDigest.py {reference_file} {reference_output} > {reference_log_file} 2>&1" 
  421             subprocess.Popen([
"/bin/bash", 
"-c", reference_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  424         diff_output, diff_error = subprocess.Popen([
"/bin/bash", 
"-c", f
"diff {reference_output} {validation_output}"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
 
  425         diff_output, diff_error = diff_output.decode(
"utf-8"), diff_error.decode(
"utf-8")
 
  428         if not diff_output 
and not diff_error:
 
  433             if self.
setup.disable_release_setup:
 
  434                 self.
logger.
print(f
"ATLAS-CI-ADD-LABEL: {test.run.value}-{test.type.value}-output-changed")
 
  437             self.
logger.
error(f
"Your change breaks the digest in test {test.ID}.")
 
  438             self.
logger.
error(
"Please make sure this has been discussed in the correct meeting (RIG or Simulation) meeting and approved by the relevant experts.")
 
  439             if self.
setup.validation_only:
 
  440                 self.
logger.
error(f
"The output '{output_name}' (>) differs from the reference '{reference_output_name}' (<):")
 
  442                 self.
logger.
error(f
"The output '{validation_output}' (>) differs from the reference '{reference_output}' (<):")
 
  444                 with reference_output.open() 
as file:
 
  449             self.
logger.
info(
"-----------------------------------------------------\n")
 
  455     """Run A Very Simple Check.""" 
  457     def __init__(self, setup: TestSetup, name: str, quantity: str, unit: str, field: int, threshold: float):
 
  465     def run(self, test: WorkflowTest) -> bool:
 
  466         self.
logger.
info(
"-----------------------------------------------------")
 
  467         self.
logger.
info(f
"Running {test.ID} {self.name} Check")
 
  470         for step 
in test.steps:
 
  471             log_name = f
"log.{step}" 
  472             reference_log = test.reference_path / log_name
 
  473             validation_log = test.validation_path / log_name
 
  476             with reference_log.open() 
as file:
 
  480                         reference_value = 
float(line.split()[self.
field])
 
  484                     self.
logger.
error(f
"No data available in {reference_log}. Job failed.")
 
  488             with validation_log.open() 
as file:
 
  492                         validation_value = 
float(line.split()[self.
field])
 
  496                     self.
logger.
error(f
"No data available in {validation_log}. Job failed.")
 
  499             if reference_value != 0:
 
  500                 factor = validation_value / reference_value
 
  505                     self.
logger.
error(f
"{self.quantity} in the {step} step with(out) your change is {validation_value} ({reference_value}) {self.unit}")
 
  506                     self.
logger.
error(f
"Your change changes {self.quantity} by a factor {factor}")
 
  507                     self.
logger.
error(
"Is this an expected outcome of your change(s)?")
 
  510                     self.
logger.
error(f
"ref  {reference_value} {self.unit}")
 
  511                     self.
logger.
error(f
"val {validation_value} {self.unit}")
 
  513                     self.
logger.warning(f
"{self.quantity} in the {step} step with(out) your change is {validation_value} ({reference_value}) {self.unit}")
 
  514                     self.
logger.warning(f
"Your change changes {self.quantity} by a factor {factor}")
 
  515                     self.
logger.warning(
"Is this an expected outcome of your change(s)?")
 
  517                     self.
logger.warning(f
"{step}: {self.name}")
 
  518                     self.
logger.warning(f
"ref  {reference_value} {self.unit}")
 
  519                     self.
logger.warning(f
"val {validation_value} {self.unit}")
 
  530     """Run WARNINGS check.""" 
  532     def run(self, test: WorkflowTest):
 
  533         self.
logger.
info(
"-----------------------------------------------------")
 
  534         self.
logger.
info(f
"Running {test.ID} WARNINGS Check\n")
 
  537         for step 
in test.steps:
 
  538             log_name = f
"log.{step}" 
  539             reference_log = test.reference_path / log_name
 
  540             validation_log = test.validation_path / log_name
 
  542             warnings_validation  = warnings_count (validation_log)
 
  545             for w 
in warnings_reference:
 
  548             for w 
in warnings_validation:
 
  555             if len(warnings_validation) > len(warnings_reference):
 
  556                 self.
logger.
error(f
"Validation log file {validation_log} has {len(warnings_validation) - len(warnings_reference)} more warning(s) than the reference log file {reference_log}")
 
  557                 self.
logger.
error(
"Please remove the new warning message(s):")
 
  561             elif len(warnings_validation) < len(warnings_reference):
 
  562                 self.
logger.
info(f
"Validation log file {validation_log} has {len(warnings_reference) - len(warnings_validation)} less warnings than the reference log file {reference_log}")
 
  563                 self.
logger.
info(
"The reduction of unnecessary WARNINGs is much appreciated. Is it expected?")
 
  564                 self.
logger.
info(
"The following warning messages have been removed:")
 
  569                 self.
logger.
info(f
"Validation log file {validation_log} has the same number of warnings as the reference log file {reference_log}")
 
  585     ignoreTestTypes = [WorkflowType.FullSim, WorkflowType.AF3]
 
  586     ignoreTestIDs = [
'x686']
 
  588     def run(self, test: WorkflowTest):
 
  589         self.
logger.
info(
"-----------------------------------------------------")
 
  590         self.
logger.
info(f
"Running {test.ID} FPE Check")
 
  593         for step 
in test.steps:
 
  594             log = test.validation_path / f
"log.{step}" 
  597             with log.open() 
as file:
 
  598                 last_stack_trace = 
None 
  600                     if "WARNING FPE" in line:
 
  601                         last_stack_trace = 
None 
  603                         for part 
in reversed(line.split()):
 
  612                                 last_stack_trace = []
 
  613                                 stack_traces[fpe] = last_stack_trace
 
  614                     elif "FPE stacktrace" in line 
and last_stack_trace 
is not None:
 
  616                         last_stack_trace.append(line.strip()[9:])
 
  621                 self.
logger.
log(msgLvl, f
" {step} validation test step FPEs")
 
  622                 for fpe, count 
in sorted(fpes.items(), key=
lambda item: item[1]):
 
  623                     self.
logger.
log(msgLvl, f
"{count:>5}  {fpe}")
 
  624                 for fpe 
in fpes.keys():
 
  625                     self.
logger.
log(msgLvl, 
"-----------------------------------------------------")
 
  626                     self.
logger.
log(msgLvl, f
" first stack trace for algorithm {fpe}:")
 
  627                     for line 
in stack_traces[fpe]:
 
  629                     self.
logger.
log(msgLvl, 
"-----------------------------------------------------")
 
  634             self.
logger.warning(
"Failed!")
 
  635             self.
logger.warning(
"Check disabled due to irreproducibilities!\n")