ATLAS Offline Software
BulkRunFollowup.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2 
3 from __future__ import print_function
4 
5 from glob import glob
6 import subprocess
7 import os
8 import pickle
9 import sys
10 
12  """Run on multiple pedestal files
13 
14  #Used to run on multiple pedestal bytestream files located somewhere on disk.
15  #Puts results into separate directories by run-number. It extracts pertinent
16  #information from the file names, which are expected to be of the form:
17 
18  Quick followup for when BulkRun failed on a few runs. This will re-enter existing directories
19  of particular run numbers. It should be given a separate processedFiles.list
20 
21  data10_calib.00157081.calibration_pedCSC.daq.RAW._lb0000._CSC-EB._0001.data
22 
23  run() - run over all runs
24  ReadProcessedFilesList() - find all files already run on from a local text file
25  AddProcessedFiles() - save newly run on files to disk
26  FindFiles() - Find pattern and run number of a set of files that have not yet been run on
27  RunAthena() - run athena job for a set of runs
28 
29  """
def __init__(
        self,
        allowedRuns,
        inputPattern = "/raid02/schernau/ped/csc/*.data",
        processedFilesList = "ProcessedFiles_Followup.list",
        outputDirBase = "/raid02/lampen/datasets/csc/PedProcessing",
        debug = False,
        allowDirOverwrite = False,
        ):
    """Record the job configuration on the instance.

    allowedRuns        -- container of run numbers to process; stored as
                          AllowedRuns
    inputPattern       -- glob pattern for the input files; stored as
                          InputPattern
    processedFilesList -- path of the file holding the processed-files
                          list; stored as ProcessedFilesList
    outputDirBase      -- base directory for output; stored as
                          OutputDirBase
    debug              -- debug flag; stored as debug
    allowDirOverwrite  -- overwrite flag; stored as AllowDirOverwrite
    """
    # Behaviour switches
    self.debug = debug
    self.AllowDirOverwrite = allowDirOverwrite

    # Run selection
    self.AllowedRuns = allowedRuns

    # File-system locations
    self.OutputDirBase = outputDirBase
    self.ProcessedFilesList = processedFilesList
    self.InputPattern = inputPattern
46 
def run(self, numToRun = 10) :
    """Handle up to numToRun sets of unprocessed files.

    Each iteration asks FindFiles() for the next unprocessed file set.
    If its run number is in self.AllowedRuns the set is handed to
    RunAthena(); otherwise the matching files are recorded as processed
    via AddProcessedFiles() so that they are not offered again.

    The loop stops early as soon as FindFiles() reports that nothing is
    left (empty pattern). Progress is reported on stdout; nothing is
    returned.
    """

    print ("Running on " + str(numToRun) + " runsSet.")

    runNumbers = []
    for runCnt in range(numToRun) :
        print (">>>>Running on runSet " + str(runCnt+1) + " of " + str(numToRun))
        pattern,runNumber = self.FindFiles()

        if pattern == "":
            # Nothing was found in this iteration, so exactly runCnt
            # sets were handled before it (not runCnt + 1).
            print ("No more unprocessed files. Congrats!")
            print ("N runs done: " + str(runCnt))
            print (runNumbers)
            return

        #have files to run on
        if runNumber in self.AllowedRuns:
            runNumbers.append(runNumber)
            self.RunAthena(pattern,runNumber)
        else:
            if self.debug:
                print ("Skipping file from runNumber " + str(runNumber) + " (Not allowed)")
            #Add files we're skipping into file list
            self.AddProcessedFiles(glob(pattern))

    print ("finished all " + str(numToRun) )
    print ("Run numbers include:")
    print (runNumbers)
    print()
    print ("All Processed files:" )
    print (self.ReadProcessedFilesList())
