ATLAS Offline Software
Loading...
Searching...
No Matches
EventUtils Namespace Reference

Functions

 cleanDirectory (directory, max_pairs, check_pair, is_beam_splash_mode)
 getEventlist (directory)
 checkPairs (file_pairs, directory)
 prune (file_pairs, max_pairs, directory)
 writeEventlist (directory, file_pairs, listname='event')
 prepareFilesForTransfer (directory, file_pairs, timeinterval)
 prepareALLFilesForTransfer (directory, file_pairs)
 extract_numbers (filename)
 remove_file (file_path)
 get_latest_file_timestamp (file_list)
 zipXMLFile (directory, filename)
 renameESDFile (directory, filename)

Function Documentation

◆ checkPairs()

EventUtils.checkPairs ( file_pairs,
directory )
Ensures only valid JiveXML-VP1 pairs remain in the list, removing unmatched files.

Definition at line 105 of file EventUtils.py.

def checkPairs(file_pairs, directory):
    """
    Keeps only complete JiveXML-VP1 pairs and deletes the file of every incomplete one.

    The last entry of `file_pairs` is skipped entirely: it is neither checked,
    nor deleted, nor included in the returned list.

    Parameters:
    - file_pairs (list): (JiveXML, VP1) file name tuples; a missing partner is falsy (None).
    - directory (str): Directory where the files are located.

    Returns:
    - list: The complete pairs, in their original order.
    """
    log = logging.getLogger('EventUtils')
    complete_pairs = []

    # Every entry except the last one is examined
    for jive_name, vp1_name in file_pairs[:-1]:
        if jive_name and vp1_name:
            complete_pairs.append((jive_name, vp1_name))
            continue

        # At most one of the two names is set here; that file has no partner
        orphan = jive_name or vp1_name
        if not orphan:
            continue
        orphan_path = os.path.join(directory, orphan)
        remove_file(orphan_path)
        log.info('Removed unmatched file: %s', orphan_path)

    return complete_pairs

◆ cleanDirectory()

EventUtils.cleanDirectory ( directory,
max_pairs,
check_pair,
is_beam_splash_mode )
Main routine for managing cleanup and preparation of JiveXML/VP1 event files.

Parameters:
- directory (str): Path where event files are stored.
- max_pairs (int): Maximum number of event pairs to keep in the directory.
- check_pair (bool): If True, remove unpaired files before pruning.
- is_beam_splash_mode (bool): If True, disables pruning and pairing to preserve rare beam splash data.

Definition at line 17 of file EventUtils.py.

def cleanDirectory(directory, max_pairs, check_pair, is_beam_splash_mode):
    """
    Main routine for managing cleanup and preparation of JiveXML/VP1 event files.

    Any exception raised while processing the directory is logged as an error
    and not propagated to the caller.

    Parameters:
    - directory (str): Path where event files are stored.
    - max_pairs (int): Maximum number of event pairs to keep in the directory.
    - check_pair (bool): If True, remove unpaired files before pruning.
    - is_beam_splash_mode (bool): If True, disables pruning and pairing to preserve rare beam splash data.
    """

    msg = logging.getLogger('EventUtils')
    msg.info('%s: Starting to clean directory %s', time.ctime(time.time()), directory)

    # We want to transfer everything for beam splash events as they are rare,
    # so we don't want to miss them: no pair checking and no pruning in that mode.
    if is_beam_splash_mode:
        check_pair = False
    try:
        file_pairs = getEventlist(directory)
        if check_pair:
            file_pairs = checkPairs(file_pairs, directory)
        if not is_beam_splash_mode:
            prune(file_pairs, max_pairs, directory)
            # prune() deletes the files of the oldest entries from disk. Drop those
            # entries here as well, so that neither the event list nor the transfer
            # preparation refers to files that no longer exist.
            if 0 < max_pairs < len(file_pairs):
                file_pairs = file_pairs[-max_pairs:]
            writeEventlist(directory, file_pairs)
            prepareFilesForTransfer(directory, file_pairs, timeinterval=60)
        else:
            prepareALLFilesForTransfer(directory, file_pairs)
    except Exception as e:
        msg.error('Error occurred while cleaning directory %s: %s', directory, str(e))

    msg.info('%s: Finished cleaning directory %s', time.ctime(time.time()), directory)

