ATLAS Offline Software
Loading...
Searching...
No Matches
trigbs_orderedMerge.cxx File Reference

Standalone executable to order and decompress events from multiple files and write them into a single file. More...

#include "eformat/FullEventFragmentNoTemplates.h"
#include "eformat/ROBFragmentNoTemplates.h"
#include "eformat/write/FullEventFragment.h"
#include "eformat/write/ROBFragment.h"
#include "eformat/compression.h"
#include "EventStorage/DataReader.h"
#include "EventStorage/pickDataReader.h"
#include "EventStorage/DataWriter.h"
#include "EventStorage/DRError.h"
#include "EventStorage/DWError.h"
#include "EventStorage/EventStorageRecords.h"
#include "EventStorage/ESCompression.h"
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <optional>
#include <functional>
#include <unordered_set>

Go to the source code of this file.

Classes

struct  Event
class  WriteEvent
class  Buffer

Functions

std::unique_ptr< EventStorage::DataWriter > createWriter (const EventStorage::DataReader &reader)
int main (int argc, char **argv)

Detailed Description

Standalone executable to order and decompress events from multiple files and write them into a single file.

This is intended mainly for merging files from same run and LB but different streams. The reordering ensures uniform distribution of different kinds of events from different streams and removes duplicates. Using this script with files from different LBs is pointless since all events in LB N+1 are always after all events from LB N. The reordering is not strict and the level of out-of-order events is tied to the buffer size. The smaller the buffer, the larger the chance of finding lower-ID events still to be read from input files.

Definition in file trigbs_orderedMerge.cxx.

Function Documentation

◆ createWriter()

std::unique_ptr< EventStorage::DataWriter > createWriter ( const EventStorage::DataReader & reader)

Definition at line 167 of file trigbs_orderedMerge.cxx.

167 {
168 EventStorage::run_parameters_record rp{};
169 rp.run_number = reader.runNumber();
170 rp.max_events = 0;
171 rp.rec_enable = 0;
172 rp.trigger_type = reader.triggerType();
173 constexpr static std::bitset<128> bitMask64{0xffffffffffffffff};
174 rp.detector_mask_LS = (reader.detectorMask() & bitMask64).to_ulong();
175 rp.detector_mask_MS = ((reader.detectorMask() >> 64) & bitMask64).to_ulong();
176 rp.beam_type = reader.beamType();
177 rp.beam_energy = reader.beamEnergy();
178
179 return std::make_unique<EventStorage::DataWriter>(
180 /* writingPath = */ ".",
181 /* fileNameCore = */ "orderedMerge",
182 /* rPar = */ rp,
183 /* fmdStrings = */ reader.freeMetaDataStrings(),
184 /* startIndex = */ 1,
185 /* compression = */ EventStorage::CompressionType::NONE,
186 /* compLevel = */ 0
187 );
188}
ReadCards * rp
reader
read the goodrunslist xml file(s)
Definition collisions.py:22

◆ main()

int main ( int argc,
char ** argv )

Definition at line 190 of file trigbs_orderedMerge.cxx.

190 {
191
192 if (argc < 3) {
193 std::cout << "Usage: " << argv[0] << " FILE FILE [FILE]" << std::endl
194 << "At least 2 input files are required" << std::endl;
195 return 1;
196 }
197
198 std::vector<Buffer> buffers;
199 size_t total_events{0}, read_events{0};
200 std::unordered_set<uint64_t> written_event_ids;
201 uint64_t max_event_id{0};
202
203 std::cout << "Reading " << argc-1 << " files:" << std::endl;
204 for (int i=1; i<argc; ++i) {
205 std::cout << " - " << argv[i] << std::endl;
206 buffers.emplace_back(std::unique_ptr<EventStorage::DataReader>(pickDataReader(argv[i])), s_defaultBufferSize);
207 total_events += buffers.back().eventsInFile();
208 }
209
210 std::cout << "Input files contain " << total_events << " events in total" << std::endl;
211
212 std::unique_ptr<EventStorage::DataWriter> writer = createWriter(buffers[0].reader());
213
214 while (read_events < total_events) {
215 std::optional<std::reference_wrapper<Buffer>> next_buffer;
216 uint64_t lowest_event_id{std::numeric_limits<uint64_t>::max()};
217 int buffer_id{-1}, next_buffer_id{-1};
218 for (Buffer& buffer : buffers) {
219 ++buffer_id;
220 std::optional<std::reference_wrapper<const Event>> event = buffer.peek();
221 if (not event.has_value()) {continue;}
222 const uint64_t event_id{event.value().get().frag.global_id()};
223 if (event_id < lowest_event_id) {
224 lowest_event_id = event_id;
225 next_buffer = buffer;
226 if (s_debugLogging) {
227 next_buffer_id = buffer_id;
228 }
229 }
230 }
231 if (not next_buffer.has_value()) {
232 std::cout << "End of inputs reached, read " << read_events << " out of " << total_events << " events" << std::endl;
233 break;
234 }
235
236 const Event event = next_buffer.value().get().next();
237 ++read_events;
238 const uint64_t event_id{event.frag.global_id()};
239 if (not written_event_ids.insert(event_id).second) {
240 std::cout << "Duplicate event with global_id " << event_id << ", skipping" << std::endl;
241 continue;
242 }
243 if (event_id > max_event_id) {
244 max_event_id = event_id;
245 } else if (s_debugLogging) {
246 std::cout << "The current event global_id " << event_id << " is lower than previously written event "
247 << max_event_id << " - event ordering not strictly preserved" << std::endl;
248 }
249
250 if (s_debugLogging) {
251 std::cout << "Writing event " << written_event_ids.size() << " with global_id " << event_id
252 << " read from file " << next_buffer_id << std::endl;
253 } else if (read_events % s_printInterval == 0) {
254 std::cout << "Read " << read_events << " out of " << total_events << " events and wrote "
255 << written_event_ids.size() << " events" << std::endl;
256 }
257
258 const WriteEvent writeEvent{event};
259 if (writeEvent.status() != WriteEvent::Status::OK) {
260 std::cerr << "Error in event serialisation" << std::endl;
261 return 1;
262 }
263
264 const uint32_t sizeInBytes = writeEvent.size() * sizeof(uint32_t);
265 uint32_t writeSizeOnDisk{0};
266 const EventStorage::DWError err_code = writer->putData(sizeInBytes, writeEvent.blob().get(), writeSizeOnDisk);
267 if (err_code != EventStorage::DWError::DWOK or not writer->good()) {
268 std::cerr << "Error writing to file, exiting..." << std::endl;
269 return 1;
270 }
271 if (writeSizeOnDisk != sizeInBytes) {
272 std::cerr << "Error writing to file, wrote " << writeSizeOnDisk << " instead of " << sizeInBytes << " words" << std::endl;
273 return 1;
274 }
275 }
276
277 std::cout << "Merging successful, read " << read_events << " out of " << total_events << " events and wrote "
278 << written_event_ids.size() << " events" << std::endl;
279
280 return 0;
281}
std::shared_ptr< HepMC3::Writer > writer
Status status() const
uint32_t size() const
const std::unique_ptr< uint32_t[]> & blob() const
setEventNumber uint32_t
std::unique_ptr< EventStorage::DataWriter > createWriter(const EventStorage::DataReader &reader)