83 
#Read list of previously processed files
def ReadProcessedFilesList(self):
    """Return the processed-files list stored in self.ProcessedFilesList.

    The file is expected to contain a pickled list, as written by
    AddProcessedFiles(). If the file does not exist yet an empty list
    is returned, so that a fresh list can be started. The loaded list
    is also echoed to stdout.
    """

    ProcessedFiles = []

    #Get processed files
    if os.path.exists(self.ProcessedFilesList):
        # NOTE(review): pickle.load can execute arbitrary code; only
        # point ProcessedFilesList at files written by this tool.
        with open(self.ProcessedFilesList,"rb") as f:
            ProcessedFiles = pickle.load(f)

    print ('Processed String: ')
    print (ProcessedFiles)

    return ProcessedFiles
98 
def AddProcessedFiles(self,newFiles):
    """Append newFiles to the processed-files list and save it to disk.

    The current list is obtained from ReadProcessedFilesList(), extended
    with every entry of newFiles that is not already present (order is
    preserved), and pickled back to self.ProcessedFilesList.

    Always returns True.
    """
    ProcessedFiles = self.ReadProcessedFilesList()
    print ("Got old processed files")
    print (ProcessedFiles)

    # Skip entries that are already recorded: the same glob pattern can
    # be added more than once, which would otherwise grow the list with
    # duplicates.
    alreadyListed = set(ProcessedFiles)
    for newFile in newFiles:
        if newFile not in alreadyListed:
            alreadyListed.add(newFile)
            ProcessedFiles.append(newFile)

    print ("New version:")
    print (ProcessedFiles)

    # "with" closes the file even if pickling fails
    with open(self.ProcessedFilesList,"wb") as f:
        pickle.dump(ProcessedFiles,f)

    return True
112 
113 
114  #Find a set of not-yet-processed files and its run number
def FindFiles(self):
    """Find one set of input files that has not been processed yet.

    Globs self.InputPattern and picks the first file that is not in the
    list returned by ReadProcessedFilesList(). The file name is expected
    to look like

      data10_calib.00157081.calibration_pedCSC.daq.RAW._lb0000._CSC-EB._0001.data

    (prefix "data10" or "data11").

    Returns a tuple (pattern, runNumber):
      pattern   -- the path up to and including the "." after the run
                   number, followed by "*"
      runNumber -- the 8 characters starting 13 characters after the
                   data1X prefix, as a string
    or ("", "") if every input file has already been processed.

    Raises Exception("Index error") if an unprocessed file name contains
    neither prefix.
    """

    #Get processed file list (a set gives constant-time membership tests)
    ProcessedFiles = set(self.ReadProcessedFilesList())

    #Get list of files in input dir
    inputFiles = glob(self.InputPattern)

    if self.debug:
        print()
        print ("Searching for file")
        print ("InputPattern: " + self.InputPattern)

    #Loop through list until we find a file that is not yet processed
    for inputFile in inputFiles:
        if inputFile in ProcessedFiles:
            continue

        # Search only the file-name part, so that a directory called
        # e.g. ".../data10_peds/" cannot be mistaken for the prefix.
        nameStart = len(inputFile) - len(os.path.basename(inputFile))
        index = inputFile.find("data10", nameStart)
        if index == -1:
            index = inputFile.find("data11", nameStart)
        if index == -1:
            print ("ERROR! Index of -1 searching for data1X prefix in filename:")
            print (inputFile)
            raise Exception("Index error")

        # Fixed offsets from the prefix: "data1X_calib." is 13
        # characters, followed by the 8-digit run number and a ".".
        pattern = inputFile[0:index + 22] + "*" #includes run number
        runNumber = inputFile[index + 13: index + 21]

        if self.debug:
            print ("Found unprocessed file with pattern: " + pattern)
            print ("This includes files:")
            print (glob(pattern))

        return pattern, runNumber

    if self.debug:
        print ("Did not find unprocessed file")
    return "",""