◆ extract_numbers()

EventUtils.extract_numbers ( filename)

Definition at line 280 of file EventUtils.py.

def extract_numbers(filename):
    """
    Extracts the run and event numbers embedded in a VP1 or JiveXML file name.

    Parameters:
    - filename (str): File name to inspect.

    Returns:
    - tuple: (run, event) as strings of digits, or (None, None) if neither pattern is found.
    """
    # Tried in order: "r<run>_ev<event>" (VP1 files), then
    # "JiveXML_<run>_<event>" (JiveXML files). The first hit wins.
    for pattern in (r'r(\d+)_ev(\d+)', r'JiveXML_(\d+)_(\d+)'):
        found = re.search(pattern, filename)
        if found is not None:
            return found.group(1), found.group(2)

    return None, None

◆ get_latest_file_timestamp()

EventUtils.get_latest_file_timestamp ( file_list)
Returns the latest modified file timestamp from a list of files.

Definition at line 304 of file EventUtils.py.

def get_latest_file_timestamp(file_list):
    """
    Returns the most recent modification time found among the given files.

    Parameters:
    - file_list (list): Paths of the files to inspect.

    Returns:
    - float: Largest modification time (seconds since the epoch) in the list;
      for an empty list, the current time minus 120 seconds.
    """
    if file_list:
        return max(os.path.getmtime(path) for path in file_list)
    # Nothing to inspect: report a timestamp two minutes in the past
    return (time.time() - 120)
#define max(a, b)
Definition cfImp.cxx:41

◆ getEventlist()

EventUtils.getEventlist ( directory)
Retrieves a list of paired files (JiveXML and VP1) from the specified directory.

Definition at line 55 of file EventUtils.py.

def getEventlist(directory):
    """
    Retrieves a list of paired files (JiveXML and VP1) from the specified directory.

    JiveXML and VP1 files are matched on the (run, event) numbers in their names.
    Files without a partner are kept, with None in the other slot.

    Parameters:
    - directory (str): Directory to scan.

    Returns:
    - list: (JiveXML, VP1) tuples of base file names, sorted by the most recent
      modification time of the files in each pair (newest last).
    """
    msg = logging.getLogger('EventUtils')
    msg.info('%s: Starting to get event list from directory %s', time.ctime(time.time()), directory)

    # Find all relevant files in the directory
    jive_files = [os.path.basename(f) for f in glob.glob(f"{directory}/JiveXML*.xml")]
    # NOTE(review): only VP1 files whose name ends in "CEST.pool.root" are picked up;
    # confirm that files stamped with another time zone cannot occur.
    vp1_files = [os.path.basename(f) for f in glob.glob(f"{directory}/vp1*CEST.pool.root")]

    # Compile regex patterns for matching run and event numbers in filenames
    vp1_pattern = re.compile(r'vp1_r(\d+)_ev(\d+)_')
    jive_pattern = re.compile(r'JiveXML_(\d+)_(\d+)\.xml')

    # Dictionary to store VP1 files by (run, event) tuple
    vp1_dict = {}
    for vp1_file in vp1_files:
        match = vp1_pattern.search(vp1_file)
        if match:
            vp1_dict[match.groups()] = vp1_file

    # Generate file pairs, remembering which VP1 files found a JiveXML partner
    file_pairs = []
    paired_vp1 = set()
    for jive_file in jive_files:
        match = jive_pattern.search(jive_file)
        # A JiveXML file whose name does not parse simply has no partner
        vp1_file = vp1_dict.get(match.groups()) if match else None
        if vp1_file:
            paired_vp1.add(vp1_file)
        file_pairs.append((jive_file, vp1_file))

    # Add any VP1 files that don't have a matching JiveXML file
    # (set lookup instead of rescanning the whole pair list for every VP1 file)
    file_pairs.extend((None, vp1_file) for vp1_file in vp1_files if vp1_file not in paired_vp1)

    def newest_mtime(pair):
        # A missing partner counts as 0 so the existing file decides the order
        return max(os.path.getmtime(os.path.join(directory, f)) if f else 0 for f in pair)

    # Sort file pairs by last modified time (newest last)
    file_pairs.sort(key=newest_mtime)

    msg.info('%s: Event list retrieved with %d pairs', time.ctime(time.time()), len(file_pairs))

    return file_pairs

