ATLAS Offline Software
CscCalibQuery.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
3 
4 from __future__ import print_function
5 
6 import os
7 import sys
8 import re
9 import glob
10 import subprocess
11 import pickle
12 
13 
14 #This script looks for new CSC calibration files on castor and runs on them. It:
15 #1: Checks to see if a job is currently running by looking for a calibrationRunning### file in the script directory. Quits if anything is running.
16 #2: If no job is currently running, sees if there are any new calibration directories that we haven't yet processed. Castor is checked and its contents are checked against a Castor Contents local text file. This file isn't necessary (there is a redundancy later on to prevent duplicate calib jobs), but speeds things up.
17 #3: If there are new directories, we check to see if any have at least numFilesToRun files. If they don't, we don't update Castor Contents, but we don't run on them either (it's assumed they are being staged and we'll pick them up next time CscCalibQuery is run)
18 #4: If we find a valid calibration directory with enough files, we check to see if a directory with calibration output for the new calibration run exists. If it does, we don't run on it again.
19 #5: If it is indeed a new calibration directory, a bash script is created and submitted to a batch queue. The bash script:
20 # a) Runs athena calibration algorithm
21 # b) Produces web page with calibration monitoring plots
22 # c) Emails mailing list about state of calibration (any problems)
23 # d) Generates mysql database file for merging into COOL
24 # e) (Disabled) automatically adds to database
25 #...
26 
27 #minimum number of calibration files a castor directory must hold before calibration is started
28 numFilesToRun=3
29 responsiblePerson = "youzhou@email.arizona.edu"  # recipient of the problem mails sent directly by runCalib
30 maillist = responsiblePerson  # written into the batch script as $maillist and used for the job-start mail
31 
32 CoolMergeByDefault = False  # while False, runCalib adds --noexec to the AtlCoolMerge.py command
33 
34 #Utility functions################################
35 
36 
def updateList(oldListPath,newList):
    """Overwrite the cached castor listing with a new one.

    oldListPath -- path of the text file that stores the previously seen listing
    newList     -- text to store; it replaces the whole content of the file

    Raises IOError/OSError if the file cannot be opened or written.
    """
    print ('updating file list')
    # The context manager closes the handle even when write() raises, so a
    # failed update cannot leak the descriptor.
    with open(oldListPath, 'w') as outFile:
        outFile.write(newList)
43 
44 #runs calibration on input file.
45 #It creates a bash script which can setup the job, run it, and do post job
46 #processing. The bash script is submitted to lxbatch
def _mailFromFile(subject, recipient, bodyPath):
    """Mail the content of the file bodyPath to recipient and return mail's exit status.

    The file is attached as the standard input of ``mail``. A '<' placed in an
    argument list is not a redirection (no shell is involved); it would be
    handed to ``mail`` as one more recipient.
    """
    try:
        bodyFile = open(bodyPath, 'r')
    except IOError:
        # The notification matters more than its canned text: send it empty.
        print ('Could not read mail body ' + bodyPath + '; sending an empty message')
        bodyFile = open(os.devnull, 'r')
    try:
        return subprocess.call(["mail", "-s", subject, recipient], stdin=bodyFile)
    finally:
        bodyFile.close()


def _mailText(subject, recipients, text):
    """Mail text to the whitespace separated recipients; return mail's exit status.

    Returns None (after printing a warning) when ``mail`` cannot be started.
    """
    try:
        proc = subprocess.Popen(["mail", "-s", subject] + recipients.split(),
                                stdin=subprocess.PIPE, universal_newlines=True)
        proc.communicate(text + '\n')
    except OSError as err:
        print ('Could not send mail "' + subject + '": ' + str(err))
        return None
    return proc.returncode


