ATLAS Offline Software
EventUtils.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 import os, re, time, glob, shutil
4 from pathlib import Path
5 from zipfile import ZipFile, ZIP_DEFLATED
6 from AthenaCommon.Logging import logging
7 
def getEventlist(directory, checkpair, remove=False, patternAtlantis='.xml', patternVP1='.pool.root'):
    """Scan ``directory`` for event display files and group them into events.

    Recognised names are ``JiveXML_<run>_<event><patternAtlantis>`` (Atlantis)
    and ``vp1_r<run>_ev<event>_<...>CEST<patternVP1>`` (VP1). Names followed by
    a further ``.``-suffix (CastorScript bookkeeping files) are ignored.
    Matching files are sorted by (run, event, filename) with run and event
    zero-padded to 12 digits. Because 'JiveXML' sorts before 'vp1', the
    Atlantis file of an event comes before its VP1 file. Every two
    neighbouring files in that order yield one entry
    ``(run, event, first_file, second_file)``.

    :param directory: directory to scan.
    :param checkpair: if True, only neighbours with the same run and event
        numbers (an Atlantis-VP1 pair) form an entry. If False, neighbours are
        combined without any check.
    :param remove: only used when ``checkpair`` is True. If the first file of
        the sorted list has no partner, it is deleted. Unpaired files further
        down the list are skipped but never deleted.
    :param patternAtlantis: filename suffix of Atlantis files.
    :param patternVP1: filename suffix of VP1 files.
    :returns: list of ``(run, event, atlantis, vp1)`` tuples; an empty list if
        ``directory`` does not exist.
    """
    log = logging.getLogger('EventUtils')
    log.verbose('%s begin get event list', time.ctime(time.time()))

    if not os.path.exists(directory):
        log.warning('The directory %s does not exist.', directory)
        return []

    # JiveXML_{RunNumber}_{EventNumber}.xml or
    # vp1_r{RunNumber}_ev{EventNumber}_{TimeStamps}CEST.pool.root; the final
    # negative lookahead rejects the same names with an extra dotted suffix.
    nameRegex = re.compile(
        r'(?:JiveXML|vp1)_(?:|r)(\d+)_(?:|ev)(\d+)'
        + f'(?:{re.escape(patternAtlantis)}|_.+CEST{re.escape(patternVP1)})'
        + r'(?!\.)'
    )

    entries = []
    for name in os.listdir(directory):
        hit = nameRegex.search(name)
        if hit:
            entries.append((f'{int(hit.group(1)):012d}', f'{int(hit.group(2)):012d}', name))
    entries.sort()

    # Walk over neighbouring files. Each file is compared with the next one,
    # so with checkpair=True the second file of a pair is then compared with
    # the next event's first file and skipped as "unpaired".
    # NOTE(review): with checkpair=False the windows overlap: every file but
    # the last is combined with the following file, so a file can appear in
    # two entries and the last file never appears first. Confirm intended.
    events = []
    for position, (here, there) in enumerate(zip(entries, entries[1:])):
        if checkpair and here[:2] != there[:2]:
            # Partner missing: never list this file, optionally delete it.
            if remove and position == 0:
                log.warning("One of the files is missing for run %s, event %s, removing the other as well.", here[0], here[1])
                try:
                    log.info("Removing %s/%s", directory, here[2])
                    os.unlink(f"{directory}/{here[2]}")
                except OSError as err:
                    log.warning("Could not remove '%s': %s", here[2], err)
            continue
        events.append((here[0], here[1], here[2], there[2]))

    log.verbose('%s end get event list', time.ctime(time.time()))
    return events