◆ prepareALLFilesForTransfer()

EventUtils.prepareALLFilesForTransfer ( directory,
file_pairs )

Definition at line 238 of file EventUtils.py.

def prepareALLFilesForTransfer(directory, file_pairs):
    """
    Prepares every file in `file_pairs` for transfer (used in beam splash mode).

    The names of files already prepared are recorded, comma-separated, in
    `beamsplash.list` inside `directory`. That file is created if missing and
    emptied if it is older than one day. Each JiveXML file not yet recorded is
    zipped, each VP1 file not yet recorded is copied to its transfer name, and
    the new names are appended to the list.

    Parameters:
    - directory (str): Directory where the files are located.
    - file_pairs (list): (JiveXML, VP1) file name tuples; either entry may be None.
    """
    msg = logging.getLogger('EventUtils')
    beamsplash_file = os.path.join(directory, 'beamsplash.list')
    files_in_beamsplash_file = set()

    # Check if beamsplash.list exists and is older than 1 day
    if os.path.exists(beamsplash_file):
        file_mod_time = os.path.getmtime(beamsplash_file)
        if (time.time() - file_mod_time) > 86400:  # 1 day = 86400 seconds
            msg.info(f"{beamsplash_file} is older than 1 day. Recreating...")
            with open(beamsplash_file, "w"):
                pass  # Recreate the file (empty)
        else:
            with open(beamsplash_file, "r") as f:
                recorded = f.read()
            # The list is written comma-separated (see the append below), so it must
            # be split on commas when read back; line breaks are accepted as well.
            files_in_beamsplash_file = {
                entry.strip()
                for line in recorded.splitlines()
                for entry in line.split(",")
                if entry.strip()
            }
    else:
        msg.info(f"{beamsplash_file} does not exist. Creating a new one...")
        with open(beamsplash_file, "w"):
            pass  # Create the file

    files_to_transfer = []
    for jive_file, vp1_file in file_pairs:
        if jive_file and jive_file not in files_in_beamsplash_file:
            files_to_transfer.append(jive_file)
            zipXMLFile(directory, jive_file)
        if vp1_file and vp1_file not in files_in_beamsplash_file:
            files_to_transfer.append(vp1_file)
            renameESDFile(directory, vp1_file)

    if not files_to_transfer:
        return

    # Drop duplicates but keep the processing order, so the list is deterministic
    new_files = list(dict.fromkeys(files_to_transfer))
    try:
        with open(beamsplash_file, "a") as f:
            separator = "," if files_in_beamsplash_file else ""
            f.write(separator + ",".join(new_files))
    except IOError as e:
        msg.error(f"Error handling file {beamsplash_file}: {e}")
STL class.

◆ prepareFilesForTransfer()

EventUtils.prepareFilesForTransfer ( directory,
file_pairs,
timeinterval )

Definition at line 196 of file EventUtils.py.