def runCalib(calType, runNumber,workDir,castorCopyCmd):
    """Write the batch script for one calibration run and submit it with bsub.

    The generated bash script copies the bytestream files, runs the athena
    calibration job, converts the .cal file into a .db file, calls
    AtlCoolMerge.py, mails the calibration report, builds the web page and
    finally removes the bytestream directory and the runningCalibration marker.

    calType       -- 'ped' or 'pulser'
    runNumber     -- run number as a string (it is concatenated into paths,
                     commands and mail subjects)
    workDir       -- directory that receives the script; results go to
                     workDir/CalibResults, bytestream files to workDir/Bytestream
    castorCopyCmd -- shell commands, inserted verbatim into the script, that
                     fetch the bytestream files

    Side effects: reads and possibly rewrites the run list pickle in the
    current directory, creates the runningCalibration<run> marker file, writes
    and submits the bash script, and sends mails.

    Raises ValueError for an unknown calType and for 'pulser', whose settings
    are incomplete. Calls sys.exit() when runNumber is already in the run list.
    """
    #initialize based on calibration type
    scriptDir = '${HOME}/CSC/run/'
    if calType == 'pulser':
        calibScript = scriptDir + 'CscCalcSlopeMon.py'
        dbScript = scriptDir + 'cscWritePSlopeCool.py'
        webDir = '${HOME}/www/csc/pulser'
        runListFile = "pulserRunList.pickle"
        # NOTE(review): no online calibration file and no web page URL were
        # ever configured for pulser runs; supply them to enable this type.
        onlDbFile = None
        webPageUrl = None
    elif calType == 'ped':
        calibScript = scriptDir + 'CscCalcPedMon.py'
        dbScript = scriptDir + 'cscWritePedRefCool.py'
        onlDbFile = scriptDir + 'online.cal'
        webDir = '${HOME}/www/csc/ped'
        webPageUrl = 'https://atlas-csc-calib.web.cern.ch/atlas-csc-calib/ped/pedRun_' +runNumber
        runListFile = "pedRunList.pickle"
    else:
        raise ValueError("Unknown calibration type '" + str(calType)
                         + "'; expected 'pulser' or 'ped'")

    # Fail before any side effect instead of dying later on an undefined name.
    if onlDbFile is None or webPageUrl is None:
        raise ValueError("Calibration type '" + calType + "' is not fully configured"
                         + " (online calibration file and web page URL are missing)")

    outputDir = workDir +"/CalibResults"
    bsDir = workDir +"/Bytestream"

    print ('outputDir = ' + outputDir)

    #Setup finished email message. It is handed to the athena job as reportPrefix.
    emailMessage = 'Finished ' + calType + ' calibration on run number ' + runNumber
    emailMessage += '. Output in ' + outputDir + '.'
    emailMessage += '\\nAnd website at:\\n'
    emailMessage += webPageUrl

    #Setup finished email subjects
    goodEmailSubject = '[CSC CALIB PROC]: SUCCESS with ' + calType + 'calib run' + runNumber
    badEmailSubject = '[CSC CALIB PROC]: PROBLEMS with ' + calType + 'calib run' + runNumber

    #Prepare bash script for batch system
    bashFilePath = workDir+'/CscCalib_' + calType + '_' + runNumber + '.sh'

    bsubCmd ='cd ' + outputDir + ';bsub -q 2nd -R "type==SLC5_64&&mem>420" ' + bashFilePath

    bashFileContents = "#!/bin/bash\n"
    bashFileContents += "#To resubmit this job, submit it to the atlasmuonqueu like so:\n#"
    bashFileContents += bsubCmd +"\n"
    bashFileContents += "source ~/CSC/CscSetup.sh\n"
    bashFileContents += "\n"
    bashFileContents += "resultDir=\"" + outputDir + "\"\n"
    bashFileContents += 'bytestreamDir="' + bsDir + '"\n'
    bashFileContents += 'maillist="' + maillist + '"\n'
    bashFileContents += 'webSiteDir="' + webDir + '"\n'

    calFilePrefix= "${resultDir}/" + runNumber
    inputPattern = "${bytestreamDir}/*.data"

    calibCommand = ('echo "Running calibration"\n'
                    + 'mkdir ${resultDir}\n'
                    + 'athena.py -c "outputPre=\'' + calFilePrefix
                    + '\';inputOnlCalibFile=\'' +onlDbFile
                    + '\';inputPat=\'' + inputPattern
                    + '\';reportPrefix=\'' + emailMessage
                    + '\';" '
                    + calibScript)

    goodEmailCommand = ' mail -s "' + goodEmailSubject + '" $maillist < ' + calFilePrefix + "CalibReport.txt"
    badEmailCommand = ' mail -s "' + badEmailSubject + '" $maillist < ' + calFilePrefix + "CalibReport.txt"

    #For reference tag, we actually want the IOV to start just after the LAST run number
    #Get old run numbers
    # NOTE(review): pickle.load executes whatever the file describes; the run
    # list must only ever be written by this script.
    with open(runListFile,"rb") as infile:
        runList = pickle.load(infile)
    runList.sort()
    print ("got runs")
    print (runList)

    if(runNumber in runList):
        print ("Mailing message")
        subject = 'New castor run directory found for previously processed run ' + str(runNumber)
        print (subject)
        _mailFromFile(subject, responsiblePerson, "runAlreadyProcessed.mail")
        sys.exit()

    # NOTE(review): an empty run list raises IndexError here, as it always did.
    highestRun = str(runList[-1])

    isRunNumberConflict = False

    # NOTE(review): run numbers are compared as strings, which is only a
    # numeric ordering while all of them have the same number of digits.
    if(highestRun > runNumber):
        #Something odd happening. The Input run number is lower than the last run number
        #this script (thinks it) processed
        #Notify someone important, and don't add to cool when done...
        _mailFromFile("[CSC CALIB PROC] Wrong run number ordering on run " + str(runNumber)
                      + "! Human intervension required!",
                      responsiblePerson,
                      "runNumberConflict.mail")
        isRunNumberConflict = True
    else:
        #No problem, update run list
        runList += [runNumber]
        with open(runListFile,"wb") as outfile:
            pickle.dump(runList,outfile)

    #Label that we're working, so that other instances of CscCalibQuery won't run
    subprocess.call(['touch','/afs/cern.ch/user/m/muoncali/CSC/run/runningCalibration' + runNumber])

    #Command to create .db file
    DbCommand = ('athena.py -c "input=\'' + calFilePrefix+'.cal\';output=\''
                 + calFilePrefix + '.db\';IOVRunStart=int(\'' + highestRun + '\')" ' + dbScript)

    #Command to upload .db file to database
    UploadCommand = '/afs/cern.ch/user/a/atlcond/utils/AtlCoolMerge.py ' + "--batch "
    #Noexec prevents actual execution of cool
    if(not CoolMergeByDefault or isRunNumberConflict):
        UploadCommand += " --noexec "

    # NOTE(review): the last argument looks like a database credential that
    # is embedded in the source and copied into every generated script.
    UploadCommand += ("--comment=\'Automated reference update from " + calType
                      + ' run ' + runNumber + ' to IOV starting at ' + highestRun + "' "
                      + calFilePrefix + '.db'
                      + ' COMP200 ATLAS_COOLWRITE ATLAS_COOLOFL_CSC_W WCOOLOFL4CSC17 ')

    WebSiteCommand = '\nfs sa ${resultDir} webserver:afs read\n'
    WebSiteCommand += 'cd $resultDir\n'
    WebSiteCommand += 'ln -s ${resultDir} ${webSiteDir}/pedRun_' + runNumber + '\n'
    WebSiteCommand += 'MakeCscPedSite.exe ' + runNumber + '.root ' + runNumber + '.cal_online\n'

    t1 = '\t'

    #Run calibration. If no problems detected, go ahead and upload.
    bashFileContents += (
        '\ncd ' + workDir + '\n'
        + "#Copying files from castor#\n"
        + castorCopyCmd
        + '\n'
        + "#Running calibration (and calib monitoring)\n"
        + "#, resulting in .cal file and status report\n"
        + calibCommand +'\n'
        + "\n"
        + "#Athena job to transform *.cal file to *.db file.\n"
        + DbCommand + '\n'
        + "#Python utility to upload *.db file to database. When entering by\n"
        + "#hand, I recomend removing '--batch' flag so you can check puposed\n"
        + "#operations before submision to database\n"
        + UploadCommand + '\n'
        + '#Check if AllCalibMonGood file was created, which means that\n'
        + '#this run passed acceptable criteria in the calibration monitoring\n'
        + 'if [ -a AllCalibMonGood ]; then\n'
        + t1 + "#Email list that the calibration looks good\n"
        + t1 + goodEmailCommand +'\n'
        + t1 + "################################################################\n"
        + t1 + "#Execute next two commands if you want to submit database entry#\n"
        + t1 + "#Useful if these steps were skipped due to suspicious behaviour#\n"
        + t1 + "#during calibration. #\n"
        + t1 + "###############################################################\n"
        + t1 + "\n"
        + t1 + '\n'
        + 'else\n'
        + t1 + "#Suspicious behaviour in calibration. Notify mail list of this fact\n"
        + t1 + badEmailCommand + '\n'
        + 'fi\n'
        + '\n'
        # WebSiteCommand starts with a newline, which ends this comment line.
        + '#Always create website'
        + WebSiteCommand + '\n'
        + 'rm -rf $bytestreamDir\n'
        + 'rm -rf ' + scriptDir +"runningCalibration" + runNumber +'\n'
    )

    #Write bashfile
    print ("Printing bash file to: " +bashFilePath)
    with open(bashFilePath, 'w') as bashFile:
        bashFile.write(bashFileContents)

    #Submit script
    subprocess.call(['chmod', '+x', bashFilePath])
    # bsubCmd needs a shell ('cd ...;bsub ...'), so it is still run through popen.
    bsubPipe = os.popen(bsubCmd)
    bsubsMessage = bsubPipe.read()
    bsubPipe.close()

    #Send alert email
    startMessage = 'Starting ' + calType + ' calibration on run number ' + runNumber
    emailSubject = '[CSC CALIB PROC]: ' + startMessage
    startMessage += '\nbsubs output:\n' + bsubsMessage
    # The text is piped to mail directly: quotes or '$' in the bsub output can
    # no longer break or alter a shell command line.
    _mailText(emailSubject, maillist, startMessage)