68 
def pruneEvents(directory, maxevents, eventlist):
    """Delete the first events of ``eventlist`` so that at most ``maxevents`` remain.

    ``eventlist`` holds ``(run, event, atlantis, vp1)`` tuples (as produced by
    getEventlist, i.e. sorted by run/event number). The leading excess entries
    are removed from ``eventlist`` IN PLACE, and both of their files are
    unlinked from ``directory``. Each file is removed independently, so
    failing to delete one file does not prevent deleting the other.

    :param directory: directory containing the event files.
    :param maxevents: maximum number of events to keep.
    :param eventlist: list of events; modified in place.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin prune events', time.ctime(time.time()))
    numevents = len(eventlist)
    excess = numevents - maxevents

    # Check if there are more events than allowed and prune a number of events equal to the excess
    if excess > 0:
        # Slice once instead of pop(0) per event (O(n) instead of O(n^2)).
        # Slicing also cannot run past the end of the list.
        pruned = eventlist[:excess]
        del eventlist[:excess]
        for i, (run, event, atlantis, vp1) in enumerate(pruned):
            msg.verbose("maxevents=%d, numevents=%d, i=%d", maxevents, numevents, i)
            msg.verbose("Going to prune files %s and %s for run %s and event %s.", atlantis, vp1, run, event)
            # Try each file separately. Previously, a failure on the Atlantis
            # file skipped the VP1 file and left it on disk.
            for filename in (atlantis, vp1):
                msg.verbose("%s Trying to unlink %s", time.ctime(time.time()), filename)
                try:
                    os.unlink("%s/%s" % (directory, filename))
                except OSError as err:
                    msg.warning("Could not remove files for run %s, event %s: %s", run, event, err)
            msg.verbose("%s Done with unlink", time.ctime(time.time()))
    else:
        msg.debug("Nothing to prune (%d <= %d).", numevents, maxevents)
    msg.verbose('%s end prune events', time.ctime(time.time()))
94 
def writeEventlist(directory, eventlist, listname='event'):
    """Write ``<directory>/<listname>.list`` describing ``eventlist``.

    With the default name this is the event.list file that is used by
    atlas-live.cern.ch for synchronizing events. There is one line per event:
    ``run:<run>,event:<event>,atlantis:<file>,vp1:<file>``.

    The list is first written to ``<listname>.<pid>`` and then renamed over
    ``<listname>.list``, so readers never see a half-written list. If writing
    fails, the previous ``<listname>.list`` is kept and the temporary file is
    removed. Failures are logged as warnings, not raised.

    :param directory: directory to write the list into.
    :param eventlist: iterable of ``(run, event, atlantis, vp1)`` entries.
    :param listname: base name of the list file.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin write event list', time.ctime(time.time()))
    pid = os.getpid()
    tmpname = "%s/%s.%d" % (directory, listname, pid)
    listpath = "%s/%s.list" % (directory, listname)
    try:
        # 'with' closes the file even if a write raises.
        with open(tmpname, 'w') as listfile:
            for run, event, atlantis, vp1 in eventlist:
                listfile.write("run:%s,event:%s,atlantis:%s,vp1:%s\n" % (run, event, atlantis, vp1))
    except OSError as err:
        msg.warning("Could not write event list: %s", err)
        # Do not replace the existing list with a partial one, and do not
        # leave the temporary file behind (it may not exist if open failed).
        try:
            os.remove(tmpname)
        except OSError:
            pass
    else:
        # Rename for an atomic overwrite operation
        try:
            os.replace(tmpname, listpath)
        except OSError as err:
            msg.warning("Could not rename %s.%d to %s.list: %s", listname, pid, listname, err)
    msg.verbose('%s end write event list', time.ctime(time.time()))
114 
def cleanDirectory(directory, maxevents, checkpair, isBeamSplashMode):
    """Run one housekeeping pass over the event display output directory.

    Builds the event list (getEventlist), prunes it to ``maxevents`` events
    when ``maxevents`` > 0, writes ``event.list``, and then queues files for
    transfer:

    * normal mode: if there is at least one event, prepareFilesForTransfer is
      called with ``timeinterval=60``;
    * beam-splash mode: every VP1 data file gets a ``.online.pool.root`` copy.
      Atlantis files are not handled here in that mode; zipXMLFile is called
      directly in OnlineEventDisplaysSvc.py to transfer every event.

    :param directory: directory holding the event files.
    :param maxevents: maximum number of events to keep (<= 0 disables pruning).
    :param checkpair: passed to getEventlist / prepareFilesForTransfer.
    :param isBeamSplashMode: select the beam-splash behaviour described above.
    """
    msg = logging.getLogger( 'EventUtils' )

    msg.verbose('%s begin clean directory', time.ctime(time.time()))
    eventlist = getEventlist(directory, checkpair)
    if maxevents > 0:
        pruneEvents(directory, maxevents, eventlist)
    writeEventlist(directory, eventlist)

    if isBeamSplashMode:
        _copyVP1FilesForTransfer(directory, msg)
    elif eventlist:
        prepareFilesForTransfer(directory, eventlist, pair=checkpair, timeinterval=60)
    msg.verbose('%s end clean directory', time.ctime(time.time()))