def prepareFilesForTransfer(directory, file_pairs, timeinterval):
    """
    Prepares the second-to-last event pair for transfer, at most one pair per call.

    Nothing is done when
    - there are more ready files than `.COPIED` bookkeeping files (a transfer is pending),
    - the newest ready file is younger than `timeinterval` seconds,
    - `file_pairs` has fewer than two entries, or
    - the second-to-last pair already has a ready JiveXML or VP1 file.
    Otherwise the JiveXML file is zipped and the VP1 file is copied to its
    `.online.pool.root` name.

    Parameters:
    - directory (str): Directory where the files are located.
    - file_pairs (list): (JiveXML, VP1) file name tuples sorted oldest first; either entry may be None.
    - timeinterval (int): Minimum age in seconds of the newest ready file before another pair is added.
    """
    msg = logging.getLogger('EventUtils')
    msg.info('%s begin prepare files for transfer', time.ctime(time.time()))

    ready_jive = glob.glob(f"{directory}/*.zip")  # atlantis files ready for transfer
    copied_jive = glob.glob(f"{directory}/*.zip.COPIED")  # CastorScript bookkeeping files indicating the transfer is done
    ready_vp1 = glob.glob(f"{directory}/*.online.pool.root")  # VP1 files ready for transfer
    copied_vp1 = glob.glob(f"{directory}/*.online.pool.root.COPIED")  # CastorScript bookkeeping files indicating the transfer is done

    if len(ready_jive) > len(copied_jive) or len(ready_vp1) > len(copied_vp1):
        msg.info("There are files about to be transferred. Do not attempt to add new files to be transferred.")
        return

    latest_ready_jive_age = time.time() - get_latest_file_timestamp(ready_jive)
    latest_ready_vp1_age = time.time() - get_latest_file_timestamp(ready_vp1)

    # If files are too recent, wait before adding new files, but only if there are existing files
    if (latest_ready_jive_age < timeinterval) or (latest_ready_vp1_age < timeinterval):
        msg.info("Wait for %ds before adding new events to the transfer queue. Last jive event in the queue was added %ds ago, last vp1 event in the queue was added %ds ago", timeinterval, latest_ready_jive_age, latest_ready_vp1_age)
        return

    # At least two entries are needed to be able to take the last but one
    if len(file_pairs) < 2:
        return
    jive_file, vp1_file = file_pairs[-2]

    jive_without_extension = os.path.splitext(jive_file)[0] if jive_file else None

    # Extract relevant part from VP1 file using regex
    vp1_match = re.match(r"(vp1_r\d+_ev\d+_u\d+)", vp1_file) if vp1_file else None
    vp1_without_extension = vp1_match.group(1) if vp1_match else None

    # A missing stem (no file in the pair, or a VP1 name that does not match the
    # regex) cannot match anything; testing `None in <str>` would raise TypeError.
    jive_ready = jive_without_extension is not None and any(
        jive_without_extension in os.path.basename(f) for f in ready_jive)
    vp1_ready = vp1_without_extension is not None and any(
        vp1_without_extension in os.path.basename(f) for f in ready_vp1)

    # If the last but one pair is already ready for transfer there is nothing to do
    if jive_ready or vp1_ready:
        return

    if jive_file:
        msg.info('%s going to zip file %s ready for transfer to eos', time.ctime(time.time()), jive_file)
        zipXMLFile(directory, jive_file)
    if vp1_file:
        msg.info('%s going to rename ESD file %s ready for transfer to eos', time.ctime(time.time()), vp1_file)
        renameESDFile(directory, vp1_file)

◆ prune()

EventUtils.prune ( file_pairs,
max_pairs,
directory )
Removes the oldest event pairs to keep only the latest `max_pairs`.

Parameters:
- file_pairs (list): List of (JiveXML, VP1) file pairs sorted by timestamp.
- max_pairs (int): Maximum number of file pairs to retain.
- directory (str): Directory where files are located.

Returns:
- list: The pruned list of `max_pairs` most recent file pairs.

Definition at line 124 of file EventUtils.py.

