ATLAS Offline Software
Loading...
Searching...
No Matches
DetStatusLib.py
Go to the documentation of this file.
1# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2
3# DetStatusLib.py
4# python functions/classes to help management of detector status information
5# Richard Hawkings, 5/2/07
6
7
def folderName(runLumi=True):
    """Return the detector status summary folder name.

    The LBSUMM folder is selected when runLumi is true, the TISUMM folder
    otherwise.
    """
    return '/GLOBAL/DETSTATUS/LBSUMM' if runLumi else '/GLOBAL/DETSTATUS/TISUMM'
13
def colour(code):
    """Translate a numeric status code into its traffic-light colour name.

    -1 maps to "BLACK"; any other code is reduced modulo 4, with 1, 2 and 3
    mapping to "RED", "YELLOW" and "GREEN" and everything else to "UNKNOWN".
    """
    if code == -1:
        return "BLACK"
    lights = {1: "RED", 2: "YELLOW", 3: "GREEN"}
    return lights.get(code % 4, "UNKNOWN")
25
def colourVal(sval):
    """Translate traffic light string (numerical literal or letter) to number.

    Recognised letters/words (case-insensitive) are R/RED=1, Y/YELLOW=2,
    G/GREEN=3, U/UNKNOWN=0 and B/BLACK=-1; anything else is converted with
    int().  None is returned when the value cannot be interpreted at all.
    """
    aliases = (
        (("R", "RED"), 1),
        (("Y", "YELLOW"), 2),
        (("G", "GREEN"), 3),
        (("U", "UNKNOWN"), 0),
        (("B", "BLACK"), -1),
    )
    try:
        key = sval.upper()
        for words, value in aliases:
            if key in words:
                return value
        # not a colour word: fall back to a numerical literal
        return int(sval)
    except Exception:
        # any failure (no upper() method, bad literal, ...) means "no status"
        return None
46
47
def __init__(self):
    """Build the status flag name -> channel number table and its inverse."""
    # current list of status flags
    # has to match definition in DetStatusSvc.cxx
    self.namedict = {
        'PIXB': 101, 'PIX0': 102, 'PIXEA': 104, 'PIXEC': 105,
        'SCTB': 111, 'SCTEA': 114, 'SCTEC': 115,
        'TRTB': 121, 'TRTEA': 124, 'TRTEC': 125, 'TRTTR': 126,
        'IDGL': 130,
        'IDAL': 140,
        'IDBS': 150,
        'IDPF': 160, 'IDVX': 161,
        'IDBCM': 170,
        'EMBA': 202, 'EMBC': 203, 'EMECA': 204, 'EMECC': 205,
        'HECA': 214, 'HECC': 215, 'FCALA': 224, 'FCALC': 225,
        'TIGB': 230,
        'TILBA': 232, 'TILBC': 233, 'TIEBA': 234, 'TIEBC': 235,
        'MBTSA': 244, 'MBTSC': 245,
        'CALBA': 251, 'CALEA': 254, 'CALEC': 255,
        'MDTBA': 302, 'MDTBC': 303, 'MDTEA': 304, 'MDTEC': 305,
        'RPCBA': 312, 'RPCBC': 313,
        'TGCEA': 324, 'TGCEC': 325,
        'CSCEA': 334, 'CSCEC': 335,
        'LCD': 350, 'LCDA': 353, 'LCDC': 354,
        'ALFA': 360, 'ZDC': 370,
        'L1CAL': 401, 'L1MUB': 402, 'L1MUE': 403, 'L1CTP': 404,
        'TRCAL': 411,
        'TRDF': 420,
        'TRBJT': 421, 'TRBPH': 422, 'TRCOS': 423, 'TRELE': 424,
        'TRGAM': 425, 'TRJET': 426, 'TRMET': 427, 'TRMBI': 428,
        'TRMUO': 429, 'TRTAU': 430, 'TRIDT': 431,
        'LUMI': 450, 'LUMIONL': 451,
        'RUNCLT': 460,
        'RCOPS': 461,
        'ATLGL': 480, 'ATLSOL': 481, 'ATLTOR': 482,
        'EIDB': 501, 'EIDCR': 502, 'EIDE': 503,
        'PIDB': 505, 'PIDCR': 506, 'PIDE': 507,
        'EIDF': 508, 'EIDSOFT': 509,
        'MSTACO': 510, 'MMUIDCB': 511, 'MMUIDVX': 512,
        'MMUGIRL': 513, 'MMUBOY': 514, 'MMUIDSA': 515,
        'MMUTAG': 516, 'MMTIMO': 517, 'MCMUTAG': 518,
        'MCALLHR': 519,
        'JETB': 521, 'JETEA': 524, 'JETEC': 525,
        'JETFA': 526, 'JETFC': 527,
        'MET': 530, 'METCALO': 531, 'METMUON': 532,
        'BTGLIFE': 541, 'BTGSOFTE': 544, 'BTGSOFTM': 545,
        'TAUB': 551, 'TAUCR': 552, 'TAUE': 553,
    }

    # reverse map from channel numbers back to names
    self.numdict = {num: name for (name, num) in self.namedict.items()}