def _copyVP1FilesForTransfer(directory, msg):
    """Copy every VP1 data file in ``directory`` to its ``.online.pool.root`` name (beam-splash mode)."""
    try:
        filenames = os.listdir(directory)
    except OSError as err:
        # e.g. the directory does not exist (getEventlist only warns about that)
        msg.warning("Could not list %s: %s", directory, err)
        return
    for filename in filenames:
        # endswith rather than a substring test, so files with an extra
        # suffix after the data file name (bookkeeping/temporary files) are
        # not copied as if they were data.
        if not (filename.startswith("vp1") and filename.endswith("CEST.pool.root")):
            continue
        stem = filename[:-len('.pool.root')]
        orgname = f'{directory}/{filename}'
        newname = f'{directory}/{stem}.online.pool.root'
        # Copy under a temporary name and rename, so a partially written
        # *.online.pool.root file is never visible to the transfer tool.
        tmpname = f'{directory}/{stem}.online.tmp'
        try:
            shutil.copyfile(orgname, tmpname)
            os.replace(tmpname, newname)
        except OSError as err:
            msg.warning("Could not copy %s to %s: %s", orgname, newname, err)
            try:
                os.remove(tmpname)
            except OSError:
                pass
139 
def prepareFilesForTransfer(directory, eventlist, pair, timeinterval):
    """Prepare the list of files for CastorScript to transfer to EOS.

    CastorScript is configured to look for *.zip and *.online.pool.root files
    to transfer. This function counts the CastorScript bookkeeping files
    (*.COPIED) and compares that count with the number of data files (*.zip and
    *.online.pool.root) to obtain the transfer status. The most recent event
    of ``eventlist`` is queued only if all of the following hold:

    * it is not already the last event in the transfer list;
    * all transfers have completed;
    * the previously queued event is at least ``timeinterval`` seconds old.

    To queue an event, its xml file is zipped and, if ``pair`` is True, its
    pool.root file is copied to *.online.pool.root. ``transfer.list`` is then
    written.

    eventlist: list of (run, event, atlantis, vp1) events that can be transferred.
        Nothing is done if it is empty.
    pair: True to transfer atlantis-vp1 pairs. False to transfer atlantis file only.
    timeinterval: minimum time between two events in the transfer list (in seconds).
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin prepare files for transfer', time.ctime(time.time()))
    if not eventlist:
        # Previously an empty list raised IndexError on eventlist[-1].
        msg.debug("Empty event list. No new event to transfer.")
        return

    transferlist = getEventlist(directory, checkpair=pair, remove=False, patternAtlantis='.zip', patternVP1='.online.pool.root')
    run, event, atlantis, vp1 = eventlist[-1]
    if transferlist and transferlist[-1][0] == run and transferlist[-1][1] == event:
        msg.debug("Last event already in transfer list. No new event to transfer.")
        return

    # Check transfer status. Escape the directory so glob metacharacters in
    # its name are matched literally.
    base = glob.escape(directory)
    matchAtlantis = glob.glob(f"{base}/*.zip")  # atlantis files ready for transfer
    copiedAtlantis = glob.glob(f"{base}/*.zip.COPIED")  # CastorScript bookkeeping files indicating the transfer is done
    matchVP1 = glob.glob(f"{base}/*.online.pool.root")  # VP1 files ready for transfer
    copiedVP1 = glob.glob(f"{base}/*.online.pool.root.COPIED")  # CastorScript bookkeeping files indicating the transfer is done
    if len(matchAtlantis) > len(copiedAtlantis) or len(matchVP1) > len(copiedVP1):
        msg.debug("There are files in transfer. Do not attemp to add new event to the transfer list.")
        return

    # Check if the previously transferred event is too new (the list is sorted)
    if transferlist:
        try:
            # timestamp of the latest atlantis zip file
            age = time.time() - os.path.getmtime(f'{directory}/{transferlist[-1][2]}')
        except OSError as err:
            msg.warning("Failed to check the timestamp of %s. %s", transferlist[-1][2], err)
            return
        if age < timeinterval:
            msg.debug("Wait for %ds before adding new events to the transfer queue. Last event in the queue was added %ds ago.", timeinterval, age)
            return

    # Handle atlantis files
    msg.debug('%s going to zip file %s', time.ctime(time.time()), atlantis)
    zipXMLFile(directory, atlantis)

    # Handle VP1 files
    if pair:
        msg.debug('%s going to rename ESD file %s', time.ctime(time.time()), vp1)
        renameESDFile(directory, vp1)

    # NOTE(review): transferlist was taken before the event above was queued,
    # so the list written here lists only events queued in earlier calls. At
    # this point their *.COPIED count matched the data-file count. This is
    # presumably intentional (list only what has been transferred); confirm.
    writeEventlist(directory, transferlist, listname='transfer')
    msg.verbose('%s end prepare files for transfer', time.ctime(time.time()))
194 
def zipXMLFile(directory, filename):
    """Zip the JiveXML file for the specified event.

    ``filename`` is looked up with glob in ``directory`` (so it may be a
    pattern). Only if exactly one file matches, for safety, is ``<name>.zip``
    written next to that file, containing it under its own name. The original
    file is not deleted.

    The archive is written to ``<name>.tmp`` first and then renamed to
    ``.zip``. This avoids triggering the transfer (which looks for *.zip)
    before the zip file is closed. If zipping fails, a warning is logged and
    the temporary file is removed.

    :param directory: directory containing the xml file.
    :param filename: xml file name or glob pattern; must end in ``.xml``.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin zipXMLFile', time.ctime(time.time()))
    if Path(filename).suffix != '.xml':
        msg.warning("Unexpected Atlantis file name: %s", filename)
        return
    matchingFiles = glob.glob(f"{directory}/{filename}")
    if len(matchingFiles) != 1:
        # Previously this case was silent.
        msg.warning("Expected exactly one file matching %s in %s, found %d. Not zipping.", filename, directory, len(matchingFiles))
        msg.verbose('%s end of zipXMLFile', time.ctime(time.time()))
        return

    msg.verbose('exactly one matching file found')
    xmlPath = Path(matchingFiles[0])
    tmpPath = xmlPath.with_suffix('.tmp')
    zipPath = xmlPath.with_suffix('.zip')
    msg.verbose('Zipping %s to %s', xmlPath.name, zipPath.name)
    try:
        with ZipFile(tmpPath, 'w', compression=ZIP_DEFLATED) as z:
            z.write(xmlPath.as_posix(), arcname=xmlPath.name)
        # Rename the .tmp where it was actually written (next to the matched
        # file), rather than rebuilding its path from `directory`.
        os.replace(tmpPath, zipPath)
    except OSError as err:
        msg.warning("Could not zip %s: %s", filename, err)
        # Do not leave a half-written temporary archive behind.
        try:
            os.remove(tmpPath)
        except OSError:
            pass
    msg.verbose('%s end of zipXMLFile', time.ctime(time.time()))