250 
251 
254 
# Driver: compares the current castor listing with the cached one and, for a
# new directory that holds enough matching calibration files, prepares the
# output directory and hands the run to runCalib. Uses numFilesToRun,
# updateList and runCalib from this file.
print ('running' )
#first command line argument should be the calibration type, pulser or ped.
if len(sys.argv) < 2:
    print ('Need to specify pulser or ped')
    sys.exit(1)
calType = sys.argv[1]
if calType == 'pulser':
    calibFileDir = '/castor/cern.ch/user/l/lampen/CalibRunTest/slope/'
    oldListFilePath = '/afs/cern.ch/user/m/muoncali/CSC/run/pulserList.txt'
    outputDir = '/afs/cern.ch/user/m/muoncali/w0/CSC/runs/pulser/pulser'
    # NOTE(review): no file name pattern was ever defined for pulser runs.
    calibRe = None
elif calType == 'ped':
    calibFileDir = '/castor/cern.ch/grid/atlas/DAQ/muon/csc/'
    oldListFilePath = '/afs/cern.ch/user/m/muoncali/CSC/run/pedDirList.txt'
    outputDir = '/afs/cern.ch/user/m/muoncali/w0/CSC/runs/ped/ped'
    calibRe = re.compile('data1.*_calib.*calibration_pedCSC\\.daq\\.RAW.*\\.data')
