ATLAS Offline Software
python.Test.WorkflowCheck Class Reference

Public Member Functions

None __init__(self, TestSetup setup)
Optional[Path] reference_file(self, "WorkflowTest" test, str file_name)

Public Attributes

setup
logger

Detailed Description

Workflow check base class.

Definition at line 83 of file Tools/WorkflowTestRunner/python/Test.py.

Constructor & Destructor Documentation

◆ __init__()

None python.Test.WorkflowCheck.__init__(self, TestSetup setup)

Definition at line 86 of file Tools/WorkflowTestRunner/python/Test.py.

    def __init__(self, setup: TestSetup) -> None:
        self.setup = setup
        self.logger = setup.logger
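As a rough illustration of how a concrete check might build on this constructor, the sketch below subclasses a copy of the base class. `FakeSetup` and `FileExistsCheck` are hypothetical names that do not appear in Test.py, and the real `TestSetup` carries more than the single `logger` attribute modelled here.

```python
import logging
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class FakeSetup:
    """Hypothetical stand-in for TestSetup; only the logger is modelled."""
    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger("workflow-sketch")
    )


class WorkflowCheck:
    """Copy of the base-class constructor from the listing above."""

    def __init__(self, setup) -> None:
        self.setup = setup
        self.logger = setup.logger


class FileExistsCheck(WorkflowCheck):
    """Hypothetical subclass: reuses the setup and logger set by the base class."""

    def run(self, file_name: str) -> bool:
        found = Path(file_name).exists()
        self.logger.info("File %s %s", file_name, "found" if found else "missing")
        return found


setup = FakeSetup()
check = FileExistsCheck(setup)
```

Because the constructor copies `setup.logger` onto the instance, every check built this way logs through the same logger as the setup object it was given.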

Member Function Documentation

◆ reference_file()

Optional[Path] python.Test.WorkflowCheck.reference_file(self, "WorkflowTest" test, str file_name)

Definition at line 90 of file Tools/WorkflowTestRunner/python/Test.py.

    def reference_file(self, test: "WorkflowTest", file_name: str) -> Optional[Path]:
        reference_path: Path = test.reference_path
        reference_file = reference_path / file_name

        # Read references from CVMFS
        if self.setup.validation_only:
            # Resolve the subfolder first. Results are stored like: main_folder/branch/test/version/.
            reference_revision = references_map[f"{test.ID}"]
            cvmfs_path = Path(references_CVMFS_path)
            rel_path = Path(self.setup.release_ID) / test.ID / reference_revision
            reference_path = cvmfs_path / rel_path
            reference_file = reference_path / file_name

            if not reference_path.exists():
                self.logger.error(f"CVMFS reference location {reference_path} does not exist!")
                return None

            if references_override_url is not None:
                import requests

                url = references_override_url
                if not url.endswith("/"): url += "/"
                url += str(rel_path / file_name)
                self.logger.info("Checking for reference override at %s", url)
                if requests.head(url).ok:  # file exists at url
                    reference_file = Path.cwd() / f"reference_{file_name}"
                    self.logger.info("Downloading reference from %s to %s", url, reference_file)
                    r = requests.get(url, stream=True)
                    with reference_file.open('wb') as f:
                        for chunk in r.iter_content(chunk_size=1024):
                            if chunk:  # filter out keep-alive new chunks
                                f.write(chunk)
                else:
                    self.logger.info("No reference override found")

        return reference_file
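The two path computations in the validation-only branch can be tried in isolation. The following is a minimal sketch, not the code from Test.py: the helper names `cvmfs_reference` and `override_url` are hypothetical, and the release, test, revision, file name, and URL values are made-up placeholders. It uses `as_posix()` where the listing uses `str()`, an assumption made so the URL keeps forward slashes on any platform.

```python
import tempfile
from pathlib import Path
from typing import Optional


def cvmfs_reference(root: Path, release_id: str, test_id: str,
                    revision: str, file_name: str) -> Optional[Path]:
    """Return root/release_id/test_id/revision/file_name.

    As in the listing, only the folder is checked for existence;
    None is returned when the folder is missing.
    """
    reference_path = root / release_id / test_id / revision
    if not reference_path.exists():
        return None
    return reference_path / file_name


def override_url(base_url: str, rel_path: Path, file_name: str) -> str:
    """Append rel_path/file_name to base_url with one slash at the seam."""
    if not base_url.endswith("/"):
        base_url += "/"
    return base_url + (rel_path / file_name).as_posix()


# Placeholder values, chosen only for the demonstration.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "main" / "some_test" / "v1").mkdir(parents=True)
    found = cvmfs_reference(root, "main", "some_test", "v1", "output.root")
    missing = cvmfs_reference(root, "main", "some_test", "v2", "output.root")

url = override_url("https://example.org/refs",
                   Path("main") / "some_test" / "v1", "output.root")
```

Note that a returned path does not guarantee the file itself exists, since the method only verifies the revision folder before returning.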

Member Data Documentation

◆ logger

python.Test.WorkflowCheck.logger

Definition at line 88 of file Tools/WorkflowTestRunner/python/Test.py.

◆ setup

python.Test.WorkflowCheck.setup

Definition at line 87 of file Tools/WorkflowTestRunner/python/Test.py.


The documentation for this class was generated from the following file:
Tools/WorkflowTestRunner/python/Test.py