100
def name(self, num):
    "Return the status flag name registered for channel number num"
    # plain dictionary lookup: an unknown number raises KeyError
    return self.numdict[num]
103
def num(self, name):
    "Return the numeric channel identifier which exactly matches name"
    # plain dictionary lookup: an unknown name raises KeyError
    return self.namedict[name]
107
def nums(self, name):
    """Return a list of all numeric channel identifiers which match name.

    A flag matches when its name begins with the given string, so a prefix
    selects every channel sharing it; the result follows the order of the
    name table.
    """
    width = len(name)
    return [inum for (iname, inum) in self.namedict.items()
            if name == iname[:width]]
115
def allNames(self):
    """Return a new, alphabetically sorted list of all status flag names.

    sorted() is used rather than keys().sort(): under Python 3 keys()
    returns a view object with no sort() method, so the old form raised
    AttributeError.
    """
    return sorted(self.namedict)
120
def allNums(self):
    """Return a new, ascending sorted list of all status channel numbers.

    sorted() is used rather than values().sort(): under Python 3 values()
    returns a view object with no sort() method, so the old form raised
    AttributeError.
    """
    return sorted(self.namedict.values())
125
127 "Class to hold a list of detector status requirements"
128
def __init__(self):
    "Initialise to empty set of requirements"
    # requirements map: status channel ID -> minimum required value
    self.req = dict()
    # access to name/number translation
    # NOTE(review): the other methods read self.names (allNames/num/name),
    # but the statement creating it is not visible in this listing --
    # confirm against the real file that it is still present here.
135
def setFromString(self, reqstring):
    """Set requirements from a space-separated string with flag req pairs
    e.g. 'SCTB 3 EMEC G' (numbers or letters for status)

    Existing requirements are discarded first.  Each flag name is treated as
    a prefix and applies to every known status flag starting with it.
    Problems (unknown name, uninterpretable value) are reported with print
    and the offending pair is skipped; a trailing unpaired token is ignored.
    """
    self.req = {}
    known = self.names.allNames()
    words = str(reqstring).split()
    # walk the tokens two at a time: (flag name, required value)
    for flagname, valtext in zip(words[0::2], words[1::2]):
        val = colourVal(valtext)
        if val is None:
            print ("Value %s does not define a status" % valtext)
            continue
        # apply the value to every flag whose name starts with flagname
        matched = False
        for name in known:
            if flagname == name[0:len(flagname)]:
                self.req[self.names.num(name)] = val
                matched = True
        if not matched:
            print ("Name %s does not match any status flag" % flagname)
158
def getDict(self):
    "Return the dictionary holding the requirements (channel/value pairs)"
    # the internal dictionary itself is handed out, not a copy
    return self.req
162
def __str__(self):
    """Print representation of status requirements.

    Each requirement is rendered as '<flag name> <value> ' (note the trailing
    space) and the pieces are concatenated in dictionary order.
    """
    return "".join("%s %i " % (self.names.name(key), val)
                   for (key, val) in self.req.items())
169
171 "Transient representation of one detector status"
def __init__(self, start, stop, code, deadfrac, thrust, nconfig=-1, nworking=-1, comment=''):
    """Store the interval [start, stop) together with its status payload.

    The payload consists of code, deadfrac and thrust plus the optional
    nconfig, nworking (both default -1) and comment (default empty string).
    """
    # interval covered by this status
    self.start, self.stop = start, stop
    # status payload
    self.code, self.deadfrac, self.thrust = code, deadfrac, thrust
    self.nconfig, self.nworking = nconfig, nworking
    self.comment = comment
181
def updateStart(self, newStart):
    "Replace the start of the interval with newStart"
    self.start = newStart
184
def updateStop(self, newStop):
    "Replace the stop of the interval with newStop"
    self.stop = newStop
187
189 "Transient representation of detector status list (code,deadfrac,thrust)"
def __init__(self):
    "Initialise to empty sequence"
    # the detector status objects are kept here in time order
    self._seq = []
