ATLAS Offline Software
Loading...
Searching...
No Matches
trigbs_orderedMerge.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3*/
14
#include "eformat/FullEventFragmentNoTemplates.h"
#include "eformat/ROBFragmentNoTemplates.h"
#include "eformat/write/FullEventFragment.h"
#include "eformat/write/ROBFragment.h"
#include "eformat/compression.h"
#include "EventStorage/DataReader.h"
#include "EventStorage/pickDataReader.h"
#include "EventStorage/DataWriter.h"
#include "EventStorage/DRError.h"
#include "EventStorage/DWError.h"
#include "EventStorage/EventStorageRecords.h"
#include "EventStorage/ESCompression.h"

#include <algorithm>
#include <bitset>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
35
/// File-local configuration constants.
namespace {
  /// Compile-time switch enabling the verbose per-event / per-buffer messages.
  constexpr bool s_debugLogging = false;
  /// A progress message is printed every time this many events have been read.
  constexpr size_t s_printInterval = 500;
  /// Maximum number of events each input Buffer keeps loaded at a time.
  constexpr size_t s_defaultBufferSize = 100;
}
41
42struct Event {
43 std::unique_ptr<const uint32_t[]> blob;
44 eformat::read::FullEventFragment frag;
45 explicit Event(std::unique_ptr<const uint32_t[]>&& b) : blob(std::move(b)) {
46 frag = eformat::read::FullEventFragment(blob.get());
47 }
48};
49
51public:
52 enum class Status {OK=0, NOT_OK=1};
53
54private:
56 std::unique_ptr<uint32_t[]> m_blob;
57 eformat::write::FullEventFragment m_frag;
58 std::vector<eformat::write::ROBFragment> m_robs;
59 std::vector<std::unique_ptr<uint32_t[]>> m_robBlobs;
60 uint32_t m_size{0};
61
62public:
63 explicit WriteEvent(const Event& readEvent) {
64 m_frag.status(readEvent.frag.nstatus(), readEvent.frag.status());
65 m_frag.run_type(readEvent.frag.run_type());
66 m_frag.run_no(readEvent.frag.run_no());
67 m_frag.global_id(readEvent.frag.global_id());
68 m_frag.lumi_block(readEvent.frag.lumi_block());
69 m_frag.bc_id(readEvent.frag.bc_id());
70 m_frag.bc_time_seconds(readEvent.frag.bc_time_seconds());
71 m_frag.bc_time_nanoseconds(readEvent.frag.bc_time_nanoseconds());
72 m_frag.lvl1_id(readEvent.frag.lvl1_id());
73 m_frag.lvl1_trigger_type(readEvent.frag.lvl1_trigger_type());
74 m_frag.lvl1_trigger_info(readEvent.frag.nlvl1_trigger_info(), readEvent.frag.lvl1_trigger_info());
75 m_frag.hlt_info(readEvent.frag.nhlt_info(), readEvent.frag.hlt_info());
76 m_frag.stream_tag(readEvent.frag.nstream_tag(), readEvent.frag.stream_tag());
77 m_frag.compression_type(eformat::Compression::UNCOMPRESSED);
78 m_frag.compression_level(0);
79 std::vector<eformat::read::ROBFragment> readRobs;
80 readEvent.frag.robs(readRobs);
81 m_robs.reserve(readRobs.size());
82 for (const eformat::read::ROBFragment& rob : readRobs) {
83 m_robs.emplace_back(rob.start());
84 const eformat::write::node_t* top = m_robs.back().bind();
85 const uint32_t writeSize = m_robs.back().size_word();
86 m_robBlobs.push_back(std::make_unique<uint32_t[]>(writeSize));
87 const uint32_t writtenSize = eformat::write::copy(*top, m_robBlobs.back().get(), writeSize);
88 if (writtenSize != writeSize) {
89 std::cerr << "Error in ROB serialisation, copied " << writtenSize << " instead of " << writeSize
90 << " words, skipping ROB" << std::endl;
92 continue;
93 }
94 m_frag.append_unchecked(m_robBlobs.back().get());
95 }
96
97 const eformat::write::node_t* top = m_frag.bind();
98 m_size = m_frag.size_word();
99 m_blob = std::make_unique<uint32_t[]>(m_size);
100 const uint32_t writtenSize = eformat::write::copy(*top, m_blob.get(), m_size);
101 if (writtenSize != m_size) {
102 std::cerr << "Error in event serialisation, copied " << writtenSize << " instead of " << m_size << " words" << std::endl;
104 }
105
107 }
108 Status status() const {return m_status;}
109 const std::unique_ptr<uint32_t[]>& blob() const {return m_blob;}
110 const eformat::write::FullEventFragment& frag() const {return m_frag;}
111 uint32_t size() const {return m_size;}
112};
113
114class Buffer {
115private:
116 std::unique_ptr<EventStorage::DataReader> m_reader;
117 size_t m_size{0};
118 std::vector<Event> m_events;
119 void sort() {
120 std::sort(m_events.begin(), m_events.end(), [](const Event& a, const Event& b){
121 return a.frag.global_id() < b.frag.global_id();
122 });
123 }
124 EventStorage::DRError fillBuffer() {
125 while (m_reader->good() && m_events.size() < m_size) {
126 char* blobChars{nullptr};
127 unsigned int blobCharsSize{0};
128 if (const EventStorage::DRError err_code = m_reader->getData(blobCharsSize, &blobChars); err_code != EventStorage::DRError::DROK) {
129 std::cerr << "Error code " << err_code << " from EventStorage::DataReader::getData" << std::endl;
130 return err_code;
131 }
132 const uint32_t* blobWords = reinterpret_cast<const uint32_t*>(blobChars);
133 m_events.emplace_back(std::unique_ptr<const uint32_t[]>(blobWords));
134 }
135 sort();
136 return EventStorage::DRError::DROK;
137 }
138
139public:
140 Buffer(std::unique_ptr<EventStorage::DataReader>&& reader, size_t size)
141 : m_reader(std::move(reader)), m_size(size) {
142 fillBuffer();
143 if (s_debugLogging) {
144 std::cout << "Constructed a Buffer with " << m_events.size() << " events loaded, global_id range: ["
145 << m_events.front().frag.global_id() << ", " << m_events.back().frag.global_id() << "]" << std::endl;
146 }
147 }
148 std::optional<std::reference_wrapper<const Event>> peek() const {
149 if (m_events.empty()) {return std::nullopt;}
150 return m_events.at(0);
151 }
153 Event event{std::move(m_events[0].blob)};
154 m_events[0].blob.reset();
155 m_events.erase(m_events.begin());
156 fillBuffer();
157 return event;
158 }
159 unsigned int eventsInFile() {
160 return m_reader->eventsInFile();
161 }
162 const EventStorage::DataReader& reader() const {
163 return *m_reader;
164 }
165};
166
167std::unique_ptr<EventStorage::DataWriter> createWriter(const EventStorage::DataReader& reader) {
168 EventStorage::run_parameters_record rp{};
169 rp.run_number = reader.runNumber();
170 rp.max_events = 0;
171 rp.rec_enable = 0;
172 rp.trigger_type = reader.triggerType();
173 constexpr static std::bitset<128> bitMask64{0xffffffffffffffff};
174 rp.detector_mask_LS = (reader.detectorMask() & bitMask64).to_ulong();
175 rp.detector_mask_MS = ((reader.detectorMask() >> 64) & bitMask64).to_ulong();
176 rp.beam_type = reader.beamType();
177 rp.beam_energy = reader.beamEnergy();
178
179 return std::make_unique<EventStorage::DataWriter>(
180 /* writingPath = */ ".",
181 /* fileNameCore = */ "orderedMerge",
182 /* rPar = */ rp,
183 /* fmdStrings = */ reader.freeMetaDataStrings(),
184 /* startIndex = */ 1,
185 /* compression = */ EventStorage::CompressionType::NONE,
186 /* compLevel = */ 0
187 );
188}
189
190int main(int argc, char** argv) {
191
192 if (argc < 3) {
193 std::cout << "Usage: " << argv[0] << " FILE FILE [FILE]" << std::endl
194 << "At least 2 input files are required" << std::endl;
195 return 1;
196 }
197
198 std::vector<Buffer> buffers;
199 size_t total_events{0}, read_events{0};
200 std::unordered_set<uint64_t> written_event_ids;
201 uint64_t max_event_id{0};
202
203 std::cout << "Reading " << argc-1 << " files:" << std::endl;
204 for (int i=1; i<argc; ++i) {
205 std::cout << " - " << argv[i] << std::endl;
206 buffers.emplace_back(std::unique_ptr<EventStorage::DataReader>(pickDataReader(argv[i])), s_defaultBufferSize);
207 total_events += buffers.back().eventsInFile();
208 }
209
210 std::cout << "Input files contain " << total_events << " events in total" << std::endl;
211
212 std::unique_ptr<EventStorage::DataWriter> writer = createWriter(buffers[0].reader());
213
214 while (read_events < total_events) {
215 std::optional<std::reference_wrapper<Buffer>> next_buffer;
216 uint64_t lowest_event_id{std::numeric_limits<uint64_t>::max()};
217 int buffer_id{-1}, next_buffer_id{-1};
218 for (Buffer& buffer : buffers) {
219 ++buffer_id;
220 std::optional<std::reference_wrapper<const Event>> event = buffer.peek();
221 if (not event.has_value()) {continue;}
222 const uint64_t event_id{event.value().get().frag.global_id()};
223 if (event_id < lowest_event_id) {
224 lowest_event_id = event_id;
225 next_buffer = buffer;
226 if (s_debugLogging) {
227 next_buffer_id = buffer_id;
228 }
229 }
230 }
231 if (not next_buffer.has_value()) {
232 std::cout << "End of inputs reached, read " << read_events << " out of " << total_events << " events" << std::endl;
233 break;
234 }
235
236 const Event event = next_buffer.value().get().next();
237 ++read_events;
238 const uint64_t event_id{event.frag.global_id()};
239 if (not written_event_ids.insert(event_id).second) {
240 std::cout << "Duplicate event with global_id " << event_id << ", skipping" << std::endl;
241 continue;
242 }
243 if (event_id > max_event_id) {
244 max_event_id = event_id;
245 } else if (s_debugLogging) {
246 std::cout << "The current event global_id " << event_id << " is lower than previously written event "
247 << max_event_id << " - event ordering not strictly preserved" << std::endl;
248 }
249
250 if (s_debugLogging) {
251 std::cout << "Writing event " << written_event_ids.size() << " with global_id " << event_id
252 << " read from file " << next_buffer_id << std::endl;
253 } else if (read_events % s_printInterval == 0) {
254 std::cout << "Read " << read_events << " out of " << total_events << " events and wrote "
255 << written_event_ids.size() << " events" << std::endl;
256 }
257
258 const WriteEvent writeEvent{event};
259 if (writeEvent.status() != WriteEvent::Status::OK) {
260 std::cerr << "Error in event serialisation" << std::endl;
261 return 1;
262 }
263
264 const uint32_t sizeInBytes = writeEvent.size() * sizeof(uint32_t);
265 uint32_t writeSizeOnDisk{0};
266 const EventStorage::DWError err_code = writer->putData(sizeInBytes, writeEvent.blob().get(), writeSizeOnDisk);
267 if (err_code != EventStorage::DWError::DWOK or not writer->good()) {
268 std::cerr << "Error writing to file, exiting..." << std::endl;
269 return 1;
270 }
271 if (writeSizeOnDisk != sizeInBytes) {
272 std::cerr << "Error writing to file, wrote " << writeSizeOnDisk << " instead of " << sizeInBytes << " words" << std::endl;
273 return 1;
274 }
275 }
276
277 std::cout << "Merging successful, read " << read_events << " out of " << total_events << " events and wrote "
278 << written_event_ids.size() << " events" << std::endl;
279
280 return 0;
281}
ReadCards * rp
static Double_t a
@ top
unsigned int eventsInFile()
std::optional< std::reference_wrapper< const Event > > peek() const
EventStorage::DRError fillBuffer()
std::unique_ptr< EventStorage::DataReader > m_reader
const EventStorage::DataReader & reader() const
Buffer(std::unique_ptr< EventStorage::DataReader > &&reader, size_t size)
std::vector< Event > m_events
const eformat::write::FullEventFragment & frag() const
Status status() const
uint32_t size() const
std::vector< std::unique_ptr< uint32_t[]> > m_robBlobs
const std::unique_ptr< uint32_t[]> & blob() const
std::vector< eformat::write::ROBFragment > m_robs
eformat::write::FullEventFragment m_frag
WriteEvent(const Event &readEvent)
std::unique_ptr< uint32_t[]> m_blob
int main()
Definition hello.cxx:18
STL namespace.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
std::unique_ptr< const uint32_t[]> blob
Event(std::unique_ptr< const uint32_t[]> &&b)
eformat::read::FullEventFragment frag
std::unique_ptr< EventStorage::DataWriter > createWriter(const EventStorage::DataReader &reader)