else:
    print ('Need to specify pulser or ped')
    # sys.exit flushes the message above (os._exit does not) and reports failure.
    sys.exit(1)


#First, see if a calibration is already running
#If already running, stop
testFile = glob.glob("/afs/cern.ch/user/m/muoncali/CSC/run/runningCalibration*")
if(testFile):
    print ('already running: ' + str(testFile))
    sys.exit()


#Get the current list of directories in castor
print ('rfdir ' + calibFileDir)
rfdirPipe = os.popen('rfdir ' + calibFileDir)
currentLs = rfdirPipe.read()
rfdirPipe.close()

# NOTE(review): creates a file named 'testing' in the current directory; this
# looks like a debugging leftover but is kept because it is a visible side effect.
touchPipe = os.popen('touch testing')
touchPipe.close()

#Get the old list of files from castor
with open(oldListFilePath, 'r') as inFile:
    oldLs = inFile.read()

print ('checking for changes')

import datetime
now = datetime.datetime.now()
today = now.day

#Check if there are any changes between the lists
if(oldLs != currentLs):
    print ('There has been a change')
    currentDirList = currentLs.split('\n')
    oldDirList = oldLs.split('\n')

    updateRunList = True
    runningDir = ""
    #Run on any new directories
    for Dir in currentDirList:
        if Dir in oldDirList:
            continue

        splitDir = Dir.split()
        if len(splitDir) < 3:
            # Blank or truncated listing line: it has no day and name fields.
            continue

        print ("**********************************************************")
        # assumes the third field from the end of an rfdir line is the day of
        # the month -- TODO confirm against the rfdir output format
        day = int(splitDir[-3])
        DirName =splitDir[-1] #last ' ' delimited word in line is Dirname

        print (splitDir)
        # Difference of days of the month only; the negative bound presumably
        # lets a directory from the end of the previous month through.
        timediff = today - day
        if(timediff > 7 or timediff < -23):
            print (timediff)

            print ("Found old dir " + DirName + ", but its over at least a week old, so we're skipping it")

            continue

        print ("day is " + str(day))

        cmd = 'rfdir ' + calibFileDir + DirName
        filePipe = os.popen(cmd)
        fileList = filePipe.read().split('\n')
        filePipe.close()

        # The text after the last newline is an empty entry, hence the -1.
        nFiles = len(fileList) -1
        print ("found " + str(nFiles) + " files")

        runNumber = DirName

        #prepare output directory
        outputDirFull = outputDir + 'Run_' + runNumber
        print ('outputDirFull is ' + outputDirFull)

        #Loop through files in directory.
        #Start building castor copy cmd
        #If any files don't match the calibration file
        #requirement, mark this directory as something not interested in.
        ThisCalibDir = False
        castorCopyCmd = "mkdir ${bytestreamDir}"
        for fileLine in fileList:
            fileName = (fileLine.split(' '))[-1] #last ' ' delimited word in line is fileName
            if(fileName == ""):
                continue
            print ("fileName: " +fileName)
            if calibRe is None:
                print ('No calibration file pattern is defined for ' + calType
                       + ' runs; cannot classify ' + fileName)
                sys.exit(1)
            ThisCalibDir = calibRe.match(fileName)
            if(nFiles < numFilesToRun):
                print ("only " +str(nFiles) + " files. Breaking.")
                if(ThisCalibDir):
                    print ("There is a calib dir, but it only has " + str(nFiles)
                           + " file. Will not process.")
                    # Keep the cached listing stale so this directory is seen
                    # as new again on the next invocation.
                    updateRunList = False
                break
            if(not ThisCalibDir):
                break
            fullPath = calibFileDir + DirName + '/' + fileName
            castorCopyCmd += "\nxrdcp root://castoratlas/" + fullPath + " ${bytestreamDir}"

        print ("found " + str(nFiles) + " files")

        #only run if have enough files
        if(nFiles < numFilesToRun):
            continue

        #Comment next 4 lines out if you want to run on multiple runs at once
        if(runningDir):
            updateRunList = False
            print ("Found new calibration directory to run on (" +DirName + "), but already running on " + runningDir)
            continue
        print ('running on ' + DirName)

        if(nFiles > numFilesToRun):
            print ('WARNING! Found ' + str(nFiles) + ' calib files, when only ' + str(numFilesToRun) + ' were expected!')

        if( os.path.exists(outputDirFull)):
            print ("There is already a directory for run number " + runNumber)
            print ("Will not run on this pedestal run")
            continue

        if(ThisCalibDir):
            print (castorCopyCmd)
            os.makedirs(outputDirFull+"/CalibResults")

            runCalib(calType, runNumber, outputDirFull, castorCopyCmd)

            print('Launched job, but will continue to check if we have more runs to run on later')
            runningDir = DirName
            updateRunList = False

    if( updateRunList):
        #We update the run list so long as there are no runs on it that we expect to run on in the future.
        #The safest thing to do is to wait until all runs are processed.
        print ("No unprocessed or currently being processed calibration runs. Will update run list.")
        updateList(oldListFilePath,currentLs)
    else:
        print ("NOT updating run list")

else:
    print ('No changes between old and new runs')
read
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)
Definition: openCoraCool.cxx:569
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
CscCalibQuery.runCalib
def runCalib(calType, runNumber, workDir, castorCopyCmd)
Definition: CscCalibQuery.py:47
Trk::open
@ open
Definition: BinningType.h:40
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
CscCalibQuery.updateList
def updateList(oldListPath, newList)
Definition: CscCalibQuery.py:37