ATLAS Offline Software
BulkRun.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 from __future__ import print_function
4 
5 from glob import glob
6 import subprocess
7 import os
8 import pickle
9 import sys
10 
class BulkRun:
    """Run the CSC pedestal job on multiple pedestal bytestream files.

    Used to run on multiple pedestal bytestream files located somewhere on
    disk. Puts results into separate directories by run number. It extracts
    the pertinent information from the file names, which are expected to be
    of the form:

        data10_calib.00157081.calibration_pedCSC.daq.RAW._lb0000._CSC-EB._0001.data

    Methods:
        run(numToRun)            - process at most numToRun run sets out of
                                   those not yet processed. This is the
                                   primary interface.
        ReadProcessedFilesList() - load the list of files already run on
                                   from the local (pickled) list file.
        AddProcessedFiles()      - save newly processed files to disk.
        FindFiles()              - find the glob pattern and run number of a
                                   set of files that have not been run on.
        RunAthena()              - run the athena job for one set of files.
    """

    def __init__(
        self,
        inputPattern="/raid02/schernau/ped/csc/csc/*.data",
        processedFilesList="ProcessedFiles.list",
        outputDirBase="/raid02/lampen/datasets/csc/PedProcessing2",
        debug=False,
        allowDirOverwrite=False,
    ):
        """Initialize internal variables.

        inputPattern       -- glob pattern selecting the input files.
        processedFilesList -- path of the pickled list of processed files.
        outputDirBase      -- directory under which one sub-directory per
                              run number is created.
        debug              -- print extra information while searching.
        allowDirOverwrite  -- remove an existing output directory for a run
                              before re-creating it.
        """
        self.InputPattern = inputPattern
        self.ProcessedFilesList = processedFilesList
        self.OutputDirBase = outputDirBase
        self.debug = debug
        self.AllowDirOverwrite = allowDirOverwrite
        print(self.InputPattern)

    def run(self, numToRun=10):
        """Process up to numToRun sets of unprocessed files.

        For each iteration, find a set of unprocessed files with FindFiles()
        and submit it to CscCalcPedMon.py through RunAthena(). Stops early
        when no unprocessed files remain.
        """
        print("Running on " + str(numToRun) + " runsSet.")

        runNumbers = []
        for runCnt in range(numToRun):
            print(">>>>Running on runSet " + str(runCnt + 1) + " of " + str(numToRun))
            pattern, runNumber = self.FindFiles()
            if not pattern:
                print("No more unprocessed files. Congrats!")
                # Report the runs actually processed; the current iteration
                # found nothing, so it must not be counted.
                print("N runs done: " + str(len(runNumbers)))
                print(runNumbers)
                return

            # Have files to run on
            runNumbers.append(runNumber)
            self.RunAthena(pattern, runNumber)

        print("finished all " + str(numToRun))
        print("Run numbers include:")
        print(runNumbers)
        print()
        print("All Processed files:")
        print(self.ReadProcessedFilesList())

    def ReadProcessedFilesList(self):
        """Return the list of previously processed files.

        The list is read from the pickle file self.ProcessedFilesList. If
        that file does not exist yet (nothing has been processed so far), an
        empty list is returned.
        """
        ProcessedFiles = []

        if os.path.isfile(self.ProcessedFilesList):
            # NOTE(review): pickle.load can execute arbitrary code if the
            # list file has been tampered with; only point this at a trusted
            # file.
            with open(self.ProcessedFilesList, "rb") as f:
                ProcessedFiles = pickle.load(f)

        print('Processed String: ')
        print(ProcessedFiles)

        return ProcessedFiles

    def AddProcessedFiles(self, newFiles):
        """Append newFiles to the processed-files list on disk.

        Returns True once the updated list has been written.
        """
        ProcessedFiles = self.ReadProcessedFilesList()
        ProcessedFiles += newFiles
        with open(self.ProcessedFilesList, "wb") as f:
            pickle.dump(ProcessedFiles, f)

        return True

    def FindFiles(self):
        """Find a set of input files that has not been processed yet.

        Returns a tuple (pattern, runNumber): a glob pattern matching all
        files that share the path prefix up to and including the run number
        of the first unprocessed file found, and that run number as a string.
        Returns ("", "") when every input file has already been processed.

        Raises Exception if an unprocessed file name contains neither
        "data10" nor "data11".
        """
        # Set for fast membership tests
        ProcessedFiles = set(self.ReadProcessedFilesList())

        # Get list of files matching the input pattern
        print("Input pattern: " + self.InputPattern)
        inputFiles = glob(self.InputPattern)

        if self.debug:
            print()
            print("Searching for file")
            print("InputPattern: " + self.InputPattern)

        pattern = ""
        runNumber = ""

        # Loop through the list until an unprocessed file is found
        for fileName in inputFiles:
            if fileName in ProcessedFiles:
                continue

            # Look for the project tag in the file name only, so that a
            # directory whose name contains "data10"/"data11" cannot be
            # mistaken for the start of the file name.
            baseName = os.path.basename(fileName)
            index = baseName.find("data10")
            if index == -1:
                index = baseName.find("data11")
            if index == -1:
                print("ERROR! Index of -1!")
                raise Exception("Index error")
            index += len(fileName) - len(baseName)

            # "dataNN_calib." is 13 characters, followed by the 8-digit run
            # number and a dot.
            pattern = fileName[0:index + 22] + "*"  # includes run number
            runNumber = fileName[index + 13:index + 21]
            break

        if not pattern:
            return "", ""

        if self.debug:
            print("Found unprocessed file with pattern: " + pattern)
            print("This includes files:")
            print(glob(pattern))

        return pattern, runNumber

    def RunAthena(self, pattern, runNumber):
        """Run athena on the set of files matching pattern.

        Creates the output directory <OutputDirBase>/<runNumber> (removing
        an existing one first when AllowDirOverwrite is set), runs
        CscCalcPedMon.py with the athena log written to run.log inside that
        directory, and finally records the files matching pattern as
        processed.
        """
        outputDirPath = self.OutputDirBase + "/" + runNumber

        if self.AllowDirOverwrite:
            # Pass an argument list: a plain string without shell=True would
            # be taken as the name of the executable and fail.
            subprocess.call(["rm", "-rf", outputDirPath])

        print("Making directory" + outputDirPath)
        # Create output directory
        os.mkdir(outputDirPath, 0o755)

        # Athena options
        athOpt = "outputPre='" + outputDirPath + "/" + runNumber
        athOpt += "';inputPat='" + pattern
        athOpt += "';reportPrefix='runNumber = "
        athOpt += runNumber + "'"

        # Athena arguments
        athArgs = ["athena.py", "-c", athOpt, "CscCalcPedMon.py"]

        # The log file is closed even if launching athena fails
        with open(outputDirPath + "/run.log", "w") as logFile:
            print()
            print("**************************************")
            print("Starting running on run " + str(runNumber))
            sys.stdout.flush()
            # NOTE(review): the athena exit status is not checked, so the
            # files are recorded as processed even if the job failed --
            # confirm this is intended.
            subprocess.call(athArgs, stdout=logFile, stderr=subprocess.STDOUT)
            print("Finished run " + str(runNumber))
            print("**************************************")
            print()

        # Add files we just ran on to file list
        newProcessedFiles = glob(pattern)
        self.AddProcessedFiles(newProcessedFiles)
