ATLAS Offline Software
Loading...
Searching...
No Matches
spbfile.py
Go to the documentation of this file.
2# Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3
4import os
5import sys
6import gzip
7import struct
8from EventIndexProducer.EIPBof import EIPBof
9import EventIndexProducer.EIconstants as eic
10import logging as log
11
12
14
15 def __init__(self):
16 self.runs = {}
17
18 def update(self, run_number, evt_number):
19 if run_number not in self.runs:
20 self.runs[run_number] = (sys.maxsize, -1)
21 (min, max) = self.runs[run_number]
22 if evt_number > max:
23 max = evt_number
24 if evt_number < min:
25 min = evt_number
26 self.runs[run_number] = (min, max)
27
28 def getRanges(self):
29 return self.runs
30
31
32class SpbFile():
33
34 def __init__(self, filename):
35
36 self.info = {}
37 self.info['size'] = os.path.getsize(filename)
38
39 try:
40 self.eispb = gzip.open(filename, 'rb')
41 except Exception:
42 raise
43
44 # buffer to read file
45 self.buf = b"" # read buffer
46 self.blen = 0 # buffer length
47 self.bpos = 0 # buffer current read position
48 self.usize = 0 # aproximate uncompressed size
49 try:
51 except Exception:
52 raise
53
54 def getVersion(self):
55 return self.version
56
58 self.growBufToSize(8)
59 magic = struct.unpack('<I', self.buf[self.bpos:self.bpos+4])[0]
60 self.bpos += 4
61 version = struct.unpack('<I', self.buf[self.bpos:self.bpos+4])[0]
62 self.bpos += 4
63 if magic != 0x6e56c8c7:
64 raise Exception("Invalid magic in EISPB file")
65 return version
66
67 def growBufToSize(self, size):
68 if self.bpos + size < self.blen:
69 return
70 # throw bytes away until current read position
71 self.buf = self.buf[self.bpos:]
72 self.bpos = 0
73 self.blen = len(self.buf)
74 # more bytes needed ?
75 if self.blen < size:
76 # read at least 40960
77 toread = size-self.blen
78 if toread < 40960:
79 toread = 40960
80 self.buf += self.eispb.read(toread)
81 self.usize += toread
82 self.blen = len(self.buf)
83
84 def getMsg(self):
85 self.growBufToSize(4)
86 mtype = struct.unpack('<I', self.buf[self.bpos:self.bpos+4])[0]
87 self.bpos += 4
88
89 self.growBufToSize(4)
90 mlen = struct.unpack('<I', self.buf[self.bpos:self.bpos+4])[0]
91 self.bpos += 4
92
93 self.growBufToSize(mlen)
94 msg = self.buf[self.bpos:self.bpos+mlen]
95 self.bpos += mlen
96
97 return (mtype, mlen, msg)
98
99 def close(self):
100 self.eispb.close()
101
102 def rewind(self):
103 self.eispb.rewind()
104 self.buf = b"" # read buffer
105 self.blen = 0 # buffer length
106 self.bpos = 0 # buffer current read position
107 self.usize = 0 # aproximate uncompressed size
108 # skip magic and version
109 self.growBufToSize(8)
110 self.bpos += 8
111
112 def tell(self):
113 self.eispb.tell()
114
115 def getInfo(self):
116
117 self.info['guids'] = []
118 fileno = 0
119 tot_nevents = 0
120 tot_nuevents = 0
121 tot_nbytes = 0
122 nevents = 0
123 nuevents = 0
124 nbytes = 0
125 runevtRanges = RunEvtRanges()
126 runevt = {}
127 currentGUID = {}
128
129 """
130 SPB file format is a stream of google's protocol buffer messages
131 Reading is sequential, no random read of messages is possible
132 """
133
134 eipbof = EIPBof(self.version)
135 self.info['version'] = self.version
136
137 while True:
138 (mtype_ver, mlen, msg) = self.getMsg()
139 mtype = (mtype_ver & 0x000fff00) >> 8
140 ver = (mtype_ver & 0x000000ff)
141 if (ver != eic.EI_PROTO_MSGVER):
142 raise Exception("Invalid message version found in EISPB file")
143
144 if mtype == eic.EI_PROTO_HEADER: # Header
145 header = eipbof.Header()
146 header.ParseFromString(msg)
147 self.info['jobID'] = header.jobID
148 self.info['taskID'] = header.taskID
149 self.info['startProcTime'] = header.startProcTime
150 self.info['inputDsName'] = header.inputDsName
151 self.info['provenanceRef'] = header.provenanceRef
152 self.info['triggerInfo'] = header.triggerInfo
153 elif mtype == eic.EI_PROTO_BEGINGUID: # Begin GUID
154 beginGUID = eipbof.BeginGUID()
155 beginGUID.ParseFromString(msg)
156 currentGUID = {}
157 currentGUID['fileStartProcTime'] = beginGUID.startProcTime
158 currentGUID['guid'] = beginGUID.guid
159 currentGUID['fileno'] = fileno
160 fileno += 1
161 nevents = 0
162 nuevents = 0
163 nbytes = 0
164 # run-evt, used to calculate unique events
165 runevt = {}
166 # to calculate run evt ranges
167 runevtRanges = RunEvtRanges()
168 #
169 currentGUID['AMITag'] = beginGUID.AMITag
170 currentGUID['trigStream'] = beginGUID.trigStream
171 currentGUID['projName'] = beginGUID.projName
172 elif mtype == eic.EI_PROTO_ENDGUID: # End GUID
173 endGUID = eipbof.EndGUID()
174 endGUID.ParseFromString(msg)
175 currentGUID['nevents'] = nevents
176 currentGUID['nuevents'] = nuevents
177 currentGUID['fileEndProcTime'] = endGUID.endProcTime
178 currentGUID['runevtRanges'] = runevtRanges.getRanges()
179 self.info['guids'].append(currentGUID)
180 tot_nevents += nevents
181 tot_nuevents += nuevents
182 tot_nbytes += nbytes
183 elif mtype == eic.EI_PROTO_EIEVENT: # event
184 eventPB = eipbof.EIEvent()
185 eventPB.ParseFromString(msg)
186 nevents += 1
187 nbytes += mlen
188 runevtRanges.update(int(eventPB.runNumber),
189 int(eventPB.eventNumber))
190 runevtkey = "{0:08d}-{1:011d}".format(int(eventPB.runNumber),
191 int(eventPB.eventNumber))
192 if runevtkey not in runevt:
193 runevt[runevtkey] = 1
194 nuevents += 1
195 elif mtype == eic.EI_PROTO_TRIGGERMENU: # Trigger menu
196 triggerMenu = eipbof.TriggerMenu()
197 triggerMenu.ParseFromString(msg)
198 elif mtype == eic.EI_PROTO_TRAILER: # Trailer
199 trailer = eipbof.Trailer()
200 trailer.ParseFromString(msg)
201 self.info['endProcTime'] = trailer.endProcTime
202 self.info['nevents'] = trailer.nentries
203 self.info['nfiles'] = trailer.nfiles
204 break
205 else:
206 break
207
208 self.info['usize'] = self.usize
209 self.info['nuevents'] = tot_nuevents
210 self.rewind()
211
212 # checkings
213 if self.info['nfiles'] != fileno:
214 log.info("ERROR nfiles {} {}".format(fileno, self.info['nfiles']))
215 if self.info['nevents'] != tot_nevents:
216 log.info("ERROR tot_nevents {} {}".format(tot_nevents,
217 self.info['nevents']))
218
219 return self.info
update(self, run_number, evt_number)
Definition spbfile.py:18
__init__(self, filename)
Definition spbfile.py:34
growBufToSize(self, size)
Definition spbfile.py:67
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)