def prune(file_pairs, max_pairs, directory):
    """
    Removes the oldest event pairs to keep only the latest `max_pairs`.

    The files of every removed pair are deleted from `directory`. The input
    list itself is not modified.

    Parameters:
    - file_pairs (list): List of (JiveXML, VP1) file pairs sorted by timestamp (oldest first).
    - max_pairs (int): Maximum number of file pairs to retain. A value of zero
      or less disables pruning: nothing is deleted.
    - directory (str): Directory where files are located.

    Returns:
    - list: The pairs that remain after pruning (at most `max_pairs` entries when
      pruning took place, otherwise all of `file_pairs`).
    """

    # Nothing to prune: hand back all pairs, as documented.
    # A non-positive limit is treated the same way, because slicing with it
    # would select the wrong end of the list.
    if max_pairs <= 0 or len(file_pairs) <= max_pairs:
        return list(file_pairs)

    msg = logging.getLogger('EventUtils')
    msg.info('Pruning file list: Keeping latest %d out of %d entries.', max_pairs, len(file_pairs))

    # Determine files to remove (oldest entries)
    removed_pairs = file_pairs[:-max_pairs]

    # Remove old files from the directory
    for jive_file, vp1_file in removed_pairs:
        for file in (jive_file, vp1_file):
            if file:
                file_path = os.path.join(directory, file)
                remove_file(file_path)

    msg.info('Removed %d file pairs.', len(removed_pairs))

    # Return the remaining pairs (latest `max_pairs`)
    return file_pairs[-max_pairs:]

◆ remove_file()

EventUtils.remove_file ( file_path)

Definition at line 293 of file EventUtils.py.

def remove_file(file_path):
    """
    Deletes a file if it exists and logs the outcome.

    A missing file is reported with a warning. Any exception raised while
    checking or deleting is logged as an error and not propagated.

    Parameters:
    - file_path (str): Path of the file to delete.
    """
    log = logging.getLogger('EventUtils')
    try:
        if not os.path.exists(file_path):
            log.warning('File not found, skipping: %s', file_path)
            return
        os.unlink(file_path)
        log.info('Removed file: %s', file_path)
    except Exception as err:
        log.error('Error removing file %s: %s', file_path, str(err))

◆ renameESDFile()

EventUtils.renameESDFile ( directory,
filename )

Definition at line 341 of file EventUtils.py.

def renameESDFile(directory, filename):
    """Rename the ESD for the specified event.

    The file `filename` in `directory` is copied to a file whose `.pool.root`
    ending is replaced by `.online.pool.root`. The original file is not deleted.
    A file name whose suffixes are not exactly `.pool.root` is rejected with a
    warning; a failed copy is logged as a warning and not raised.

    Parameters:
    - directory (str): Directory where the file is located.
    - filename (str): Name of the VP1 ESD file; must end in `.pool.root`.
    """
    msg = logging.getLogger('EventUtils')
    msg.info('Begin renaming VP1 file %s for transfer', filename)
    if Path(filename).suffixes != ['.pool', '.root']:
        msg.warning("Unexpected VP1 file name: %s", filename)
        return
    orgname = f'{directory}/{filename}'
    # Rewrite only the ending of the file name; a directory that happens to
    # contain ".pool.root" in its path must be left untouched.
    stem = filename[:-len('.pool.root')]
    newname = f'{directory}/{stem}.online.pool.root'
    # NOTE(review): the copy is written directly under its final name, unlike
    # zipXMLFile which goes through a temporary file first. Confirm the transfer
    # cannot pick up a partially copied file.
    try:
        shutil.copyfile(Path(orgname), Path(newname))
    except OSError as err:
        msg.warning("Could not copy %s to %s: %s", orgname, newname, err)

◆ writeEventlist()

EventUtils.writeEventlist ( directory,
file_pairs,
listname = 'event' )

Definition at line 159 of file EventUtils.py.

