8 from EventIndexProducer.EIPBof
import EIPBof
9 import EventIndexProducer.EIconstants
as eic
18 def update(self, run_number, evt_number):
19 if run_number
not in self.
runs:
20 self.
runs[run_number] = (sys.maxsize, -1)
21 (min, max) = self.
runs[run_number]
26 self.
runs[run_number] = (min, max)
37 self.
info[
'size'] = os.path.getsize(filename)
40 self.
eispb = gzip.open(filename,
'rb')
59 magic = struct.unpack(
'<I', self.
buf[self.
bpos:self.
bpos+4])[0]
61 version = struct.unpack(
'<I', self.
buf[self.
bpos:self.
bpos+4])[0]
63 if magic != 0x6e56c8c7:
64 raise Exception(
"Invalid magic in EISPB file")
77 toread = size-self.
blen
86 mtype = struct.unpack(
'<I', self.
buf[self.
bpos:self.
bpos+4])[0]
90 mlen = struct.unpack(
'<I', self.
buf[self.
bpos:self.
bpos+4])[0]
97 return (mtype, mlen, msg)
117 self.
info[
'guids'] = []
130 SPB file format is a stream of Google's protocol buffer messages
131 Reading is sequential; random access to individual messages is not possible
138 (mtype_ver, mlen, msg) = self.
getMsg()
139 mtype = (mtype_ver & 0x000fff00) >> 8
140 ver = (mtype_ver & 0x000000ff)
141 if (ver != eic.EI_PROTO_MSGVER):
142 raise Exception(
"Invalid message version found in EISPB file")
144 if mtype == eic.EI_PROTO_HEADER:
145 header = eipbof.Header()
146 header.ParseFromString(msg)
147 self.
info[
'jobID'] = header.jobID
148 self.
info[
'taskID'] = header.taskID
149 self.
info[
'startProcTime'] = header.startProcTime
150 self.
info[
'inputDsName'] = header.inputDsName
151 self.
info[
'provenanceRef'] = header.provenanceRef
152 self.
info[
'triggerInfo'] = header.triggerInfo
153 elif mtype == eic.EI_PROTO_BEGINGUID:
154 beginGUID = eipbof.BeginGUID()
155 beginGUID.ParseFromString(msg)
157 currentGUID[
'fileStartProcTime'] = beginGUID.startProcTime
158 currentGUID[
'guid'] = beginGUID.guid
159 currentGUID[
'fileno'] = fileno
169 currentGUID[
'AMITag'] = beginGUID.AMITag
170 currentGUID[
'trigStream'] = beginGUID.trigStream
171 currentGUID[
'projName'] = beginGUID.projName
172 elif mtype == eic.EI_PROTO_ENDGUID:
173 endGUID = eipbof.EndGUID()
174 endGUID.ParseFromString(msg)
175 currentGUID[
'nevents'] = nevents
176 currentGUID[
'nuevents'] = nuevents
177 currentGUID[
'fileEndProcTime'] = endGUID.endProcTime
178 currentGUID[
'runevtRanges'] = runevtRanges.getRanges()
180 tot_nevents += nevents
181 tot_nuevents += nuevents
183 elif mtype == eic.EI_PROTO_EIEVENT:
184 eventPB = eipbof.EIEvent()
185 eventPB.ParseFromString(msg)
188 runevtRanges.update(
int(eventPB.runNumber),
189 int(eventPB.eventNumber))
190 runevtkey =
"{0:08d}-{1:011d}".
format(
int(eventPB.runNumber),
191 int(eventPB.eventNumber))
192 if runevtkey
not in runevt:
193 runevt[runevtkey] = 1
195 elif mtype == eic.EI_PROTO_TRIGGERMENU:
196 triggerMenu = eipbof.TriggerMenu()
197 triggerMenu.ParseFromString(msg)
198 elif mtype == eic.EI_PROTO_TRAILER:
199 trailer = eipbof.Trailer()
200 trailer.ParseFromString(msg)
201 self.
info[
'endProcTime'] = trailer.endProcTime
202 self.
info[
'nevents'] = trailer.nentries
203 self.
info[
'nfiles'] = trailer.nfiles
209 self.
info[
'nuevents'] = tot_nuevents
213 if self.
info[
'nfiles'] != fileno:
214 log.info(
"ERROR nfiles {} {}".
format(fileno, self.
info[
'nfiles']))
215 if self.
info[
'nevents'] != tot_nevents:
216 log.info(
"ERROR tot_nevents {} {}".
format(tot_nevents,
217 self.
info[
'nevents']))