224 
def renameESDFile(directory, filename):
    """Copy the ESD (VP1) file for the specified event to its transfer name.

    ``<directory>/<stem>.pool.root`` is copied to
    ``<directory>/<stem>.online.pool.root``. Despite the function name, the
    original file is not renamed or deleted. The copy is first written to
    ``<stem>.online.tmp`` and then renamed, so the transfer (which looks for
    *.online.pool.root) never picks up a partially written file. This is the
    same approach zipXMLFile uses for .zip files. If copying fails, a warning
    is logged and the temporary file is removed.

    :param directory: directory containing the VP1 file.
    :param filename: VP1 file name; must have exactly the suffixes ``.pool.root``.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin renameESD', time.ctime(time.time()))
    if Path(filename).suffixes != ['.pool', '.root']:
        msg.warning("Unexpected VP1 file name: %s", filename)
        return
    # Derive the new name from the file name only. Replacing '.pool.root' in
    # the full path would also rewrite a matching directory component.
    stem = filename[:-len('.pool.root')]
    orgname = f'{directory}/{filename}'
    newname = f'{directory}/{stem}.online.pool.root'
    tmpname = f'{directory}/{stem}.online.tmp'
    try:
        shutil.copyfile(orgname, tmpname)
        os.replace(tmpname, newname)
    except OSError as err:
        msg.warning("Could not copy %s to %s: %s", orgname, newname, err)
        try:
            os.remove(tmpname)
        except OSError:
            pass
    msg.verbose('%s end of renameESD', time.ctime(time.time()))
EventUtils.writeEventlist
def writeEventlist(directory, eventlist, listname='event')
Definition: EventUtils.py:96
EventUtils.zipXMLFile
def zipXMLFile(directory, filename)
Definition: EventUtils.py:195
EventUtils.getEventlist
def getEventlist(directory, checkpair, remove=False, patternAtlantis='.xml', patternVP1='.pool.root')
Definition: EventUtils.py:14
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
EventUtils.cleanDirectory
def cleanDirectory(directory, maxevents, checkpair, isBeamSplashMode)
Definition: EventUtils.py:116
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
EventUtils.pruneEvents
def pruneEvents(directory, maxevents, eventlist)
Definition: EventUtils.py:70
Trk::open
@ open
Definition: BinningType.h:40
EventUtils.renameESDFile
def renameESDFile(directory, filename)
Definition: EventUtils.py:225
EventUtils.prepareFilesForTransfer
def prepareFilesForTransfer(directory, eventlist, pair, timeinterval)
Definition: EventUtils.py:140