3 import os, re, time, glob, shutil
4 from pathlib
import Path
5 from zipfile
import ZipFile, ZIP_DEFLATED
6 from AthenaCommon.Logging
import logging
# Collect the event-display files found in `directory` into event entries.
#
# Every file name that matches the pattern below contributes a
# (run, event, filename) tuple; an event entry is then built from two
# neighbouring tuples as (run, event, first filename, second filename).
#
# directory       -- directory that is listed and from which unpaired files are deleted
# checkpair       -- when true, neighbouring tuples must agree on run and event number
# patternAtlantis -- file-name suffix accepted directly after the event number
# patternVP1      -- file-name suffix accepted after a "_<anything>CEST" infix
# NOTE(review): `remove` is not referenced in the statements shown here;
# presumably it guards the deletion of unpaired files -- confirm.
14 def getEventlist(directory, checkpair, remove=True, patternAtlantis='.xml', patternVP1='.pool.root'):
15 msg = logging.getLogger(
'EventUtils' )
16 msg.verbose(
'%s begin get event list', time.ctime(time.time()))
18 files = os.listdir(directory)
# A name matches when it contains "JiveXML" or "vp1", then "_" + optional "r"
# + run digits, then "_" + optional "ev" + event digits, followed by either
# patternAtlantis or "_<anything>CEST" + patternVP1. Both suffixes go through
# re.escape, so they are matched literally. The trailing negative lookahead
# rejects names in which another "." follows the suffix.
22 pattern =
r'(?:JiveXML|vp1)_(?:|r)(\d+)_(?:|ev)(\d+)'+f
'(?:{re.escape(patternAtlantis)}|_.+CEST{re.escape(patternVP1)})'+
r'(?!\.)'
# re.search is used, so the pattern is not anchored to the start of the name.
26 matches = re.search(pattern, file)
# Run and event numbers are stored as 12-digit zero-padded strings.
30 run =
"%012d" %
int(matches.group(1))
31 event =
"%012d" %
int(matches.group(2))
33 fileentry = run, event, file
34 filelist.append(fileentry)
39 numfiles = len(filelist)
# Taken when pairing is enforced and entries i and i+1 differ in run or event
# number, i.e. entry i has no partner next to it.
43 if checkpair
and (filelist[i][0] != filelist[i+1][0]
or filelist[i][1] != filelist[i+1][1]):
47 msg.warning(
"One of the files is missing for run %s, event %s, removing the other as well.", filelist[i][0], filelist[i][1])
49 msg.verbose(
"Removing %s/%s", directory, filelist[i][2])
# The unpaired file is deleted; a failure to delete it is only logged.
50 os.unlink(
"%s/%s" % (directory, filelist[i][2]))
51 except OSError
as err:
52 msg.warning(
"Could not remove '%s': %s", filelist[i][2], err)
# Entries i and i+1 are combined into one event entry: run and event number
# come from entry i, followed by the file names of entry i and entry i+1.
58 evententry = filelist[i][0], filelist[i][1], filelist[i][2], filelist[i+1][2]
59 eventlist.append(evententry)
62 msg.verbose(
'%s end get event list', time.ctime(time.time()))
67 msg = logging.getLogger(
'EventUtils' )
68 msg.verbose(
'%s begin prune events', time.ctime(time.time()))
70 numevents = len(eventlist)
73 if numevents > maxevents:
74 for i
in range(numevents-maxevents):
75 msg.verbose(
"maxevents=%d, numevents=%d, i=%d", maxevents, numevents, i)
76 run, event, atlantis, vp1 = eventlist.pop(0)
77 msg.verbose(
"Going to prune files %s and %s for run %s and event %s.", atlantis, vp1, run, event)
79 msg.verbose(
"%s Trying to unlink %s", time.ctime(time.time()), atlantis)
80 os.unlink(
"%s/%s" % (directory, atlantis))
81 msg.verbose(
"%s Trying to unlink %s", time.ctime(time.time()), vp1)
82 os.unlink(
"%s/%s" % (directory, vp1))
83 msg.verbose(
"%s Done with unlink", time.ctime(time.time()))
84 except OSError
as err:
85 msg.warning(
"Could not remove files for run %s, event %s: %s", run, event, err)
88 msg.debug(
"Nothing to prune (%d <= %d).", numevents, maxevents)
89 msg.verbose(
'%s end prune events', time.ctime(time.time()))
93 msg = logging.getLogger(
'EventUtils' )
94 msg.verbose(
'%s begin write event list', time.ctime(time.time()))
97 file =
open(
"%s/%s.%d" % (directory, listname, pid),
'w')
98 for run, event, atlantis, vp1
in eventlist:
99 file.write(
"run:%s,event:%s,atlantis:%s,vp1:%s\n" % (run, event, atlantis, vp1))
101 except IOError
as err:
102 msg.warning(
"Could not write event list: %s", err)
106 os.rename(
"%s/%s.%d" % (directory, listname, pid),
"%s/%s.list" % (directory, listname))
107 except OSError
as err:
108 msg.warning(
"Could not rename %s.%d to %s.list: %s", listname, pid, listname, err)
109 msg.verbose(
'%s end write event list', time.ctime(time.time()))
113 msg = logging.getLogger(
'EventUtils' )
115 msg.verbose(
'%s begin clean directory', time.ctime(time.time()))
122 if len(eventlist)>0
and not isBeamSplashMode:
126 for filename
in os.listdir(directory):
127 if filename.startswith(
"vp1")
and "CEST.pool.root" in filename:
128 orgname = f
'{directory}/{filename}'
129 newname = orgname.replace(
'.pool.root',
'.online.pool.root')
131 shutil.copyfile(Path(orgname), Path(newname))
132 except OSError
as err:
133 msg.warning(
"Could not copy %s to %s: %s", orgname, newname, err)
134 msg.verbose(
'%s end clean directory', time.ctime(time.time()))
137 msg = logging.getLogger(
'EventUtils' )
138 """Preparing the list of files for CastorScript to transfer to EOS
140 CastorScript is configured to look for *.zip and *.online.pool.root files to transfer.
141 This function counts the number of CastorScript bookkeeping files *.COPIED
142 and compares it with the number of original data files *.zip and *.online.pool.root to obtain the transfer status.
143 If all transfers have been completed, get the most recent event from eventlist and add it to the transferlist.
144 Zip the corresponding xml file or rename the corresponding pool.root file to trigger the transfer.
146 eventlist: list of events that can be transferred
147 pair: True to transfer atlantis-vp1 pairs. False to transfer atlantis file only.
148 timeinterval: time interval between two events in the transfer list (in seconds).
150 msg.verbose(
'%s begin prepare files for transfer', time.ctime(time.time()))
151 transferlist =
getEventlist(directory, checkpair=pair, remove=
False, patternAtlantis=
'.zip', patternVP1=
'.online.pool.root')
152 if len(transferlist)>0
and eventlist[-1][0] == transferlist[-1][0]
and eventlist[-1][1] == transferlist[-1][1]:
153 msg.debug(
"Last event already in transfer list. No new event to transfer.")
157 matchAtlantis = glob.glob(f
"{directory}/*.zip")
158 copiedAtlantis = glob.glob(f
"{directory}/*.zip.COPIED")
159 matchVP1 = glob.glob(f
"{directory}/*.online.pool.root")
160 copiedVP1 = glob.glob(f
"{directory}/*.online.pool.root.COPIED")
162 if len(matchAtlantis)>len(copiedAtlantis)
or len(matchVP1)>len(copiedVP1):
163 msg.debug(
"There are files in transfer. Do not attemp to add new event to the transfer list.")
166 elif len(transferlist)>0:
168 age = time.time() - os.path.getmtime(f
'{directory}/{transferlist[-1][2]}')
169 if age < timeinterval:
170 msg.debug(
"Wait for %ds before adding new events to the transfer queue. Last event in the queue was added %ds ago.", timeinterval, age)
172 except OSError
as err:
173 msg.warning(
"Failed to check the timestamp of %s. %s", transferlist[-1][2], err)
177 run, event, atlantis, vp1 = eventlist[-1]
180 msg.debug(
'%s going to zip file %s', time.ctime(time.time()), atlantis)
185 msg.debug(
'%s going to rename ESD file %s', time.ctime(time.time()), vp1)
189 msg.verbose(
'%s end prepare files for transfer', time.ctime(time.time()))
192 msg = logging.getLogger(
'EventUtils' )
193 """Zip the JiveXML file for the specified event.
195 Looks for a JiveXML file with the required filename in the given directory,
196 and if one is found, zip it. The original file is not deleted.
197 Zip the file to .tmp first, and then rename to .zip
198 to avoid triggering the transfer before the zip file is closed.
200 msg.verbose(
'%s begin zipXMLFile', time.ctime(time.time()))
201 if Path(filename).suffix !=
'.xml':
202 msg.warning(
"Unexpected Atlantis file name: %s", filename)
204 matchingFiles = glob.glob(f
"{directory}/{filename}")
205 if len(matchingFiles) == 1:
206 msg.verbose(
'exactly one matching file found')
207 matchingFilePath = Path(matchingFiles[0])
208 tmpFilePath = matchingFilePath.with_suffix(
'.tmp')
209 zipFilePath = matchingFilePath.with_suffix(
'.zip')
210 matchingFilePath = Path(matchingFilePath)
211 matchingFileName = matchingFilePath.name
212 msg.verbose(
'Zipping %s to %s', matchingFileName, zipFilePath.name)
214 with ZipFile(tmpFilePath,
'w', compression=ZIP_DEFLATED)
as z:
215 z.write(matchingFilePath.as_posix(), arcname=matchingFileName)
216 os.rename(f
'{directory}/{tmpFilePath.name}', f
'{directory}/{zipFilePath.name}')
217 except OSError
as err:
218 msg.warning(
"Could not zip %s: %s", filename, err)
219 msg.verbose(
'%s end of zipXMLFile', time.ctime(time.time()))
222 msg = logging.getLogger(
'EventUtils' )
223 """Rename the ESD for the specified event.
225 Looks for an ESD file with the required filename in the given directory,
226 and if one is found, rename it to .online.pool.root. The original file is not deleted.
228 msg.verbose(
'%s begin renameESD', time.ctime(time.time()))
229 if Path(filename).suffixes != [
'.pool',
'.root']:
230 msg.warning(
"Unexpected VP1 file name: %s", filename)
232 orgname = f
'{directory}/{filename}'
233 newname = orgname.replace(
'.pool.root',
'.online.pool.root')
235 shutil.copyfile(Path(orgname), Path(newname))
236 except OSError
as err:
237 msg.warning(
"Could not copy %s to %s: %s", orgname, newname, err)
238 msg.verbose(
'%s end of renameESD', time.ctime(time.time()))