ATLAS Offline Software
BulkRun.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 
4 from glob import glob
5 import subprocess
6 import os
7 import pickle
8 import sys
9 
class BulkRun:
    """Run on multiple pedestal files.

    Used to run on multiple pedestal bytestream files located somewhere on
    disk. Puts results into separate directories by run number. It extracts
    pertinent information from the file names, which are expected to be of
    the form:

        data10_calib.00157081.calibration_pedCSC.daq.RAW._lb0000._CSC-EB._0001.data

    run(numToRun)            - process at most numToRun runs out of those not
                               yet run on. This is the primary interface.
    ReadProcessedFilesList() - load the list of files already run on from the
                               pickled list file
    AddProcessedFiles()      - save newly run on files to disk
    FindFiles()              - find the glob pattern and run number of a set
                               of files that have not yet been run on
    RunAthena()              - run the athena job for one run
    """

    # Project tags searched for (in this order) in a file name. The slicing
    # in FindFiles() assumes the layout "<tag>_calib.<8-digit run>.<rest>".
    _ProjectTags = ("data10", "data11")

    def __init__(
            self,
            inputPattern="/raid02/schernau/ped/csc/csc/*.data",
            processedFilesList="ProcessedFiles.list",
            outputDirBase="/raid02/lampen/datasets/csc/PedProcessing2",
            debug=False,
            allowDirOverwrite=False,
    ):
        """Initialize internal variables.

        inputPattern       - glob pattern selecting the input bytestream files
        processedFilesList - path of the pickled list of processed files
        outputDirBase      - directory under which one sub-directory per run
                             number is created
        debug              - print extra information while searching for files
        allowDirOverwrite  - remove an already existing per-run output
                             directory instead of failing on it
        """
        self.InputPattern = inputPattern
        self.ProcessedFilesList = processedFilesList
        self.OutputDirBase = outputDirBase
        self.debug = debug
        self.AllowDirOverwrite = allowDirOverwrite
        print(self.InputPattern)

    def run(self, numToRun=10):
        """Process up to numToRun runs.

        Repeatedly asks FindFiles() for a not yet processed run and submits
        it to CscCalcPedMon.py through RunAthena(). Stops early when there is
        nothing left to process. Returns None.
        """
        print("Running on " + str(numToRun) + " runsSet.")

        runNumbers = []
        for runCnt in range(numToRun):
            print(">>>>Running on runSet " + str(runCnt + 1) + " of " + str(numToRun))
            pattern, runNumber = self.FindFiles()
            if pattern == "":
                print("No more unprocessed files. Congrats!")
                # Report the runs actually started; the current iteration
                # found nothing, so it must not be counted.
                print("N runs done: " + str(len(runNumbers)))
                print(runNumbers)
                return

            # have files to run on
            runNumbers.append(runNumber)
            self.RunAthena(pattern, runNumber)

        print("finished all " + str(numToRun))
        print("Run numbers include:")
        print(runNumbers)
        print()
        print("All Processed files:")
        print(self.ReadProcessedFilesList())

    def ReadProcessedFilesList(self):
        """Return the list of previously processed files.

        The list is unpickled from self.ProcessedFilesList. A missing list
        file means nothing has been processed yet and yields an empty list,
        so that the very first run can bootstrap the file.
        """
        try:
            with open(self.ProcessedFilesList, "rb") as listFile:
                # NOTE(security): pickle.load can execute arbitrary code;
                # only point ProcessedFilesList at files this tool wrote.
                ProcessedFiles = pickle.load(listFile)
        except FileNotFoundError:
            ProcessedFiles = []

        print('Processed String: ')
        print(ProcessedFiles)

        return ProcessedFiles

    def AddProcessedFiles(self, newFiles):
        """Append newFiles to the processed files list on disk.

        Returns True once the updated list has been written.
        """
        ProcessedFiles = self.ReadProcessedFilesList()
        ProcessedFiles += newFiles
        with open(self.ProcessedFilesList, "wb") as listFile:
            pickle.dump(ProcessedFiles, listFile)

        return True

    def FindFiles(self):
        """Find a set of input files that has not been run on yet.

        Returns (pattern, runNumber): a glob pattern matching all files of
        the first unprocessed run found and that run's number as a string.
        Returns ("", "") when every input file has already been processed.

        Raises Exception("Index error") when an unprocessed file name
        contains none of the known project tags.
        """
        # A set gives O(1) membership tests for the loop below.
        processed = set(self.ReadProcessedFilesList())

        # Get list of files matching the input pattern
        print("Input pattern: " + self.InputPattern)
        inputFiles = glob(self.InputPattern)

        if self.debug:
            print()
            print("Searching for file")
            print("InputPattern: " + self.InputPattern)

        # Stop at the first file that has not been processed yet
        for fileName in inputFiles:
            if fileName in processed:
                continue

            # Only search the file name itself, so a directory that happens
            # to contain a project tag cannot shift the slice offsets.
            nameStart = len(fileName) - len(os.path.basename(fileName))
            index = -1
            for tag in self._ProjectTags:
                index = fileName.find(tag, nameStart)
                if index != -1:
                    break
            if index == -1:
                print("ERROR! Index of -1!")
                raise Exception("Index error")

            # "<tag>_calib." is 13 characters, followed by the 8-digit run
            # number and a dot; the pattern keeps everything up to that dot.
            pattern = fileName[0:index + 22] + "*"  # includes run number
            runNumber = fileName[index + 13:index + 21]

            if self.debug:
                print("Found unprocessed file with pattern: " + pattern)
                print("This includes files:")
                print(glob(pattern))

            return pattern, runNumber

        return "", ""

    def RunAthena(self, pattern, runNumber):
        """Run athena on the set of files matching pattern.

        Creates <OutputDirBase>/<runNumber>, runs CscCalcPedMon.py with the
        athena output redirected to run.log in that directory, and finally
        records the files matching pattern as processed.

        Raises OSError when the output directory cannot be created (for
        example because it already exists and AllowDirOverwrite is False) or
        when athena.py cannot be started.
        """
        # Local import: keeps this block self-contained without touching the
        # module level import list.
        import shutil

        outputDirPath = self.OutputDirBase + "/" + runNumber

        if self.AllowDirOverwrite:
            # Equivalent of "rm -rf": a missing directory is not an error.
            # (Passing the command as one string to subprocess.call without
            # a shell would try to execute a program literally named
            # "rm -rf <path>" and fail.)
            shutil.rmtree(outputDirPath, ignore_errors=True)

        print("Making directory" + outputDirPath)
        # Create output directory
        os.mkdir(outputDirPath, 0o755)

        # Athena options
        athOpt = "outputPre='" + outputDirPath + "/" + runNumber
        athOpt += "';inputPat='" + pattern
        athOpt += "';reportPrefix='runNumber = "
        athOpt += runNumber + "'"

        # athena arguments
        athArgs = ["athena.py", "-c", athOpt, "CscCalcPedMon.py"]

        # Output log file; closed even when starting athena fails
        with open(outputDirPath + "/run.log", "w") as logFile:
            print()
            print("**************************************")
            print("Starting running on run " + str(runNumber))
            sys.stdout.flush()
            # NOTE: the exit status of athena is not inspected; the files
            # are recorded as processed even if the job failed.
            subprocess.Popen(athArgs, stdout=logFile, stderr=subprocess.STDOUT).wait()
            print("Finished run " + str(runNumber))
            print("**************************************")
            print()

        # Add files we just ran on to file list
        newProcessedFiles = glob(pattern)
        self.AddProcessedFiles(newProcessedFiles)
