7 from pathlib
import Path
8 from zipfile
import ZipFile, ZIP_DEFLATED
9 from AthenaCommon.Logging
import logging
12 Event file cleanup and transfer preparation utility for JiveXML and VP1 outputs.
13 Handles pairing, validation, pruning, and transfer staging of event files
14 based on run and event numbers, with special support for beam splash events.
20 Main routine for managing cleanup and preparation of JiveXML/VP1 event files.
23 - directory (str): Path where event files are stored.
24 - max_pairs (int): Maximum number of event pairs to keep in the directory.
25 - check_pair (bool): If True, remove unpaired files before pruning.
26 - is_beam_splash_mode (bool): If True, disables pruning and pairing to preserve rare beam splash data.
29 msg = logging.getLogger(
'EventUtils')
30 msg.info(
'%s: Starting to clean directory %s', time.ctime(time.time()), directory)
33 We want to transfer everything for beam splash events, as they are rare,
34 and we don't want to miss any of them
37 if is_beam_splash_mode:
43 if not is_beam_splash_mode:
44 prune(file_pairs, max_pairs,directory)
49 except Exception
as e:
50 msg.error(
'Error occurred while cleaning directory %s: %s', directory,
str(e))
52 msg.info(
'%s: Finished cleaning directory %s', time.ctime(time.time()), directory)
57 Retrieves a list of paired files (JiveXML and VP1) from the specified directory.
59 msg = logging.getLogger(
'EventUtils')
60 msg.info(
'%s: Starting to get event list from directory %s', time.ctime(time.time()), directory)
63 jive_files = [os.path.basename(f)
for f
in glob.glob(f
"{directory}/JiveXML*.xml")]
64 vp1_files = [os.path.basename(f)
for f
in glob.glob(f
"{directory}/vp1*CEST.pool.root")]
67 vp1_pattern = re.compile(
r'vp1_r(\d+)_ev(\d+)_')
68 jive_pattern = re.compile(
r'JiveXML_(\d+)_(\d+)\.xml')
72 for vp1_file
in vp1_files:
73 match = vp1_pattern.search(vp1_file)
75 run, event = match.groups()
76 vp1_dict[(run, event)] = vp1_file
80 for jive_file
in jive_files:
81 match = jive_pattern.search(jive_file)
83 run, event = match.groups()
85 vp1_file = vp1_dict.get((run, event))
86 file_pairs.append((jive_file, vp1_file))
89 file_pairs.append((jive_file,
None))
92 for vp1_file
in vp1_files:
93 if not any(vp1_file
in pair
for pair
in file_pairs):
94 file_pairs.append((
None, vp1_file))
97 file_pairs.sort(key=
lambda pair: (
98 max(os.path.getmtime(os.path.join(directory, f))
if f
else 0
for f
in pair)
101 msg.info(
'%s: Event list retrieved with %d pairs', time.ctime(time.time()), len(file_pairs))
107 Ensures only valid JiveXML-VP1 pairs remain in the list, removing unmatched files.
109 msg = logging.getLogger(
'EventUtils')
110 updated_file_pairs = []
112 for jive_file, vp1_file
in file_pairs[:-1]:
113 if jive_file
and vp1_file:
114 updated_file_pairs.append((jive_file, vp1_file))
116 file_to_remove = jive_file
or vp1_file
118 file_path = os.path.join(directory, file_to_remove)
120 msg.info(
'Removed unmatched file: %s', file_path)
122 return updated_file_pairs
def prune(file_pairs, max_pairs, directory):
    """
    Removes the oldest event pairs to keep only the latest `max_pairs`.

    Parameters:
    - file_pairs (list): List of (JiveXML, VP1) file pairs sorted by timestamp,
      oldest first. Either member of a pair may be None.
    - max_pairs (int): Maximum number of file pairs to retain. Zero removes
      every pair; a negative value is rejected and nothing is removed.
    - directory (str): Directory where files are located.

    Returns:
    - list: The pruned list of `max_pairs` most recent file pairs (the input
      list itself when no pruning is needed).
    """
    msg = logging.getLogger('EventUtils')

    if max_pairs < 0:
        # A negative limit has no sensible meaning; refuse to delete anything.
        msg.error('Invalid max_pairs %d: nothing pruned.', max_pairs)
        return file_pairs

    # Use an explicit cut index rather than negative slicing: with
    # max_pairs == 0, file_pairs[:-0] is empty and file_pairs[-0:] is the
    # whole list, so nothing would be pruned.
    n_excess = len(file_pairs) - max_pairs
    if n_excess <= 0:
        return file_pairs  # No pruning needed

    msg.info('Pruning file list: Keeping latest %d out of %d entries.', max_pairs, len(file_pairs))

    # The oldest entries sit at the front of the list
    removed_pairs = file_pairs[:n_excess]

    for pair in removed_pairs:
        for name in pair:
            # Unpaired entries carry None for the missing partner
            if name:
                _remove_pruned_file(os.path.join(directory, name), msg)

    msg.info('Removed %d file pairs.', len(removed_pairs))

    return file_pairs[n_excess:]


def _remove_pruned_file(file_path, msg):
    """Deletes one file and logs the outcome; filesystem errors are logged, not raised."""
    try:
        os.remove(file_path)
    except FileNotFoundError:
        msg.warning('File not found, skipping: %s', file_path)
    except OSError as err:
        msg.error('Error removing file %s: %s', file_path, err)
    else:
        msg.info('Removed file: %s', file_path)
160 msg = logging.getLogger(
'EventUtils')
161 msg.info(
'%s begin write event list', time.ctime())
164 temp_filename = os.path.join(directory, f
"{listname}.{pid}")
165 final_filename = os.path.join(directory, f
"{listname}.list")
168 with open(temp_filename,
'w')
as file:
169 for jive_file, vp1_file
in file_pairs:
170 run_number, event_number =
None,
None
178 msg.info(f
"JiveXML: {jive_file}, VP1: {vp1_file}, Run: {run_number}, Event: {event_number}")
181 file.write(f
"run:{run_number or 'N/A'},event:{event_number or 'N/A'},"
182 f
"atlantis:{jive_file or 'N/A'},vp1:{vp1_file or 'N/A'}\n")
184 except IOError
as err:
185 msg.warning(f
"Could not write event list: {err}")
190 os.rename(temp_filename, final_filename)
191 except OSError
as err:
192 msg.warning(f
"Could not rename {temp_filename} to {final_filename}: {err}")
194 msg.info(
'%s end write event list', time.ctime())
197 msg = logging.getLogger(
'EventUtils' )
198 msg.info(
'%s begin prepare files for transfer', time.ctime(time.time()))
200 ready_jive = glob.glob(f
"{directory}/*.zip")
201 copied_jive = glob.glob(f
"{directory}/*.zip.COPIED")
202 ready_vp1 = glob.glob(f
"{directory}/*.online.pool.root")
203 copied_vp1 = glob.glob(f
"{directory}/*.online.pool.root.COPIED")
205 if len(ready_jive)>len(copied_jive)
or len(ready_vp1)>len(copied_vp1):
206 msg.info(
"There are files about to be transferred. Do not attempt to add new files to be transferred.")
213 if (latest_ready_jive_age < timeinterval)
or (latest_ready_vp1_age < timeinterval):
214 msg.info(
"Wait for %ds before adding new events to the transfer queue. Last jive event in the queue was added %ds ago, last vp1 event in the queue was added %ds ago", timeinterval, latest_ready_jive_age, latest_ready_vp1_age)
218 if len(file_pairs) > 1:
219 second_last_element = file_pairs[-2]
220 jive_file, vp1_file = second_last_element
222 jive_without_extension = os.path.splitext(jive_file)[0]
if jive_file
else None
225 vp1_match = re.match(
r"(vp1_r\d+_ev\d+_u\d+)", vp1_file)
if vp1_file
else None
226 vp1_without_extension = vp1_match.group(1)
if vp1_match
else None
228 has_match = any(jive_without_extension
in os.path.basename(f)
for f
in ready_jive)
or any(vp1_without_extension
in os.path.basename(f)
for f
in ready_vp1)
232 msg.info(
'%s going to zip file %s ready for transfer to eos', time.ctime(time.time()), jive_file)
235 msg.info(
'%s going to rename ESD file %s ready for transfer to eos', time.ctime(time.time()), vp1_file)
239 msg = logging.getLogger(
'EventUtils')
240 beamsplash_file = os.path.join(directory,
'beamsplash.list')
241 files_in_beamsplash_file =
set()
244 if os.path.exists(beamsplash_file):
245 file_mod_time = os.path.getmtime(beamsplash_file)
246 if (time.time() - file_mod_time) > 86400:
247 msg.info(f
"{beamsplash_file} is older than 1 day. Recreating...")
248 open(beamsplash_file,
"w").close()
250 with open(beamsplash_file,
"r")
as f:
251 files_in_beamsplash_file =
set(f.read().splitlines())
253 msg.info(f
"{beamsplash_file} does not exist. Creating a new one...")
254 open(beamsplash_file,
"w").close()
256 files_to_transfer = []
257 for jive_file, vp1_file
in file_pairs:
258 if jive_file
and jive_file
not in files_in_beamsplash_file:
259 files_to_transfer.append(jive_file)
261 if vp1_file
and vp1_file
not in files_in_beamsplash_file:
262 files_to_transfer.append(vp1_file)
265 if files_to_transfer:
267 with open(beamsplash_file,
"a+")
as f:
269 existing_data = f.read().strip()
270 existing_files =
set(existing_data.split(
","))
if existing_data
else set()
273 new_files =
set(files_to_transfer) - existing_files
275 separator =
"," if existing_files
else ""
276 f.write(separator +
",".
join(new_files))
278 msg.error(f
"Error handling file {beamsplash_file}: {e}")
282 match_vp1 = re.search(
r'r(\d+)_ev(\d+)', filename)
284 return match_vp1.group(1), match_vp1.group(2)
287 match_jivexml = re.search(
r'JiveXML_(\d+)_(\d+)', filename)
289 return match_jivexml.group(1), match_jivexml.group(2)
294 msg = logging.getLogger(
'EventUtils')
296 if os.path.exists(file_path):
298 msg.info(
'Removed file: %s', file_path)
300 msg.warning(
'File not found, skipping: %s', file_path)
301 except Exception
as e:
302 msg.error(
'Error removing file %s: %s', file_path,
str(e))
305 """Returns the latest modified file timestamp from a list of files."""
307 return (time.time() - 120)
308 latest_file =
max(file_list, key=os.path.getmtime)
309 return os.path.getmtime(latest_file)
312 msg = logging.getLogger(
'EventUtils' )
313 """Zip the JiveXML file for the specified event.
315 Looks for a JiveXML file with the required filename in the given directory,
316 and if one is found, zip it. The original file is not deleted.
317 Zip the file to .tmp first, and then rename to .zip
318 to avoid triggering the transfer before the zip file is closed.
320 msg.info(
'%s begin zipXMLFile', time.ctime(time.time()))
321 if Path(filename).suffix !=
'.xml':
322 msg.warning(
"Unexpected Atlantis file name: %s", filename)
324 matchingFiles = glob.glob(f
"{directory}/{filename}")
325 if len(matchingFiles) == 1:
326 msg.info(
'exactly one matching file found')
327 matchingFilePath = Path(matchingFiles[0])
328 tmpFilePath = matchingFilePath.with_suffix(
'.tmp')
329 zipFilePath = matchingFilePath.with_suffix(
'.zip')
330 matchingFilePath = Path(matchingFilePath)
331 matchingFileName = matchingFilePath.name
332 msg.info(
'Zipping %s to %s', matchingFileName, zipFilePath.name)
334 with ZipFile(tmpFilePath,
'w', compression=ZIP_DEFLATED)
as z:
335 z.write(matchingFilePath.as_posix(), arcname=matchingFileName)
336 os.rename(f
'{directory}/{tmpFilePath.name}', f
'{directory}/{zipFilePath.name}')
337 except OSError
as err:
338 msg.warning(
"Could not zip %s: %s", filename, err)
339 msg.info(
'%s end of zipXMLFile', time.ctime(time.time()))
342 msg = logging.getLogger(
'EventUtils' )
343 """Rename the ESD for the specified event.
345 Looks for an ESD file with the required filename in the given directory,
346 and if one is found, copy it to a new name ending in .online.pool.root. The original file is not deleted.
348 msg.info(
'Begin renaming VP1 file %s for transfer', filename)
349 if Path(filename).suffixes != [
'.pool',
'.root']:
350 msg.warning(
"Unexpected VP1 file name: %s", filename)
352 orgname = f
'{directory}/{filename}'
353 newname = orgname.replace(
'.pool.root',
'.online.pool.root')
355 shutil.copyfile(Path(orgname), Path(newname))
356 except OSError
as err:
357 msg.warning(
"Could not copy %s to %s: %s", orgname, newname, err)