def writeEventlist(directory, file_pairs, listname='event'):
    """
    Writes the event list file `<listname>.list` into `directory`.

    One line is written per pair, holding the run and event numbers and both
    file names, with 'N/A' in place of anything missing. The list is written
    to `<listname>.<pid>` first and then renamed to its final name.

    Parameters:
    - directory (str): Directory in which the list is written.
    - file_pairs (list): (JiveXML, VP1) file name tuples; either entry may be None.
    - listname (str): Base name of the list file.
    """
    log = logging.getLogger('EventUtils')
    log.info('%s begin write event list', time.ctime())

    final_filename = os.path.join(directory, f"{listname}.list")
    temp_filename = os.path.join(directory, f"{listname}.{os.getpid()}")

    try:
        with open(temp_filename, 'w') as out:
            for jive_file, vp1_file in file_pairs:
                # Run and event numbers come from the JiveXML name when there is
                # one, otherwise from the VP1 name
                source_name = jive_file or vp1_file
                if source_name:
                    run_number, event_number = extract_numbers(source_name)
                else:
                    run_number, event_number = None, None

                log.info(f"JiveXML: {jive_file}, VP1: {vp1_file}, Run: {run_number}, Event: {event_number}")

                # None values are written as 'N/A' for clarity
                line = (f"run:{run_number or 'N/A'},event:{event_number or 'N/A'},"
                        f"atlantis:{jive_file or 'N/A'},vp1:{vp1_file or 'N/A'}\n")
                out.write(line)
    except IOError as err:
        log.warning(f"Could not write event list: {err}")
        return  # Nothing to rename if writing failed

    # Move the finished list into place with a single rename
    try:
        os.rename(temp_filename, final_filename)
    except OSError as err:
        log.warning(f"Could not rename {temp_filename} to {final_filename}: {err}")

    log.info('%s end write event list', time.ctime())

◆ zipXMLFile()

EventUtils.zipXMLFile ( directory,
filename )

Definition at line 311 of file EventUtils.py.

def zipXMLFile(directory, filename):
    """Zip the JiveXML file for the specified event.

    Looks for a JiveXML file with the required filename in the given directory,
    and if one is found, zip it. The original file is not deleted.
    Zip the file to .tmp first, and then rename to .zip
    to avoid triggering the transfer before the zip file is closed.

    A file name not ending in `.xml` is rejected with a warning. A failure
    while zipping or renaming is logged as a warning and not raised.

    Parameters:
    - directory (str): Directory where the file is located.
    - filename (str): Name of the JiveXML file; must end in `.xml`.
    """
    msg = logging.getLogger('EventUtils')
    msg.info('%s begin zipXMLFile', time.ctime(time.time()))
    if Path(filename).suffix != '.xml':
        msg.warning("Unexpected Atlantis file name: %s", filename)
        return
    matchingFiles = glob.glob(f"{directory}/{filename}")
    if len(matchingFiles) == 1:  # Only proceed if exactly one matching file found, for safety
        msg.info('exactly one matching file found')
        matchingFilePath = Path(matchingFiles[0])
        tmpFilePath = matchingFilePath.with_suffix('.tmp')
        zipFilePath = matchingFilePath.with_suffix('.zip')
        matchingFileName = matchingFilePath.name
        msg.info('Zipping %s to %s', matchingFileName, zipFilePath.name)
        try:
            with ZipFile(tmpFilePath, 'w', compression=ZIP_DEFLATED) as z:
                z.write(matchingFilePath.as_posix(), arcname=matchingFileName)
            # Rename exactly the file that was just written, not a path rebuilt from strings
            os.rename(tmpFilePath, zipFilePath)
        except OSError as err:
            msg.warning("Could not zip %s: %s", filename, err)
    else:
        # Make the skipped case visible instead of silently doing nothing
        msg.warning('Expected exactly one file matching %s, found %d; nothing zipped',
                    filename, len(matchingFiles))
    msg.info('%s end of zipXMLFile', time.ctime(time.time()))