BulkRun.BulkRun.__init__
def __init__(self, inputPattern="/raid02/schernau/ped/csc/csc/*.data", processedFilesList="ProcessedFiles.list", outputDirBase="/raid02/lampen/datasets/csc/PedProcessing2", debug=False, allowDirOverwrite=False)
Definition: BulkRun.py:27
BulkRun.BulkRun.AllowDirOverwrite
AllowDirOverwrite
Definition: BulkRun.py:33
BulkRun.BulkRun.InputPattern
InputPattern
Definition: BulkRun.py:29
BulkRun.BulkRun.OutputDirBase
OutputDirBase
Definition: BulkRun.py:31
BulkRun.BulkRun.FindFiles
def FindFiles(self)
Definition: BulkRun.py:105
BulkRun.BulkRun.AddProcessedFiles
def AddProcessedFiles(self, newFiles)
Definition: BulkRun.py:93
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
Trk::open
@ open
Definition: BinningType.h:40
BulkRun.BulkRun.ProcessedFilesList
ProcessedFilesList
Definition: BulkRun.py:30
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
BulkRun.BulkRun
Definition: BulkRun.py:11
BulkRun.BulkRun.run
def run(self, numToRun=10)
Definition: BulkRun.py:43
BulkRun.BulkRun.RunAthena
def RunAthena(self, pattern, runNumber)
Definition: BulkRun.py:151
BulkRun.BulkRun.ReadProcessedFilesList
def ReadProcessedFilesList(self)
Definition: BulkRun.py:73
BulkRun.BulkRun.debug
debug
Definition: BulkRun.py:32