ATLAS Offline Software
EventUtils.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 import os, re, time, glob, shutil
4 from pathlib import Path
5 from zipfile import ZipFile, ZIP_DEFLATED
6 from AthenaCommon.Logging import logging
7 
def getEventlist(directory, checkpair, remove=True, patternAtlantis='.xml', patternVP1='.pool.root'):
    """Build the list of events found in the given directory.

    File names of the form JiveXML_{RunNumber}_{EventNumber}{patternAtlantis} or
    vp1_r{RunNumber}_ev{EventNumber}_{TimeStamps}CEST{patternVP1} are collected and
    sorted by zero-padded run number, event number and file name. A name in which
    the pattern is directly followed by another '.' (e.g. the CastorScript
    bookkeeping files) is ignored.

    Every file except the last one of the sorted list is then compared with its
    successor:

    checkpair=True, remove=True:  only neighbours with the same run and event number
                                  form an entry; if the very first file of the sorted
                                  list has no partner it is deleted.
    checkpair=True, remove=False: same pairing, but nothing is deleted.
    checkpair=False:              every file is combined with its successor without
                                  comparing run and event numbers.

    Returns a list of (run, event, file, next file) tuples, where run and event are
    12-digit zero-padded strings.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin get event list', time.ctime(time.time()))
    names = os.listdir(directory)

    # look for JiveXML_{RunNumber}_{EventNumber}.xml or vp1_r{RunNumber}_ev{EventNumber}_{TimeStamps}CEST.pool.root files
    # excluding the CastorScript bookkeeping files with .STATUS suffix to the original data files
    pattern = r'(?:JiveXML|vp1)_(?:|r)(\d+)_(?:|ev)(\d+)'+f'(?:{re.escape(patternAtlantis)}|_.+CEST{re.escape(patternVP1)})'+r'(?!\.)'

    # Collect (run, event, name) for every event file; zero padding makes the
    # string sort below agree with the numeric order
    candidates = []
    for name in names:
        found = re.search(pattern, name)
        if not found:
            continue
        candidates.append(("%012d" % int(found.group(1)), "%012d" % int(found.group(2)), name))
    candidates.sort()

    # Compare each file with its successor in the sorted list
    eventlist = []
    for pos in range(len(candidates) - 1):
        run, event, current = candidates[pos]
        nextRun, nextEvent, following = candidates[pos + 1]

        if not checkpair or (run == nextRun and event == nextEvent):
            eventlist.append((run, event, current, following))
            continue

        # No partner directly behind this file: it is left out of the event list.
        # Only the first file of the sorted list is deleted.
        if remove and pos == 0:
            msg.warning("One of the files is missing for run %s, event %s, removing the other as well.", run, event)
            try:
                msg.verbose("Removing %s/%s", directory, current)
                os.unlink("%s/%s" % (directory, current))
            except OSError as err:
                msg.warning("Could not remove '%s': %s", current, err)

    msg.verbose('%s end get event list', time.ctime(time.time()))
    return eventlist
64 
def pruneEvents(directory, maxevents, eventlist):
    """Prune the oldest events if the directory holds more than maxevents events.

    directory: directory containing the event files
    maxevents: number of events to keep
    eventlist: list of (run, event, atlantis, vp1) tuples, oldest first. The pruned
               entries are removed from this list in place, whether or not their
               files could be deleted.

    The two files of an event are unlinked independently of each other, so a
    failure to remove one of them does not leave the other one behind.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin prune events', time.ctime(time.time()))
    numevents = len(eventlist)
    excess = numevents - maxevents

    if excess <= 0:
        msg.debug("Nothing to prune (%d <= %d).", numevents, maxevents)
        msg.verbose('%s end prune events', time.ctime(time.time()))
        return

    # Take the excess entries off the front of the caller's list in one step
    pruned = eventlist[:excess]
    del eventlist[:excess]

    for run, event, atlantis, vp1 in pruned:
        msg.verbose("Going to prune files %s and %s for run %s and event %s.", atlantis, vp1, run, event)
        for filename in (atlantis, vp1):
            try:
                msg.verbose("%s Trying to unlink %s", time.ctime(time.time()), filename)
                os.unlink("%s/%s" % (directory, filename))
            except OSError as err:
                msg.warning("Could not remove %s for run %s, event %s: %s", filename, run, event, err)
        msg.verbose("%s Done with unlink", time.ctime(time.time()))

    msg.verbose('%s end prune events', time.ctime(time.time()))
