7 from pathlib 
import Path
 
    8 from zipfile 
import ZipFile, ZIP_DEFLATED
 
    9 from AthenaCommon.Logging 
import logging
 
   12 Event file cleanup and transfer preparation utility for JiveXML and VP1 outputs. 
   13 Handles pairing, validation, pruning, and transfer staging of event files 
   14 based on run and event numbers, with special support for beam splash events. 
   20     Main routine for managing cleanup and preparation of JiveXML/VP1 event files. 
   23     - directory (str): Path where event files are stored. 
   24     - max_pairs (int): Maximum number of event pairs to keep in the directory. 
   25     - check_pair (bool): If True, remove unpaired files before pruning. 
   26     - is_beam_splash_mode (bool): If True, disables pruning and pairing to preserve rare beam splash data. 
   29     msg = logging.getLogger(
'EventUtils')
 
   30     msg.info(
'%s: Starting to clean directory %s', time.ctime(time.time()), directory)
 
   33     We want to transfer everything for beam splash events as they are rare, 
   34     so we don't want to miss any of them. 
   37     if is_beam_splash_mode:
 
   43         if not is_beam_splash_mode:
 
   44             prune(file_pairs, max_pairs,directory)
 
   49     except Exception 
as e:
 
   50         msg.error(
'Error occurred while cleaning directory %s: %s', directory, 
str(e))
 
   52     msg.info(
'%s: Finished cleaning directory %s', time.ctime(time.time()), directory)
 
   57     Retrieves a list of paired files (JiveXML and VP1) from the specified directory. 
   59     msg = logging.getLogger(
'EventUtils')
 
   60     msg.info(
'%s: Starting to get event list from directory %s', time.ctime(time.time()), directory)
 
   63     jive_files = [os.path.basename(f) 
for f 
in glob.glob(f
"{directory}/JiveXML*.xml")]
 
   64     vp1_files = [os.path.basename(f) 
for f 
in glob.glob(f
"{directory}/vp1*CEST.pool.root")]
 
   67     vp1_pattern = re.compile(
r'vp1_r(\d+)_ev(\d+)_')
 
   68     jive_pattern = re.compile(
r'JiveXML_(\d+)_(\d+)\.xml')
 
   72     for vp1_file 
in vp1_files:
 
   73         match = vp1_pattern.search(vp1_file)
 
   75             run, event = match.groups()
 
   76             vp1_dict[(run, event)] = vp1_file
 
   80     for jive_file 
in jive_files:
 
   81         match = jive_pattern.search(jive_file)
 
   83             run, event = match.groups()
 
   85             vp1_file = vp1_dict.get((run, event))
 
   86             file_pairs.append((jive_file, vp1_file))
 
   89             file_pairs.append((jive_file, 
None))
 
   92     for vp1_file 