BulkRun.BulkRun.__init__
def __init__(self, inputPattern="/raid02/schernau/ped/csc/csc/*.data", processedFilesList="ProcessedFiles.list", outputDirBase="/raid02/lampen/datasets/csc/PedProcessing2", debug=False, allowDirOverwrite=False)
Definition: BulkRun.py:26
BulkRun.BulkRun.AllowDirOverwrite
AllowDirOverwrite
Definition: BulkRun.py:32
BulkRun.BulkRun.InputPattern
InputPattern
Definition: BulkRun.py:28
BulkRun.BulkRun.OutputDirBase
OutputDirBase
Definition: BulkRun.py:30
BulkRun.BulkRun.FindFiles
def FindFiles(self)
Definition: BulkRun.py:104
BulkRun.BulkRun.AddProcessedFiles
def AddProcessedFiles(self, newFiles)
Definition: BulkRun.py:92
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
Trk::open
@ open
Definition: BinningType.h:40
BulkRun.BulkRun.ProcessedFilesList
ProcessedFilesList
Definition: BulkRun.py:29
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
str
Definition: BTagTrackIpAccessor.cxx:11
BulkRun.BulkRun
Definition: BulkRun.py:10
BulkRun.BulkRun.run
def run(self, numToRun=10)
Definition: BulkRun.py:42
BulkRun.BulkRun.RunAthena
def RunAthena(self, pattern, runNumber)
Definition: BulkRun.py:150
BulkRun.BulkRun.ReadProcessedFilesList
def ReadProcessedFilesList(self)
Definition: BulkRun.py:72
BulkRun.BulkRun.debug
debug
Definition: BulkRun.py:31