194
def merge(self, mobj, override=False):
    """Merge the given StatusObj into the list, ANDing (default) or override.

    Parts of mobj falling into gaps of the existing sequence (or beyond its
    end) are inserted as new entries.  Where mobj overlaps an existing entry,
    the overlap takes the payload of mobj if override is set, or if the new
    code is <= the existing one and not 0 (undefined), or if the existing
    code is 0; the existing entry is split so that the parts outside mobj
    keep their old payload.  Objects with start >= stop are ignored.
    """
    if (mobj.start >= mobj.stop):
        return

    seq = self._seq

    def fromNew(begin, end):
        # entry carrying the payload of mobj over the interval [begin, end)
        return StatusObj(begin, end, mobj.code, mobj.deadfrac, mobj.thrust,
                         mobj.nconfig, mobj.nworking, mobj.comment)

    pos = 0
    prevstop = 0
    # walk the list, looking for places affected by the new object
    while pos < len(seq):
        # a gap between the previous entry and this one that mobj reaches
        # into needs a fresh entry inserted in front of the current one
        if mobj.start < seq[pos].start and mobj.stop > prevstop:
            gapstart = max(prevstop, mobj.start)
            gapstop = min(seq[pos].start, mobj.stop)
            # only insert if the gap has non-zero length
            if gapstop > gapstart:
                seq.insert(pos, fromNew(gapstart, gapstop))
                pos += 1
        cur = seq[pos]
        # does mobj intersect the current entry?
        if mobj.start < cur.stop and mobj.stop > cur.start:
            # take mobj if override is set, or new status is <= existing and
            # not undefined, or the existing status is undefined
            if (override or
                    (mobj.code <= cur.code and mobj.code != 0) or
                    (cur.code == 0)):
                # remember the existing payload and extent for the split parts
                old = (cur.code, cur.deadfrac, cur.thrust,
                       cur.nconfig, cur.nworking, cur.comment)
                estart, estop = cur.start, cur.stop
                # index of the entry that will receive the new status
                target = pos
                if mobj.start > estart:
                    # keep the leading part of the old entry
                    seq.insert(pos, StatusObj(estart, mobj.start, *old))
                    pos += 1
                    target += 1
                if mobj.stop < estop:
                    # keep the trailing part of the old entry
                    seq.insert(pos + 1, StatusObj(mobj.stop, estop, *old))
                    pos += 1
                seq[target] = fromNew(max(mobj.start, estart), min(mobj.stop, estop))
        # move on to the next entry
        prevstop = seq[pos].stop
        pos += 1
    # finally append whatever part of mobj extends past the end of the list
    if len(seq) > 0:
        prevstop = seq[-1].stop
    if mobj.stop > prevstop:
        seq.append(fromNew(max(prevstop, mobj.start), mobj.stop))
254
def compress(self):
    """Compress StatusList removing redundant entries with IoVs which can be combined.

    Two neighbouring entries are combined when the second starts exactly
    where the first stops and all payload fields are equal.  Returns the
    number of entries left afterwards.
    """
    seq = self._seq
    pos = 1
    while pos < len(seq):
        prev, cur = seq[pos - 1], seq[pos]
        # contiguous in time and identical payload?
        if (cur.start == prev.stop and
                cur.code == prev.code and
                cur.deadfrac == prev.deadfrac and
                cur.thrust == prev.thrust and
                cur.nconfig == prev.nconfig and
                cur.nworking == prev.nworking and
                cur.comment == prev.comment):
            # stretch the current entry backwards and drop the previous one;
            # pos stays put so the enlarged entry is compared again
            cur.updateStart(prev.start)
            del seq[pos - 1]
        else:
            pos += 1
    return len(seq)
273
def size(self):
    "Return the number of entries currently held in the list"
    return len(self._seq)
277
def list(self):
    "Return the list itself"
    # the internal sequence is handed out directly, not a copy
    return self._seq
281
def __str__(self):
    """Print representation of StatusList.

    One line per entry: '[start stop] : code deadfrac thrust comment'.
    """
    return ''.join('[%i %i] : %i %6.3f %6.3f %s\n' % (i.start, i.stop, i.code, i.deadfrac, i.thrust, i.comment)
                   for i in self._seq)
#define min(a, b)
Definition cfImp.cxx:40
#define max(a, b)
Definition cfImp.cxx:41
__init__(self, start, stop, code, deadfrac, thrust, nconfig=-1, nworking=-1, comment='')
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Definition merge.py:1
folderName(runLumi=True)