def peb_writer():
    """Copy the events of the requested stream(s) from RAW files to a new file.

    Command-line options are read with ``argparse`` (see the individual
    ``add_argument`` calls for their meaning). Every input event that carries
    at least one stream tag whose name is listed in ``--stream-name`` is
    written once to the output stream:

    * as a full event, if the matching tag lists neither ROBs nor
      sub-detectors and ``--hlt-only`` was not given;
    * otherwise as a partial event holding only the ROBs selected by the
      tag (optionally narrowed to HLT data by ``--hlt-only``).

    Events whose run number (or lumi block, when the output lumi block is
    non-zero) differs from the one used for the output file are skipped
    with an error message.

    The function does not return: it ends the process through ``sys.exit``
    with status 0 on success, or 1 when the output directory does not exist
    or no event was selected.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument("files", metavar="FILE", nargs='+',
                        help="RAW file to inspect")

    parser.add_argument('-s', '--stream-name', metavar='NAME', type=str, required=True,
                        help='Name(s) of stream(s) which should be written out, e.g. "stream1,stream2,stream3"')

    parser.add_argument('-o', '--output-name', metavar='NAME', type=str,
                        help='Core output file name (by default derived from input)')

    parser.add_argument('-d', '--output-dir', metavar='DIR', type=str, default='.',
                        help='Directory in which the output file should be written')

    parser.add_argument('-a', '--start-event', metavar='N', type=int, default=0,
                        help='Number of events which should be skipped from the begin')

    parser.add_argument('-n', '--max-events', metavar='N', type=int,
                        help='Maximum number of events in the output file')

    parser.add_argument('-u', '--uncompressed', action='store_true',
                        help='Write out uncompressed data')

    parser.add_argument('-p', '--project-tag', metavar='TAG', type=str,
                        help='Project tag which should be used for the output file')

    parser.add_argument('-l', '--lumi-block', metavar='N', type=int,
                        help='Lumiblock number used for the output file. Use 0 if multiple LBs in file.')

    parser.add_argument('-m', '--hlt-only', metavar='ID', type=int,
                        help='Drop all detector data and write out only HLT data for the given module ID. '
                        'Module ID <0 is a wildcard for all HLT module IDs.')

    parser.add_argument('-v', '--verbosity', metavar='N', type=int, default=logging.INFO,
                        help='Log verbosity')

    parser.add_argument('-P', '--progress-bar', action='store_true',
                        help='Show progress bar when running interactively')

    args = parser.parse_args()

    # Configure the root logger
    root_logger = logging.getLogger('')
    root_logger.name = 'trigbs_extractStream'
    root_logger.setLevel(args.verbosity)

    # Input stream over all files; metadata is taken from the first file only
    stream = eformat.istream(args.files)
    dr = eformat.EventStorage.pickDataReader(args.files[0])
    df = eformat.EventStorage.RawFileName(args.files[0])

    # Parameters used to build the output file name
    projectTag = args.project_tag or dr.projectTag()
    lumiBlockNumber = args.lumi_block if args.lumi_block is not None else dr.lumiblockNumber()
    applicationName = dr.appName()
    streamType = 'unknown'
    productionStep = df.productionStep() if df.hasValidCore() else 'unknown'

    runNumber = dr.runNumber()
    outputDirectory = args.output_dir
    streamName = args.stream_name

    # Several requested streams end up in one file labelled 'accepted'
    streamNames_out = streamName.split(',')
    if len(streamNames_out) > 1:
        streamName = 'accepted'

    if lumiBlockNumber == 0:
        productionStep = 'merge'

    # The output directory must already exist (isdir() is False for a missing path)
    if not os.path.isdir(outputDirectory):
        logging.fatal(' Output directory %s does not exist ', outputDirectory)
        sys.exit(1)

    # Compression settings do not depend on the event, so compute them once.
    # The output stream takes the EventStorage enum, the event fragment the
    # eformat.helper one.
    compressionTypeES = CompressionType.NONE if args.uncompressed else CompressionType.ZLIB
    compressionType = eformat.helper.Compression.UNCOMPRESSED if args.uncompressed \
        else eformat.helper.Compression.ZLIB
    compressionLevel = 0 if args.uncompressed else 1

    # Compare against None: module ID 0 is falsy but is still an explicit request
    hlt_only = args.hlt_only

    # Show progress dots only when debug output would not interleave with them
    show_progress = args.progress_bar and root_logger.getEffectiveLevel() > logging.DEBUG

    # Counters
    totalEvents_in = 0
    totalEvents_out = 0
    totalEvents_skipped = 0
    events_to_skip = args.start_event

    # Opened lazily on the first accepted event, once the stream type is known
    ostream = None

    for e in stream:
        # An unset (or zero) --max-events means no limit
        if args.max_events and totalEvents_in >= args.max_events:
            logging.info(' Maximum number of events reached : %d', args.max_events)
            break

        totalEvents_in += 1

        # Skip the requested number of leading events
        if events_to_skip > 0:
            events_to_skip -= 1
            totalEvents_skipped += 1
            continue

        streamTags = e.stream_tag()
        logging.debug(' === New Event nr = %s (Run,Global ID) = (%d,%d) === ',
                      totalEvents_in, e.run_no(), e.global_id())

        # Only the first matching stream tag is used, so that an event is
        # never written more than once
        tag = next((t for t in streamTags if t.name in streamNames_out), None)
        if tag is None:
            continue

        logging.debug(' Matching event found for stream tag = %s', tag)
        logging.debug(' Stream Tag:Robs = %s', [hex(r) for r in tag.robs])
        logging.debug(' Stream Tag:Dets = %s', [hex(d) for d in tag.dets])

        # The lumi block is only checked when the output file has a non-zero one
        if lumiBlockNumber > 0 and e.lumi_block() != lumiBlockNumber:
            logging.error(' Event (Run,Global ID) = (%d,%d) has a lumi block number %d,'
                          ' which is different from LB = %d for the output file. Event skipped.',
                          e.run_no(), e.global_id(), e.lumi_block(), lumiBlockNumber)
            continue

        # All events must have the run number used for the output file
        if e.run_no() != runNumber:
            logging.error(' Event (Run,Global ID) = (%d,%d) has a run number,'
                          ' which is different from the run number = %d for the output file. Event skipped.',
                          e.run_no(), e.global_id(), runNumber)
            continue

        # (Re)open the output stream on the first accepted event and whenever
        # the stream type changes. The 'ostream is None' test covers a first
        # tag whose type equals the initial 'unknown' placeholder.
        if ostream is None or streamType != tag.type:
            streamType = tag.type
            logging.debug(' streamType set to = %s', streamType)

            outRawFile = args.output_name or \
                eformat.EventStorage.RawFileName(projectTag,
                                                 runNumber,
                                                 streamType,
                                                 streamName,
                                                 lumiBlockNumber,
                                                 applicationName,
                                                 productionStep).fileNameCore()
            logging.debug(' set output file name = %s', outRawFile)

            ostream = eformat.ostream(directory=outputDirectory,
                                      core_name=outRawFile,
                                      run_number=dr.runNumber(),
                                      trigger_type=dr.triggerType(),
                                      detector_mask=dr.detectorMask(),
                                      beam_type=dr.beamType(),
                                      beam_energy=dr.beamEnergy(),
                                      compression=compressionTypeES,
                                      complevel=compressionLevel)

        # A tag listing neither ROBs nor sub-detectors selects the whole event
        is_feb_tag = (len(tag.robs) == 0 and len(tag.dets) == 0)
        if is_feb_tag and hlt_only is None:
            pbev = eformat.write.FullEventFragment(e)
            logging.debug(' Write full event fragment ')
        else:
            # Work out which ROB IDs / sub-detectors go into the output
            dets = []
            robs = []
            if hlt_only is None:
                dets = list(tag.dets)
                robs = list(tag.robs)
            elif hlt_only < 0:
                # Wildcard: keep every HLT ROB the tag selects
                if SubDetector.TDAQ_HLT in tag.dets or is_feb_tag:
                    dets = [SubDetector.TDAQ_HLT]
                robs = [robid for robid in tag.robs
                        if SourceIdentifier(robid).subdetector_id() == SubDetector.TDAQ_HLT]
            else:
                # Keep only the HLT ROB of the requested module, if the tag selects it
                requested_rob_id = int(SourceIdentifier(SubDetector.TDAQ_HLT, hlt_only))
                if SubDetector.TDAQ_HLT in tag.dets or requested_rob_id in tag.robs or is_feb_tag:
                    robs = [requested_rob_id]

            logging.debug(' Write partial event fragment ')
            # A single combined test, so a ROB matching both lists is added once
            rob_output_list = [rob for rob in e
                               if rob.source_id().code() in robs
                               or rob.source_id().subdetector_id() in dets]

            # Build the partial event from the original header plus the selected ROBs
            pbev = eformat.write.FullEventFragment()
            pbev.copy_header(e)
            for out_rob in rob_output_list:
                pbev.append_unchecked(out_rob)

        # Put the event onto the output stream
        pbev.compression_type(compressionType)
        pbev.compression_level(compressionLevel)
        ostream.write(pbev)
        if show_progress:
            sys.stdout.write('.')
            sys.stdout.flush()

        totalEvents_out += 1

    # Final statistics
    logging.info('Input file(s) = %s ', args.files)
    logging.info('Total number of events processed = %d ', totalEvents_in)
    logging.info('Number of events skipped at the beginning = %d ', totalEvents_skipped)
    logging.info('Number of events written to output file = %d ', totalEvents_out)
    if totalEvents_out > 0:
        logging.info('Output file = %s ', ostream.last_filename())
    else:
        logging.error('No events selected so no output file created')
        sys.exit(1)

    sys.exit(0)
250
251