// Write-side full event fragment. Its header fields are set from the
// corresponding fields of the read event's fragment.
57 eformat::write::FullEventFragment
m_frag;
// Write-side ROB fragments, constructed from the ROBs of the read event.
58 std::vector<eformat::write::ROBFragment>
m_robs;
64 m_frag.status(readEvent.frag.nstatus(), readEvent.frag.status());
65 m_frag.run_type(readEvent.frag.run_type());
66 m_frag.run_no(readEvent.frag.run_no());
67 m_frag.global_id(readEvent.frag.global_id());
68 m_frag.lumi_block(readEvent.frag.lumi_block());
69 m_frag.bc_id(readEvent.frag.bc_id());
70 m_frag.bc_time_seconds(readEvent.frag.bc_time_seconds());
71 m_frag.bc_time_nanoseconds(readEvent.frag.bc_time_nanoseconds());
72 m_frag.lvl1_id(readEvent.frag.lvl1_id());
73 m_frag.lvl1_trigger_type(readEvent.frag.lvl1_trigger_type());
74 m_frag.lvl1_trigger_info(readEvent.frag.nlvl1_trigger_info(), readEvent.frag.lvl1_trigger_info());
75 m_frag.hlt_info(readEvent.frag.nhlt_info(), readEvent.frag.hlt_info());
76 m_frag.stream_tag(readEvent.frag.nstream_tag(), readEvent.frag.stream_tag());
77 m_frag.compression_type(eformat::Compression::UNCOMPRESSED);
78 m_frag.compression_level(0);
79 std::vector<eformat::read::ROBFragment> readRobs;
80 readEvent.frag.robs(readRobs);
81 m_robs.reserve(readRobs.size());
82 for (
const eformat::read::ROBFragment& rob : readRobs) {
83 m_robs.emplace_back(rob.start());
84 const eformat::write::node_t*
top =
m_robs.back().bind();
85 const uint32_t writeSize =
m_robs.back().size_word();
86 m_robBlobs.push_back(std::make_unique<uint32_t[]>(writeSize));
87 const uint32_t writtenSize = eformat::write::copy(*
top,
m_robBlobs.back().get(), writeSize);
88 if (writtenSize != writeSize) {
89 std::cerr <<
"Error in ROB serialisation, copied " << writtenSize <<
" instead of " << writeSize
90 <<
" words, skipping ROB" << std::endl;
97 const eformat::write::node_t*
top =
m_frag.bind();
100 const uint32_t writtenSize = eformat::write::copy(*
top,
m_blob.get(),
m_size);
101 if (writtenSize !=
m_size) {
102 std::cerr <<
"Error in event serialisation, copied " << writtenSize <<
" instead of " <<
m_size <<
" words" << std::endl;
109 const std::unique_ptr<uint32_t[]>&
blob()
const {
return m_blob;}
110 const eformat::write::FullEventFragment&
frag()
const {
return m_frag;}
// Creates the DataWriter for the output. The run parameters record
// (run number, trigger type, detector mask, beam type and beam energy) is
// filled from the given reader, and the reader's free metadata strings are
// passed on to the writer.
167std::unique_ptr<EventStorage::DataWriter>
createWriter(
const EventStorage::DataReader& reader) {
168 EventStorage::run_parameters_record
rp{};
169 rp.run_number = reader.runNumber();
172 rp.trigger_type = reader.triggerType();
// bitMask64 is a 128-bit set with only its low 64 bits set; it is used to
// cut the detector mask into a lower (LS) and an upper (MS) 64-bit half.
173 constexpr static std::bitset<128> bitMask64{0xffffffffffffffff};
// NOTE(review): std::bitset::to_ulong() throws std::overflow_error when the
// value does not fit into unsigned long, so these two lines rely on
// unsigned long being at least 64 bits wide. to_ullong() would avoid that
// dependency - confirm the supported platforms.
174 rp.detector_mask_LS = (reader.detectorMask() & bitMask64).to_ulong();
175 rp.detector_mask_MS = ((reader.detectorMask() >> 64) & bitMask64).to_ulong();
176 rp.beam_type = reader.beamType();
177 rp.beam_energy = reader.beamEnergy();
// The writer is created with compression type NONE.
179 return std::make_unique<EventStorage::DataWriter>(
183 reader.freeMetaDataStrings(),
185 EventStorage::CompressionType::NONE,
// Entry point. Takes input file names as command-line arguments, repeatedly
// picks the pending event with the lowest global_id across all inputs,
// skips events whose global_id was already written, and writes the others
// through a single DataWriter.
190int main(
int argc,
char** argv) {
// Usage message; its text states that at least two input files are needed.
193 std::cout <<
"Usage: " << argv[0] <<
" FILE FILE [FILE]" << std::endl
194 <<
"At least 2 input files are required" << std::endl;
// One Buffer per input file, plus bookkeeping: event counters, the set of
// global_ids already written (for duplicate detection) and the highest
// global_id seen so far (for the ordering diagnostic below).
198 std::vector<Buffer> buffers;
199 size_t total_events{0}, read_events{0};
200 std::unordered_set<uint64_t> written_event_ids;
201 uint64_t max_event_id{0};
203 std::cout <<
"Reading " << argc-1 <<
" files:" << std::endl;
// Open a reader for every file named on the command line and sum up the
// number of events the files report.
204 for (
int i=1; i<argc; ++i) {
205 std::cout <<
" - " << argv[i] << std::endl;
206 buffers.emplace_back(std::unique_ptr<EventStorage::DataReader>(pickDataReader(argv[i])), s_defaultBufferSize);
207 total_events += buffers.back().eventsInFile();
210 std::cout <<
"Input files contain " << total_events <<
" events in total" << std::endl;
// The output writer is configured from the reader of the first input file.
212 std::unique_ptr<EventStorage::DataWriter> writer =
createWriter(buffers[0].reader());
// Main merge loop.
214 while (read_events < total_events) {
// Selection state for this iteration: the buffer whose next event has the
// lowest global_id among all buffers that still have an event to offer.
215 std::optional<std::reference_wrapper<Buffer>> next_buffer;
216 uint64_t lowest_event_id{std::numeric_limits<uint64_t>::max()};
217 int buffer_id{-1}, next_buffer_id{-1};
218 for (
Buffer& buffer : buffers) {
// peek() yields an empty optional when the buffer has no event to offer;
// such buffers are skipped.
220 std::optional<std::reference_wrapper<const Event>>
event = buffer.peek();
221 if (not event.has_value()) {
continue;}
222 const uint64_t event_id{
event.value().get().frag.global_id()};
223 if (event_id < lowest_event_id) {
224 lowest_event_id = event_id;
225 next_buffer = buffer;
// The index of the chosen buffer is only recorded for the debug printout.
226 if (s_debugLogging) {
227 next_buffer_id = buffer_id;
// No buffer offered an event.
231 if (not next_buffer.has_value()) {
232 std::cout <<
"End of inputs reached, read " << read_events <<
" out of " << total_events <<
" events" << std::endl;
// Take the event from the selected buffer.
236 const Event event = next_buffer.value().get().next();
238 const uint64_t event_id{
event.frag.global_id()};
// insert() reports false in .second when the id is already in the set,
// i.e. an event with this global_id has been handled before.
239 if (not written_event_ids.insert(event_id).second) {
240 std::cout <<
"Duplicate event with global_id " << event_id <<
", skipping" << std::endl;
// Track the highest global_id so far; an id that is not higher is only
// reported when debug logging is enabled.
243 if (event_id > max_event_id) {
244 max_event_id = event_id;
245 }
else if (s_debugLogging) {
246 std::cout <<
"The current event global_id " << event_id <<
" is lower than previously written event "
247 << max_event_id <<
" - event ordering not strictly preserved" << std::endl;
// Progress output: per event with debug logging, otherwise once every
// s_printInterval read events.
250 if (s_debugLogging) {
251 std::cout <<
"Writing event " << written_event_ids.size() <<
" with global_id " << event_id
252 <<
" read from file " << next_buffer_id << std::endl;
253 }
else if (read_events % s_printInterval == 0) {
254 std::cout <<
"Read " << read_events <<
" out of " << total_events <<
" events and wrote "
255 << written_event_ids.size() <<
" events" << std::endl;
260 std::cerr <<
"Error in event serialisation" << std::endl;
// size() is multiplied by sizeof(uint32_t) to obtain the byte count that
// is handed to the writer.
264 const uint32_t sizeInBytes = writeEvent.
size() *
sizeof(uint32_t);
265 uint32_t writeSizeOnDisk{0};
266 const EventStorage::DWError err_code = writer->putData(sizeInBytes, writeEvent.
blob().get(), writeSizeOnDisk);
267 if (err_code != EventStorage::DWError::DWOK or not writer->good()) {
268 std::cerr <<
"Error writing to file, exiting..." << std::endl;
// NOTE(review): the message below labels the values as "words", but
// sizeInBytes is a byte count - confirm the intended unit of the message.
271 if (writeSizeOnDisk != sizeInBytes) {
272 std::cerr <<
"Error writing to file, wrote " << writeSizeOnDisk <<
" instead of " << sizeInBytes <<
" words" << std::endl;
// Final summary of how many events were read and how many distinct
// global_ids were recorded as written.
277 std::cout <<
"Merging successful, read " << read_events <<
" out of " << total_events <<
" events and wrote "
278 << written_event_ids.size() <<
" events" << std::endl;