90 
def writeEventlist(directory, eventlist, listname='event'):
    """Write the event list to {directory}/{listname}.list.

    With the default listname this is the event.list file that is used by
    atlas-live.cern.ch for synchronizing events.

    directory: directory in which the list file is written
    eventlist: iterable of (run, event, atlantis, vp1) entries, one line each
    listname:  base name of the list file

    The list is written to {listname}.{pid} first and then renamed, so the list
    file is replaced in a single step. If writing fails, the existing list file is
    left untouched and the incomplete temporary file is removed. Problems are
    logged as warnings and not raised.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin write event list', time.ctime(time.time()))
    pid = os.getpid()
    tmpname = "%s/%s.%d" % (directory, listname, pid)

    try:
        with open(tmpname, 'w') as listfile:
            listfile.writelines(
                "run:%s,event:%s,atlantis:%s,vp1:%s\n" % (run, event, atlantis, vp1)
                for run, event, atlantis, vp1 in eventlist)
    except OSError as err:
        msg.warning("Could not write event list: %s", err)
        # Do not publish a truncated list; best-effort cleanup of the leftovers
        try:
            os.unlink(tmpname)
        except OSError:
            pass
        msg.verbose('%s end write event list', time.ctime(time.time()))
        return

    # Rename for an atomic overwrite operation
    try:
        os.rename(tmpname, "%s/%s.list" % (directory, listname))
    except OSError as err:
        msg.warning("Could not rename %s.%d to %s.list: %s", listname, pid, listname, err)
    msg.verbose('%s end write event list', time.ctime(time.time()))
110 
def cleanDirectory(directory, maxevents, checkpair, isBeamSplashMode):
    """Build, prune and publish the event list, then prepare files for transfer.

    directory:        directory containing the event files
    maxevents:        number of events to keep; pruning is skipped unless it is > 0
    checkpair:        passed on to getEventlist and, as 'pair', to prepareFilesForTransfer
    isBeamSplashMode: if true, prepareFilesForTransfer is not called; instead every
                      vp1*CEST.pool.root file is copied to *.online.pool.root
    """
    msg = logging.getLogger( 'EventUtils' )

    msg.verbose('%s begin clean directory', time.ctime(time.time()))
    eventlist = getEventlist(directory, checkpair)
    if maxevents>0:
        pruneEvents(directory, maxevents, eventlist)
    writeEventlist(directory, eventlist)

    if not isBeamSplashMode:
        # disabled for beam splashes. Call zipXMLFile directly in OnlineEventDisplaysSvc.py to transfer every event.
        if eventlist:
            prepareFilesForTransfer(directory, eventlist, pair=checkpair, timeinterval=60)
    else:
        for filename in os.listdir(directory):
            isVP1File = filename.startswith("vp1") and "CEST.pool.root" in filename
            if not isVP1File:
                continue
            orgname = f'{directory}/{filename}'
            newname = orgname.replace('.pool.root', '.online.pool.root')
            try:
                shutil.copyfile(Path(orgname), Path(newname))
            except OSError as err:
                msg.warning("Could not copy %s to %s: %s", orgname, newname, err)
    msg.verbose('%s end clean directory', time.ctime(time.time()))
135 
def prepareFilesForTransfer(directory, eventlist, pair, timeinterval):
    """Preparing the list of files for CastorScript to transfer to EOS

    CastorScript is configured to look for *.zip and *.online.pool.root files to transfer.
    This function counts the number of CastorScript bookkeeping files *.COPIED
    and compares it with the number of original data files *.zip and *.online.pool.root
    to obtain the transfer status.
    If all transfers have been completed, and the most recent queued event is at least
    timeinterval seconds old, the most recent event from eventlist is queued:
    the corresponding xml file is zipped and, if pair is True, the corresponding
    pool.root file is copied to .online.pool.root to trigger the transfer.
    Finally the transfer list, as it was before the new event was queued, is
    written to transfer.list.

    directory: directory containing the event files
    eventlist: list of events that can be transferred; nothing is done if it is empty
    pair: True to transfer atlantis-vp1 pairs. False to transfer atlantis file only.
    timeinterval: time interval between two events in the transfer list (in seconds).
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin prepare files for transfer', time.ctime(time.time()))

    if not eventlist:
        msg.debug("Event list is empty. No new event to transfer.")
        return

    # Get the latest event from event list to transfer
    run, event, atlantis, vp1 = eventlist[-1]

    transferlist = getEventlist(directory, checkpair=pair, remove=False, patternAtlantis='.zip', patternVP1='.online.pool.root')
    if transferlist and run == transferlist[-1][0] and event == transferlist[-1][1]:
        msg.debug("Last event already in transfer list. No new event to transfer.")
        return

    # Check transfer status
    matchAtlantis = glob.glob(f"{directory}/*.zip") # atlantis files ready for transfer
    copiedAtlantis = glob.glob(f"{directory}/*.zip.COPIED") # CastorScript bookkeeping files indicating the transfer is done
    matchVP1 = glob.glob(f"{directory}/*.online.pool.root") # VP1 files ready for transfer
    copiedVP1 = glob.glob(f"{directory}/*.online.pool.root.COPIED") # CastorScript bookkeeping files indicating the transfer is done

    # Check if the transfer is done
    if len(matchAtlantis)>len(copiedAtlantis) or len(matchVP1)>len(copiedVP1):
        msg.debug("There are files in transfer. Do not attemp to add new event to the transfer list.")
        return

    # Check if the previous transferred event is too new
    if transferlist: # use the sorted list here
        try:
            age = time.time() - os.path.getmtime(f'{directory}/{transferlist[-1][2]}') # check the timestamp of the latest atlantis zip file
        except OSError as err:
            msg.warning("Failed to check the timestamp of %s. %s", transferlist[-1][2], err)
            return
        if age < timeinterval:
            msg.debug("Wait for %ds before adding new events to the transfer queue. Last event in the queue was added %ds ago.", timeinterval, age)
            return

    # Handle atlantis files
    msg.debug('%s going to zip file %s', time.ctime(time.time()), atlantis)
    zipXMLFile(directory, atlantis)

    # Handle VP1 files
    if pair:
        msg.debug('%s going to rename ESD file %s', time.ctime(time.time()), vp1)
        renameESDFile(directory, vp1)

    # transferlist was read before the new event was queued, so the event queued
    # just now is not part of the list written here
    writeEventlist(directory, transferlist, listname='transfer')
    msg.verbose('%s end prepare files for transfer', time.ctime(time.time()))