in vp1_files:
 
   93         if not any(vp1_file 
in pair 
for pair 
in file_pairs):
 
   94             file_pairs.append((
None, vp1_file))
 
   97     file_pairs.sort(key=
lambda pair: (
 
   98         max(os.path.getmtime(os.path.join(directory, f)) 
if f 
else 0 
for f 
in pair)
 
  101     msg.info(
'%s: Event list retrieved with %d pairs', time.ctime(time.time()), len(file_pairs))
 
  107     Ensures only valid JiveXML-VP1 pairs remain in the list, removing unmatched files. 
  109     msg = logging.getLogger(
'EventUtils')
 
  110     updated_file_pairs = []
 
  112     for jive_file, vp1_file 
in file_pairs[:-1]:  
 
  113         if jive_file 
and vp1_file:
 
  114             updated_file_pairs.append((jive_file, vp1_file))
 
  116             file_to_remove = jive_file 
or vp1_file
 
  118                 file_path = os.path.join(directory, file_to_remove)
 
  120                 msg.info(
'Removed unmatched file: %s', file_path)
 
  122     return updated_file_pairs
 
  124 def prune(file_pairs, max_pairs, directory):
 
  126     Removes the oldest event pairs to keep only the latest `max_pairs`. 
  129     - file_pairs (list): List of (JiveXML, VP1) file pairs sorted by timestamp. 
  130     - max_pairs (int): Maximum number of file pairs to retain. 
  131     - directory (str): Directory where files are located. 
  134     - list: The pruned list of `max_pairs` most recent file pairs. 
  137     if len(file_pairs) <= max_pairs:
 
  140     msg = logging.getLogger(
'EventUtils')
 
  141     msg.info(
'Pruning file list: Keeping latest %d out of %d entries.', max_pairs, len(file_pairs))
 
  144     removed_pairs = file_pairs[:-max_pairs]
 
  147     for jive_file, vp1_file 
in removed_pairs:
 
  148         for file 
in (jive_file, vp1_file):
 
  150                 file_path = os.path.join(directory, file)
 
  153     msg.info(
'Removed %d file pairs.', len(removed_pairs))
 
  156     return file_pairs[-max_pairs:]
 
  160     msg = logging.getLogger(
'EventUtils')
 
  161     msg.info(
'%s begin write event list', time.ctime())
 
  164     temp_filename = os.path.join(directory, f
"{listname}.{pid}")
 
  165     final_filename = os.path.join(directory, f
"{listname}.list")
 
  168         with open(temp_filename, 
'w') 
as file:
 
  169             for jive_file, vp1_file 
in file_pairs:
 
  170                 run_number, event_number = 
None, 
None 
  178                 msg.info(f
"JiveXML: {jive_file}, VP1: {vp1_file}, Run: {run_number}, Event: {event_number}")
 
  181                 file.write(f
"run:{run_number or 'N/A'},event:{event_number or 'N/A'}," 
  182                            f
"atlantis:{jive_file or 'N/A'},vp1:{vp1_file or 'N/A'}\n")
 
  184     except IOError 
as err:
 
  185         msg.warning(f
"Could not write event list: {err}")
 
  190         os.rename(temp_filename, final_filename)
 
  191     except OSError 
as err:
 
  192         msg.warning(f
"Could not rename {temp_filename} to {final_filename}: {err}")
 
  194     msg.info(
'%s end write event list', time.ctime())
 
  197     msg = logging.getLogger( 
'EventUtils' )
 
  198     msg.info(
'%s begin prepare files for transfer', time.ctime(time.time()))
 
  200     ready_jive = glob.glob(f
"{directory}/*.zip") 
 
  201     copied_jive = glob.glob(f
"{directory}/*.zip.COPIED") 
 
  202     ready_vp1 = glob.glob(f
"{directory}/*.online.pool.root") 
 
  203     copied_vp1 = glob.glob(f
"{directory}/*.online.pool.root.COPIED") 
 
  205     if len(ready_jive)>len(copied_jive) 
or len(ready_vp1)>len(copied_vp1):
 
  206         msg.info(
"There are files about to be transferred. Do not attempt to add new files to be transferred.")
 
  213     if (latest_ready_jive_age < timeinterval) 
or (latest_ready_vp1_age < timeinterval):
 
  214         msg.info(
"Wait for %ds before adding new events to the transfer queue. Last jive event in the queue was added %ds ago, last vp1 event in the queue was added %ds ago", timeinterval, latest_ready_jive_age, latest_ready_vp1_age)
 
  218     if len(file_pairs) > 1:  
 
  219         second_last_element = file_pairs[-2]
 
  220         jive_file, vp1_file = second_last_element
 
  222         jive_without_extension = os.path.splitext(jive_file)[0] 
if jive_file 
else None 
  225         vp1_match = re.match(
r"(vp1_r\d+_ev\d+_u\d+)", vp1_file) 
if vp1_file 
else None 
  226         vp1_without_extension = vp1_match.group(1) 
if vp1_match 
else None 
  228         has_match = any(jive_without_extension 
in os.path.basename(f) 
for f 
in ready_jive) 
or any(vp1_without_extension 
in os.path.basename(f) 
for f 
in ready_vp1)
 
  232             msg.info(
'%s going to zip file %s ready for transfer to eos', time.ctime(time.time()), jive_file)
 
  235             msg.info(
'%s going to rename ESD file %s ready for transfer to eos', time.ctime(time.time()), vp1_file)
 
  239     msg = logging.getLogger(
'EventUtils')
 
  240     beamsplash_file = os.path.join(directory, 
'beamsplash.list')
 
  241     files_in_beamsplash_file = 
set()
 
  244     if os.path.exists(beamsplash_file):
 
  245         file_mod_time = os.path.getmtime(beamsplash_file)
 
  246         if (time.time() - file_mod_time) > 86400:  
 
  247             msg.info(f
"{beamsplash_file} is older than 1 day. Recreating...")
 
  248             open(beamsplash_file, 
"w").close()  
 
  250             with open(beamsplash_file, 
"r") 
as f:
 
  251                 files_in_beamsplash_file = 
set(f.read().splitlines())  
 
  253         msg.info(f
"{beamsplash_file} does not exist. Creating a new one...")
 
  254         open(beamsplash_file, 
"w").close()  
 
  256     files_to_transfer = []
 
  257     for jive_file, vp1_file 
in file_pairs:
 
  258         if jive_file 
and jive_file 
not in files_in_beamsplash_file:
 
  259             files_to_transfer.append(jive_file)
 
  261         if vp1_file 
and vp1_file 
not in files_in_beamsplash_file:
 
  262             files_to_transfer.append(vp1_file)
 
  265     if files_to_transfer:
 
  267             with open(beamsplash_file, 
"a+") 
as f:
 
  269                 existing_data = f.read().strip()
 
  270                 existing_files = 
set(existing_data.split(
",")) 
if existing_data 
else set()
 
  273                 new_files = 
set(files_to_transfer) - existing_files
 
  275                     separator = 
"," if existing_files 
else "" 
  276                     f.write(separator + 
",".
join(new_files))
 
  278             msg.error(f
"Error handling file {beamsplash_file}: {e}")
 
  282     match_vp1 = re.search(
r'r(\d+)_ev(\d+)', filename)
 
  284         return match_vp1.group(1), match_vp1.group(2)
 
  287     match_jivexml = re.search(
r'JiveXML_(\d+)_(\d+)', filename)
 
  289         return match_jivexml.group(1), match_jivexml.group(2)
 
  294     msg = logging.getLogger(
'EventUtils')
 
  296         if os.path.exists(file_path):
 
  298             msg.info(
'Removed file: %s', file_path)
 
  300             msg.warning(
'File not found, skipping: %s', file_path)
 
  301     except Exception 
as e:
 
  302         msg.error(
'Error removing file %s: %s', file_path, 
str(e))
 
  305     """Returns the latest modified file timestamp from a list of files.""" 
  307         return (time.time() - 120)
 
  308     latest_file = 
max(file_list, key=os.path.getmtime)
 
  309     return os.path.getmtime(latest_file)
 
  312     msg = logging.getLogger( 
'EventUtils' )
 
  313     """Zip the JiveXML file for the specified event. 
  315     Looks for a JiveXML file with the required filename in the given directory, 
  316     and if one is found, zip it. The original file is not deleted. 
  317     Zip the file to .tmp first, and then rename to .zip 
  318     to avoid triggering the transfer before the zip file is closed. 
  320     msg.info(
'%s begin zipXMLFile', time.ctime(time.time()))
 
  321     if Path(filename).suffix != 
'.xml':
 
  322         msg.warning(
"Unexpected Atlantis file name: %s", filename)
 
  324     matchingFiles = glob.glob(f
"{directory}/{filename}")
 
  325     if len(matchingFiles) == 1: 
 
  326         msg.info(
'exactly one matching file found')
 
  327         matchingFilePath = Path(matchingFiles[0])
 
  328         tmpFilePath      = matchingFilePath.with_suffix(
'.tmp')
 
  329         zipFilePath      = matchingFilePath.with_suffix(
'.zip')
 
  330         matchingFilePath = Path(matchingFilePath)
 
  331         matchingFileName = matchingFilePath.name
 
  332         msg.info(
'Zipping %s to %s', matchingFileName, zipFilePath.name)
 
  334             with ZipFile(tmpFilePath,
'w', compression=ZIP_DEFLATED) 
as z:
 
  335                 z.write(matchingFilePath.as_posix(), arcname=matchingFileName)
 
  336             os.rename(f
'{directory}/{tmpFilePath.name}', f
'{directory}/{zipFilePath.name}')
 
  337         except OSError 
as err:
 
  338             msg.warning(
"Could not zip %s: %s", filename, err)
 
  339     msg.info(
'%s end of zipXMLFile', time.ctime(time.time()))
 
  342     msg = logging.getLogger( 
'EventUtils' )
 
  343     """Rename the ESD for the specified event. 
  345     Looks for an ESD file with the required filename in the given directory, 
  346     and if one is found, copy it to a file ending in .online.pool.root. The original file is not deleted. 
  348     msg.info(
'Begin renaming VP1 file %s for transfer', filename)
 
  349     if Path(filename).suffixes != [
'.pool', 
'.root']:
 
  350         msg.warning(
"Unexpected VP1 file name: %s", filename)
 
  352     orgname = f
'{directory}/{filename}' 
  353     newname = orgname.replace(
'.pool.root', 
'.online.pool.root')
 
  355         shutil.copyfile(Path(orgname), Path(newname))
 
  356     except OSError 
as err:
 
  357         msg.warning(
"Could not copy %s to %s: %s", orgname, newname, err)