3 import os, re, time, glob, shutil
4 from pathlib
import Path
5 from zipfile
import ZipFile, ZIP_DEFLATED
6 from AthenaCommon.Logging
import logging
14 def getEventlist(directory, checkpair, remove=False, patternAtlantis='.xml', patternVP1='.pool.root'):
15 msg = logging.getLogger(
'EventUtils' )
16 msg.verbose(
'%s begin get event list', time.ctime(time.time()))
19 if os.path.exists(directory):
20 files = os.listdir(directory)
24 pattern =
r'(?:JiveXML|vp1)_(?:|r)(\d+)_(?:|ev)(\d+)'+f
'(?:{re.escape(patternAtlantis)}|_.+CEST{re.escape(patternVP1)})'+
r'(?!\.)'
28 matches = re.search(pattern, file)
32 run =
"%012d" %
int(matches.group(1))
33 event =
"%012d" %
int(matches.group(2))
35 fileentry = run, event, file
36 filelist.append(fileentry)
41 numfiles = len(filelist)
45 if checkpair
and (filelist[i][0] != filelist[i+1][0]
or filelist[i][1] != filelist[i+1][1]):
49 msg.warning(
"One of the files is missing for run %s, event %s, removing the other as well.", filelist[i][0], filelist[i][1])
51 msg.info(
"Removing %s/%s", directory, filelist[i][2])
52 os.unlink(
"%s/%s" % (directory, filelist[i][2]))
53 except OSError
as err:
54 msg.warning(
"Could not remove '%s': %s", filelist[i][2], err)
60 evententry = filelist[i][0], filelist[i][1], filelist[i][2], filelist[i+1][2]
61 eventlist.append(evententry)
64 msg.verbose(
'%s end get event list', time.ctime(time.time()))
66 msg.warning(
'The directory %s does not exist.', directory)
71 msg = logging.getLogger(
'EventUtils' )
72 msg.verbose(
'%s begin prune events', time.ctime(time.time()))
74 numevents = len(eventlist)
77 if numevents > maxevents:
78 for i
in range(numevents-maxevents):
79 msg.verbose(
"maxevents=%d, numevents=%d, i=%d", maxevents, numevents, i)
80 run, event, atlantis, vp1 = eventlist.pop(0)
81 msg.verbose(
"Going to prune files %s and %s for run %s and event %s.", atlantis, vp1, run, event)
83 msg.verbose(
"%s Trying to unlink %s", time.ctime(time.time()), atlantis)
84 os.unlink(
"%s/%s" % (directory, atlantis))
85 msg.verbose(
"%s Trying to unlink %s", time.ctime(time.time()), vp1)
86 os.unlink(
"%s/%s" % (directory, vp1))
87 msg.verbose(
"%s Done with unlink", time.ctime(time.time()))
88 except OSError
as err:
89 msg.warning(
"Could not remove files for run %s, event %s: %s", run, event, err)
92 msg.debug(
"Nothing to prune (%d <= %d).", numevents, maxevents)
93 msg.verbose(
'%s end prune events', time.ctime(time.time()))
97 msg = logging.getLogger(
'EventUtils' )
98 msg.verbose(
'%s begin write event list', time.ctime(time.time()))
101 file =
open(
"%s/%s.%d" % (directory, listname, pid),
'w')
102 for run, event, atlantis, vp1
in eventlist:
103 file.write(
"run:%s,event:%s,atlantis:%s,vp1:%s\n" % (run, event, atlantis, vp1))
105 except IOError
as err:
106 msg.warning(
"Could not write event list: %s", err)
110 os.rename(
"%s/%s.%d" % (directory, listname, pid),
"%s/%s.list" % (directory, listname))
111 except OSError
as err:
112 msg.warning(
"Could not rename %s.%d to %s.list: %s", listname, pid, listname, err)
113 msg.verbose(
'%s end write event list', time.ctime(time.time()))
117 msg = logging.getLogger(
'EventUtils' )
119 msg.verbose(
'%s begin clean directory', time.ctime(time.time()))
126 if len(eventlist)>0
and not isBeamSplashMode:
130 for filename
in os.listdir(directory):
131 if filename.startswith(
"vp1")
and "CEST.pool.root" in filename:
132 orgname = f
'{directory}/{filename}'
133 newname = orgname.replace(
'.pool.root',
'.online.pool.root')
135 shutil.copyfile(Path(orgname), Path(newname))
136 except OSError
as err:
137 msg.warning(
"Could not copy %s to %s: %s", orgname, newname, err)
138 msg.verbose(
'%s end clean directory', time.ctime(time.time()))
141 msg = logging.getLogger(
'EventUtils' )
142 """Preparing the list of files for CastorScript to transfer to EOS
144 CastorScript is configured to look for *.zip and *.online.pool.root files to transfer.
145 This function counts the number of CastorScript bookkeeping files *.COPIED
146 and compares it with the number of original data files *.zip and *.online.pool.root to obtain the transfer status.
147 If all transfers have been completed, get the most recent event from eventlist and add it to the transferlist.
148 Zip the corresponding xml file or rename the corresponding pool.root file to trigger the transfer.
150 eventlist: list of events that can be transferred
151 pair: True to transfer atlantis-vp1 pairs. False to transfer atlantis file only.
152 timeinterval: time interval between two events in the transfer list (in seconds).
154 msg.verbose(
'%s begin prepare files for transfer', time.ctime(time.time()))
155 transferlist =
getEventlist(directory, checkpair=pair, remove=
False, patternAtlantis=
'.zip', patternVP1=
'.online.pool.root')
156 if len(transferlist)>0
and eventlist[-1][0] == transferlist[-1][0]
and eventlist[-1][1] == transferlist[-1][1]:
157 msg.debug(
"Last event already in transfer list. No new event to transfer.")
161 matchAtlantis = glob.glob(f
"{directory}/*.zip")
162 copiedAtlantis = glob.glob(f
"{directory}/*.zip.COPIED")
163 matchVP1 = glob.glob(f
"{directory}/*.online.pool.root")
164 copiedVP1 = glob.glob(f
"{directory}/*.online.pool.root.COPIED")
166 if len(matchAtlantis)>len(copiedAtlantis)
or len(matchVP1)>len(copiedVP1):
167 msg.debug(
"There are files in transfer. Do not attemp to add new event to the transfer list.")
170 elif len(transferlist)>0:
172 age = time.time() - os.path.getmtime(f
'{directory}/{transferlist[-1][2]}')
173 if age < timeinterval:
174 msg.debug(
"Wait for %ds before adding new events to the transfer queue. Last event in the queue was added %ds ago.", timeinterval, age)
176 except OSError
as err:
177 msg.warning(
"Failed to check the timestamp of %s. %s", transferlist[-1][2], err)
181 run, event, atlantis, vp1 = eventlist[-1]
184 msg.debug(
'%s going to zip file %s', time.ctime(time.time()), atlantis)
189 msg.debug(
'%s going to rename ESD file %s', time.ctime(time.time()), vp1)
193 msg.verbose(
'%s end prepare files for transfer', time.ctime(time.time()))
196 msg = logging.getLogger(
'EventUtils' )
197 """Zip the JiveXML file for the specified event.
199 Looks for a JiveXML file with the required filename in the given directory,
200 and if one is found, zip it. The original file is not deleted.
201 Zip the file to .tmp first, and then rename to .zip
202 to avoid triggering the transfer before the zip file is closed.
204 msg.verbose(
'%s begin zipXMLFile', time.ctime(time.time()))
205 if Path(filename).suffix !=
'.xml':
206 msg.warning(
"Unexpected Atlantis file name: %s", filename)
208 matchingFiles = glob.glob(f
"{directory}/{filename}")
209 if len(matchingFiles) == 1:
210 msg.verbose(
'exactly one matching file found')
211 matchingFilePath = Path(matchingFiles[0])
212 tmpFilePath = matchingFilePath.with_suffix(
'.tmp')
213 zipFilePath = matchingFilePath.with_suffix(
'.zip')
214 matchingFilePath = Path(matchingFilePath)
215 matchingFileName = matchingFilePath.name
216 msg.verbose(
'Zipping %s to %s', matchingFileName, zipFilePath.name)
218 with ZipFile(tmpFilePath,
'w', compression=ZIP_DEFLATED)
as z:
219 z.write(matchingFilePath.as_posix(), arcname=matchingFileName)
220 os.rename(f
'{directory}/{tmpFilePath.name}', f
'{directory}/{zipFilePath.name}')
221 except OSError
as err:
222 msg.warning(
"Could not zip %s: %s", filename, err)
223 msg.verbose(
'%s end of zipXMLFile', time.ctime(time.time()))
226 msg = logging.getLogger(
'EventUtils' )
227 """Rename the ESD for the specified event.
229 Looks for an ESD file with the required filename in the given directory,
230 and if one is found, copy it to a new file named with .online.pool.root in place of .pool.root. The original file is not deleted.
232 msg.verbose(
'%s begin renameESD', time.ctime(time.time()))
233 if Path(filename).suffixes != [
'.pool',
'.root']:
234 msg.warning(
"Unexpected VP1 file name: %s", filename)
236 orgname = f
'{directory}/{filename}'
237 newname = orgname.replace(
'.pool.root',
'.online.pool.root')
239 shutil.copyfile(Path(orgname), Path(newname))
240 except OSError
as err:
241 msg.warning(
"Could not copy %s to %s: %s", orgname, newname, err)
242 msg.verbose(
'%s end of renameESD', time.ctime(time.time()))