190 
def zipXMLFile(directory, filename):
    """Zip the JiveXML file for the specified event.

    Looks for a JiveXML file with the required filename in the given directory,
    and if exactly one is found, zip it. The original file is not deleted.
    Zip the file to .tmp first, and then rename to .zip
    to avoid triggering the transfer before the zip file is closed.

    directory: directory containing the JiveXML file
    filename:  name of the JiveXML file; must have the suffix .xml

    Problems are logged as warnings and not raised. If zipping fails, the
    incomplete .tmp file is removed again.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin zipXMLFile', time.ctime(time.time()))
    if Path(filename).suffix != '.xml':
        msg.warning("Unexpected Atlantis file name: %s", filename)
        return

    matchingFiles = glob.glob(f"{directory}/{filename}")
    if len(matchingFiles) != 1: # Only proceed if exactly one matching file found, for safety
        msg.warning("Expected exactly one file matching %s/%s, found %d. Nothing zipped.", directory, filename, len(matchingFiles))
        msg.verbose('%s end of zipXMLFile', time.ctime(time.time()))
        return

    msg.verbose('exactly one matching file found')
    matchingFilePath = Path(matchingFiles[0])
    tmpFilePath = matchingFilePath.with_suffix('.tmp')
    zipFilePath = matchingFilePath.with_suffix('.zip')
    msg.verbose('Zipping %s to %s', matchingFilePath.name, zipFilePath.name)
    try:
        with ZipFile(tmpFilePath, 'w', compression=ZIP_DEFLATED) as z:
            z.write(matchingFilePath.as_posix(), arcname=matchingFilePath.name)
        # The .zip name only appears once the archive is complete and closed
        os.rename(tmpFilePath, zipFilePath)
    except OSError as err:
        msg.warning("Could not zip %s: %s", filename, err)
        # Best-effort cleanup of a partially written archive
        try:
            os.unlink(tmpFilePath)
        except OSError:
            pass
    msg.verbose('%s end of zipXMLFile', time.ctime(time.time()))
220 
def renameESDFile(directory, filename):
    """Rename the ESD for the specified event.

    Copies the ESD file with the required filename in the given directory
    to the same name ending in .online.pool.root. The original file is not deleted.
    The copy is written to a .online.tmp name first and then renamed, so that the
    .online.pool.root name only appears once the copy is complete.

    directory: directory containing the ESD file
    filename:  name of the ESD file; its suffixes must be exactly .pool.root

    Problems are logged as warnings and not raised. If copying fails, the
    incomplete temporary file is removed again.
    """
    msg = logging.getLogger( 'EventUtils' )
    msg.verbose('%s begin renameESD', time.ctime(time.time()))
    if Path(filename).suffixes != ['.pool', '.root']:
        msg.warning("Unexpected VP1 file name: %s", filename)
        return

    # Derive the new names from the file name only, so that a '.pool.root'
    # appearing in the directory path is never rewritten
    basename = filename[:-len('.pool.root')]
    orgname = f'{directory}/{filename}'
    newname = f'{directory}/{basename}.online.pool.root'
    tmpname = f'{directory}/{basename}.online.tmp'
    try:
        shutil.copyfile(orgname, tmpname)
        os.rename(tmpname, newname)
    except OSError as err:
        msg.warning("Could not copy %s to %s: %s", orgname, newname, err)
        # Best-effort cleanup of a partially written copy
        try:
            os.unlink(tmpname)
        except OSError:
            pass
    msg.verbose('%s end of renameESD', time.ctime(time.time()))
EventUtils.writeEventlist
def writeEventlist(directory, eventlist, listname='event')
Definition: EventUtils.py:92
EventUtils.zipXMLFile
def zipXMLFile(directory, filename)
Definition: EventUtils.py:191
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
EventUtils.cleanDirectory
def cleanDirectory(directory, maxevents, checkpair, isBeamSplashMode)
Definition: EventUtils.py:112
EventUtils.getEventlist
def getEventlist(directory, checkpair, remove=True, patternAtlantis='.xml', patternVP1='.pool.root')
Definition: EventUtils.py:14
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
EventUtils.pruneEvents
def pruneEvents(directory, maxevents, eventlist)
Definition: EventUtils.py:66
Trk::open
@ open
Definition: BinningType.h:40
EventUtils.renameESDFile
def renameESDFile(directory, filename)
Definition: EventUtils.py:221
EventUtils.prepareFilesForTransfer
def prepareFilesForTransfer(directory, eventlist, pair, timeinterval)
Definition: EventUtils.py:136