160 
def RunAthena(self, pattern, runNumber):
    """Run athena on the set of files matching pattern.

    pattern   -- glob pattern of the input files; passed to the job as
                 inputPat
    runNumber -- run number as a string; used for the output directory
                 name, the output prefix and the report prefix

    Output goes to <self.OutputDirBase>/<runNumber>. If
    self.AllowDirOverwrite is set, that directory is removed first;
    otherwise an existing directory is re-used. The job's stdout and
    stderr are written to run2.log in that directory. Afterwards the
    files matching pattern are recorded through AddProcessedFiles().
    """
    outputDirPath = self.OutputDirBase + "/" + runNumber

    if self.AllowDirOverwrite:
        # Pass an argument list: a single string without shell=True is
        # taken as the name of the program to execute and fails.
        subprocess.call(["rm", "-rf", outputDirPath])

    #Create output directory, unless we are re-entering an existing one
    if not os.path.isdir(outputDirPath):
        print ("Making directory" + outputDirPath)
        os.mkdir(outputDirPath,0o755)

    #Athena options
    athOpt = "outputPre='" + outputDirPath +"/" + runNumber
    athOpt += "';inputPat='" + pattern
    athOpt += "';reportPrefix='runNumber = "
    athOpt += runNumber + "'"

    #athena arguments
    athArgs = ["athena.py", "-c", athOpt, "CscCalcPedMon.py"]

    #Output log file ("with" closes it even if the job cannot be started)
    with open(outputDirPath + "/run2.log","w") as logFile:
        print()
        print ("**************************************")
        print ("Starting running on run " + str(runNumber))
        sys.stdout.flush()
        # NOTE(review): the exit status of athena is ignored, so the
        # files are recorded as processed even if the job failed.
        subprocess.Popen(athArgs,stdout=logFile,stderr=subprocess.STDOUT).wait()
        print ("Finished run " + str(runNumber))
        print ("**************************************")
        print()

    #Add files we just ran on to file list
    newProcessedFiles = glob(pattern)
    self.AddProcessedFiles(newProcessedFiles)
BulkRunFollowup.BulkRunFollowup.RunAthena
def RunAthena(self, pattern, runNumber)
Definition: BulkRunFollowup.py:161
BulkRunFollowup.BulkRunFollowup
Definition: BulkRunFollowup.py:11
BulkRunFollowup.BulkRunFollowup.OutputDirBase
OutputDirBase
Definition: BulkRunFollowup.py:35
BulkRunFollowup.BulkRunFollowup.ProcessedFilesList
ProcessedFilesList
Definition: BulkRunFollowup.py:34
BulkRunFollowup.BulkRunFollowup.debug
debug
Definition: BulkRunFollowup.py:36
BulkRunFollowup.BulkRunFollowup.AddProcessedFiles
def AddProcessedFiles(self, newFiles)
Definition: BulkRunFollowup.py:99
BulkRunFollowup.BulkRunFollowup.InputPattern
InputPattern
Definition: BulkRunFollowup.py:33
BulkRunFollowup.BulkRunFollowup.run
def run(self, numToRun=10)
Definition: BulkRunFollowup.py:47
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
BulkRunFollowup.BulkRunFollowup.AllowDirOverwrite
AllowDirOverwrite
Definition: BulkRunFollowup.py:37
BulkRunFollowup.BulkRunFollowup.AllowedRuns
AllowedRuns
Definition: BulkRunFollowup.py:32
Trk::open
@ open
Definition: BinningType.h:40
BulkRunFollowup.BulkRunFollowup.ReadProcessedFilesList
def ReadProcessedFilesList(self)
Definition: BulkRunFollowup.py:85
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
BulkRunFollowup.BulkRunFollowup.__init__
def __init__(self, allowedRuns, inputPattern="/raid02/schernau/ped/csc/*.data", processedFilesList="ProcessedFiles_Followup.list", outputDirBase="/raid02/lampen/datasets/csc/PedProcessing", debug=False, allowDirOverwrite=False)
Definition: BulkRunFollowup.py:30
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
BulkRunFollowup.BulkRunFollowup.FindFiles
def FindFiles(self)
Definition: BulkRunFollowup.py:115