15 #include "eformat/FullEventFragmentNoTemplates.h"
16 #include "eformat/ROBFragmentNoTemplates.h"
17 #include "eformat/write/FullEventFragment.h"
18 #include "eformat/write/ROBFragment.h"
19 #include "eformat/compression.h"
20 #include "EventStorage/DataReader.h"
21 #include "EventStorage/pickDataReader.h"
22 #include "EventStorage/DataWriter.h"
23 #include "EventStorage/DRError.h"
24 #include "EventStorage/DWError.h"
25 #include "EventStorage/EventStorageRecords.h"
26 #include "EventStorage/ESCompression.h"
34 #include <unordered_set>
/// Compile-time switch for verbose diagnostics on std::cout. When true, the
/// code prints a summary after a Buffer is constructed, a line for every event
/// written, and a notice when an event's global_id is lower than one already
/// written. When false, the merge loop prints a periodic progress line instead
/// (see s_printInterval).
constexpr bool s_debugLogging{false};

/// Progress-report period: with debug logging off, a "Read ... out of ..."
/// line is printed whenever the number of events read so far is a multiple of
/// this value.
constexpr size_t s_printInterval{500};

/// Second constructor argument passed to the Buffer created for each input file.
/// NOTE(review): presumably the number of events a Buffer keeps loaded at a
/// time - confirm against the Buffer constructor.
constexpr size_t s_defaultBufferSize{100};
43 std::unique_ptr<const uint32_t[]>
blob;
58 std::vector<eformat::write::ROBFragment>
m_robs;
77 m_frag.compression_type(eformat::Compression::UNCOMPRESSED);
78 m_frag.compression_level(0);
79 std::vector<eformat::read::ROBFragment> readRobs;
81 m_robs.reserve(readRobs.size());
83 m_robs.emplace_back(rob.start());
84 const eformat::write::node_t*
top =
m_robs.back().bind();
88 if (writtenSize != writeSize) {
89 std::cerr <<
"Error in ROB serialisation, copied " << writtenSize <<
" instead of " << writeSize
90 <<
" words, skipping ROB" << std::endl;
97 const eformat::write::node_t*
top =
m_frag.bind();
101 if (writtenSize !=
m_size) {
102 std::cerr <<
"Error in event serialisation, copied " << writtenSize <<
" instead of " <<
m_size <<
" words" << std::endl;
/// Read-only accessor for m_blob.
/// Returns a const reference to the owning std::unique_ptr itself (no copy and
/// no transfer of ownership); the caller uses .get() to reach the raw
/// uint32_t array, as done when the buffer is handed to DataWriter::putData.
/// NOTE(review): presumably m_blob holds the serialised event words - confirm
/// against the code that fills it.
109 const std::unique_ptr<uint32_t[]>&
blob()
const {
return m_blob;}
116 std::unique_ptr<EventStorage::DataReader>
m_reader;
121 return a.frag.global_id() < b.frag.global_id();
126 char* blobChars{
nullptr};
127 unsigned int blobCharsSize{0};
128 if (
const EventStorage::DRError err_code =
m_reader->getData(blobCharsSize, &blobChars); err_code != EventStorage::DRError::DROK) {
129 std::cerr <<
"Error code " << err_code <<
" from EventStorage::DataReader::getData" << std::endl;
136 return EventStorage::DRError::DROK;
143 if (s_debugLogging) {
144 std::cout <<
"Constructed a Buffer with " <<
m_events.size() <<
" events loaded, global_id range: ["
145 <<
m_events.front().frag.global_id() <<
", " <<
m_events.back().frag.global_id() <<
"]" << std::endl;
148 std::optional<std::reference_wrapper<const Event>>
peek()
const {
149 if (
m_events.empty()) {
return std::nullopt;}
162 const EventStorage::DataReader&
reader()
const {
168 EventStorage::run_parameters_record
rp{};
172 rp.trigger_type =
reader.triggerType();
173 constexpr
static std::bitset<128> bitMask64{0xffffffffffffffff};
174 rp.detector_mask_LS = (
reader.detectorMask() & bitMask64).to_ulong();
175 rp.detector_mask_MS = ((
reader.detectorMask() >> 64) & bitMask64).to_ulong();
177 rp.beam_energy =
reader.beamEnergy();
179 return std::make_unique<EventStorage::DataWriter>(
183 reader.freeMetaDataStrings(),
193 std::cout <<
"Usage: " <<
argv[0] <<
" FILE FILE [FILE]" << std::endl
194 <<
"At least 2 input files are required" << std::endl;
198 std::vector<Buffer> buffers;
199 size_t total_events{0}, read_events{0};
200 std::unordered_set<uint64_t> written_event_ids;
203 std::cout <<
"Reading " <<
argc-1 <<
" files:" << std::endl;
205 std::cout <<
" - " <<
argv[
i] << std::endl;
206 buffers.emplace_back(std::unique_ptr<EventStorage::DataReader>(pickDataReader(
argv[
i])), s_defaultBufferSize);
207 total_events += buffers.back().eventsInFile();
210 std::cout <<
"Input files contain " << total_events <<
" events in total" << std::endl;
214 while (read_events < total_events) {
215 std::optional<std::reference_wrapper<Buffer>> next_buffer;
217 int buffer_id{-1}, next_buffer_id{-1};
220 std::optional<std::reference_wrapper<const Event>>
event =
buffer.peek();
221 if (not
event.has_value()) {
continue;}
222 const uint64_t event_id{
event.value().get().frag.global_id()};
223 if (event_id < lowest_event_id) {
224 lowest_event_id = event_id;
226 if (s_debugLogging) {
227 next_buffer_id = buffer_id;
231 if (not next_buffer.has_value()) {
232 std::cout <<
"End of inputs reached, read " << read_events <<
" out of " << total_events <<
" events" << std::endl;
236 const Event event = next_buffer.value().get().next();
238 const uint64_t event_id{
event.frag.global_id()};
239 if (not written_event_ids.insert(event_id).second) {
240 std::cout <<
"Duplicate event with global_id " << event_id <<
", skipping" << std::endl;
243 if (event_id > max_event_id) {
244 max_event_id = event_id;
245 }
else if (s_debugLogging) {
246 std::cout <<
"The current event global_id " << event_id <<
" is lower than previously written event "
247 << max_event_id <<
" - event ordering not strictly preserved" << std::endl;
250 if (s_debugLogging) {
251 std::cout <<
"Writing event " << written_event_ids.size() <<
" with global_id " << event_id
252 <<
" read from file " << next_buffer_id << std::endl;
253 }
else if (read_events % s_printInterval == 0) {
254 std::cout <<
"Read " << read_events <<
" out of " << total_events <<
" events and wrote "
255 << written_event_ids.size() <<
" events" << std::endl;
260 std::cerr <<
"Error in event serialisation" << std::endl;
266 const EventStorage::DWError err_code =
writer->putData(sizeInBytes, writeEvent.blob().get(), writeSizeOnDisk);
267 if (err_code != EventStorage::DWError::DWOK or not
writer->good()) {
268 std::cerr <<
"Error writing to file, exiting..." << std::endl;
271 if (writeSizeOnDisk != sizeInBytes) {
272 std::cerr <<
"Error writing to file, wrote " << writeSizeOnDisk <<
" instead of " << sizeInBytes <<
" words" << std::endl;
277 std::cout <<
"Merging successful, read " << read_events <<
" out of " << total_events <<
" events and wrote "
278 << written_event_ids.size() <<
" events" << std::endl;