ATLAS Offline Software
trigbs_orderedMerge.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
15 #include "eformat/FullEventFragmentNoTemplates.h"
16 #include "eformat/ROBFragmentNoTemplates.h"
17 #include "eformat/write/FullEventFragment.h"
18 #include "eformat/write/ROBFragment.h"
19 #include "eformat/compression.h"
20 #include "EventStorage/DataReader.h"
21 #include "EventStorage/pickDataReader.h"
22 #include "EventStorage/DataWriter.h"
23 #include "EventStorage/DRError.h"
24 #include "EventStorage/DWError.h"
25 #include "EventStorage/EventStorageRecords.h"
26 #include "EventStorage/ESCompression.h"
27 
28 #include <iostream>
29 #include <string>
30 #include <vector>
31 #include <memory>
32 #include <optional>
33 #include <functional>
34 #include <unordered_set>
35 
namespace {
  /// Compile-time switch guarding the verbose per-event diagnostic printouts.
  constexpr bool s_debugLogging = false;
  /// main() prints a progress line whenever the number of read events is a multiple of this value.
  constexpr size_t s_printInterval = 500;
  /// Capacity passed by main() to every Buffer it constructs.
  constexpr size_t s_defaultBufferSize = 100;
}
41 
/// One event as read from an input file: owns the raw 32-bit-word buffer of the event.
///
/// NOTE(review): this listing omits original lines 44 and 46. The cross-reference index at the end of
/// the page lists `Event::frag` (an `eformat::read::FullEventFragment`) at line 44; the statement on
/// line 46, inside the constructor body, is not visible here - confirm against the real file.
42 struct Event {
  /// Raw event words, owned by this object.
43  std::unique_ptr<const uint32_t[]> blob;
  /// Takes ownership of the word buffer @p b by moving it into `blob`.
45  explicit Event(std::unique_ptr<const uint32_t[]>&& b) : blob(std::move(b)) {
47  }
48 };
49 
/// Write-side copy of an Event: the constructor rebuilds the event as an
/// eformat::write::FullEventFragment and flattens it into one contiguous word buffer,
/// which callers obtain through blob() and size().
///
/// NOTE(review): this listing omits original lines 55, 57, 60, 91, 103, 106 and 110. The
/// cross-reference index at the end of the page lists `m_status` (55), `m_frag` (57), `m_size` (60)
/// and the accessor `frag()` (110). Lines 91, 103 and 106 are not visible anywhere on this page;
/// presumably they set `m_status` - confirm against the real file.
50 class WriteEvent {
51 public:
  /// Outcome reported by status().
52  enum class Status {OK=0, NOT_OK=1};
53 
54 private:
  /// Serialised event words, filled at the end of the constructor.
56  std::unique_ptr<uint32_t[]> m_blob;
  /// One write-side ROB fragment per ROB found in the read event.
58  std::vector<eformat::write::ROBFragment> m_robs;
  /// One serialised word buffer per entry of m_robs; these are the pointers handed to m_frag.
59  std::vector<std::unique_ptr<uint32_t[]>> m_robBlobs;
61 
62 public:
  /// Build the serialised copy of @p readEvent.
  /// @param readEvent Event whose `frag` supplies the header fields and the ROB fragments to copy.
63  explicit WriteEvent(const Event& readEvent) {
    // Copy the event header fields one by one from the read fragment into the write fragment.
64  m_frag.status(readEvent.frag.nstatus(), readEvent.frag.status());
65  m_frag.run_type(readEvent.frag.run_type());
66  m_frag.run_no(readEvent.frag.run_no());
67  m_frag.global_id(readEvent.frag.global_id());
68  m_frag.lumi_block(readEvent.frag.lumi_block());
69  m_frag.bc_id(readEvent.frag.bc_id());
70  m_frag.bc_time_seconds(readEvent.frag.bc_time_seconds());
71  m_frag.bc_time_nanoseconds(readEvent.frag.bc_time_nanoseconds());
72  m_frag.lvl1_id(readEvent.frag.lvl1_id());
73  m_frag.lvl1_trigger_type(readEvent.frag.lvl1_trigger_type());
74  m_frag.lvl1_trigger_info(readEvent.frag.nlvl1_trigger_info(), readEvent.frag.lvl1_trigger_info());
75  m_frag.hlt_info(readEvent.frag.nhlt_info(), readEvent.frag.hlt_info());
76  m_frag.stream_tag(readEvent.frag.nstream_tag(), readEvent.frag.stream_tag());
    // The output fragment is always marked as uncompressed, whatever the input was.
77  m_frag.compression_type(eformat::Compression::UNCOMPRESSED);
78  m_frag.compression_level(0);
79  std::vector<eformat::read::ROBFragment> readRobs;
80  readEvent.frag.robs(readRobs);
    // Reserve up front so that the emplace_back calls below do not reallocate m_robs.
81  m_robs.reserve(readRobs.size());
    // Re-serialise every ROB into its own word buffer and append that buffer to the event.
82  for (const eformat::read::ROBFragment& rob : readRobs) {
83  m_robs.emplace_back(rob.start());
84  const eformat::write::node_t* top = m_robs.back().bind();
85  const uint32_t writeSize = m_robs.back().size_word();
86  m_robBlobs.push_back(std::make_unique<uint32_t[]>(writeSize));
87  const uint32_t writtenSize = eformat::write::copy(*top, m_robBlobs.back().get(), writeSize);
    // A ROB whose copy came out short is reported and left out of the event.
88  if (writtenSize != writeSize) {
89  std::cerr << "Error in ROB serialisation, copied " << writtenSize << " instead of " << writeSize
90  << " words, skipping ROB" << std::endl;
92  continue;
93  }
    // m_frag receives the raw pointer only; the words stay owned by m_robBlobs.
94  m_frag.append_unchecked(m_robBlobs.back().get());
95  }
96 
    // Serialise the assembled event (header plus appended ROBs) into m_blob.
97  const eformat::write::node_t* top = m_frag.bind();
98  m_size = m_frag.size_word();
99  m_blob = std::make_unique<uint32_t[]>(m_size);
100  const uint32_t writtenSize = eformat::write::copy(*top, m_blob.get(), m_size);
101  if (writtenSize != m_size) {
102  std::cerr << "Error in event serialisation, copied " << writtenSize << " instead of " << m_size << " words" << std::endl;
104  }
105 
107  }
  /// @return The status recorded in m_status.
108  Status status() const {return m_status;}
  /// @return The serialised event buffer; it remains owned by this object.
109  const std::unique_ptr<uint32_t[]>& blob() const {return m_blob;}
  /// @return Length of the serialised event in 32-bit words (value of size_word() of the write fragment).
111  uint32_t size() const {return m_size;}
112 };
113 
114 class Buffer {
115 private:
116  std::unique_ptr<EventStorage::DataReader> m_reader;
117  size_t m_size{0};
118  std::vector<Event> m_events;
119  void sort() {
120  std::sort(m_events.begin(), m_events.end(), [](const Event& a, const Event& b){
121  return a.frag.global_id() < b.frag.global_id();
122  });
123  }
124  EventStorage::DRError fillBuffer() {
125  while (m_reader->good() && m_events.size() < m_size) {
126  char* blobChars{nullptr};
127  unsigned int blobCharsSize{0};
128  if (const EventStorage::DRError err_code = m_reader->getData(blobCharsSize, &blobChars); err_code != EventStorage::DRError::DROK) {
129  std::cerr << "Error code " << err_code << " from EventStorage::DataReader::getData" << std::endl;
130  return err_code;
131  }
132  const uint32_t* blobWords = reinterpret_cast<const uint32_t*>(blobChars);
133  m_events.emplace_back(std::unique_ptr<const uint32_t[]>(blobWords));
134  }
135  sort();
136  return EventStorage::DRError::DROK;
137  }
138 
139 public:
140  Buffer(std::unique_ptr<EventStorage::DataReader>&& reader, size_t size)
141  : m_reader(std::move(reader)), m_size(size) {
142  fillBuffer();
143  if (s_debugLogging) {
144  std::cout << "Constructed a Buffer with " << m_events.size() << " events loaded, global_id range: ["
145  << m_events.front().frag.global_id() << ", " << m_events.back().frag.global_id() << "]" << std::endl;
146  }
147  }
148  std::optional<std::reference_wrapper<const Event>> peek() const {
149  if (m_events.empty()) {return std::nullopt;}
150  return m_events.at(0);
151  }
153  Event event{std::move(m_events[0].blob)};
154  m_events[0].blob.reset();
155  m_events.erase(m_events.begin());
156  fillBuffer();
157  return event;
158  }
159  unsigned int eventsInFile() {
160  return m_reader->eventsInFile();
161  }
162  const EventStorage::DataReader& reader() const {
163  return *m_reader;
164  }
165 };
166 
167 std::unique_ptr<EventStorage::DataWriter> createWriter(const EventStorage::DataReader& reader) {
168  EventStorage::run_parameters_record rp{};
169  rp.run_number = reader.runNumber();
170  rp.max_events = 0;
171  rp.rec_enable = 0;
172  rp.trigger_type = reader.triggerType();
173  constexpr static std::bitset<128> bitMask64{0xffffffffffffffff};
174  rp.detector_mask_LS = (reader.detectorMask() & bitMask64).to_ulong();
175  rp.detector_mask_MS = ((reader.detectorMask() >> 64) & bitMask64).to_ulong();
176  rp.beam_type = reader.beamType();
177  rp.beam_energy = reader.beamEnergy();
178 
179  return std::make_unique<EventStorage::DataWriter>(
180  /* writingPath = */ ".",
181  /* fileNameCore = */ "orderedMerge",
182  /* rPar = */ rp,
183  /* fmdStrings = */ reader.freeMetaDataStrings(),
184  /* startIndex = */ 1,
185  /* compression = */ EventStorage::CompressionType::NONE,
186  /* compLevel = */ 0
187  );
188 }
189 
190 int main(int argc, char** argv) {
191 
192  if (argc < 3) {
193  std::cout << "Usage: " << argv[0] << " FILE FILE [FILE]" << std::endl
194  << "At least 2 input files are required" << std::endl;
195  return 1;
196  }
197 
198  std::vector<Buffer> buffers;
199  size_t total_events{0}, read_events{0};
200  std::unordered_set<uint64_t> written_event_ids;
201  uint64_t max_event_id{0};
202 
203  std::cout << "Reading " << argc-1 << " files:" << std::endl;
204  for (int i=1; i<argc; ++i) {
205  std::cout << " - " << argv[i] << std::endl;
206  buffers.emplace_back(std::unique_ptr<EventStorage::DataReader>(pickDataReader(argv[i])), s_defaultBufferSize);
207  total_events += buffers.back().eventsInFile();
208  }
209 
210  std::cout << "Input files contain " << total_events << " events in total" << std::endl;
211 
212  std::unique_ptr<EventStorage::DataWriter> writer = createWriter(buffers[0].reader());
213 
214  while (read_events < total_events) {
215  std::optional<std::reference_wrapper<Buffer>> next_buffer;
216  uint64_t lowest_event_id{std::numeric_limits<uint64_t>::max()};
217  int buffer_id{-1}, next_buffer_id{-1};
218  for (Buffer& buffer : buffers) {
219  ++buffer_id;
220  std::optional<std::reference_wrapper<const Event>> event = buffer.peek();
221  if (not event.has_value()) {continue;}
222  const uint64_t event_id{event.value().get().frag.global_id()};
223  if (event_id < lowest_event_id) {
224  lowest_event_id = event_id;
225  next_buffer = buffer;
226  if (s_debugLogging) {
227  next_buffer_id = buffer_id;
228  }
229  }
230  }
231  if (not next_buffer.has_value()) {
232  std::cout << "End of inputs reached, read " << read_events << " out of " << total_events << " events" << std::endl;
233  break;
234  }
235 
236  const Event event = next_buffer.value().get().next();
237  ++read_events;
238  const uint64_t event_id{event.frag.global_id()};
239  if (not written_event_ids.insert(event_id).second) {
240  std::cout << "Duplicate event with global_id " << event_id << ", skipping" << std::endl;
241  continue;
242  }
243  if (event_id > max_event_id) {
244  max_event_id = event_id;
245  } else if (s_debugLogging) {
246  std::cout << "The current event global_id " << event_id << " is lower than previously written event "
247  << max_event_id << " - event ordering not strictly preserved" << std::endl;
248  }
249 
250  if (s_debugLogging) {
251  std::cout << "Writing event " << written_event_ids.size() << " with global_id " << event_id
252  << " read from file " << next_buffer_id << std::endl;
253  } else if (read_events % s_printInterval == 0) {
254  std::cout << "Read " << read_events << " out of " << total_events << " events and wrote "
255  << written_event_ids.size() << " events" << std::endl;
256  }
257 
258  const WriteEvent writeEvent{event};
259  if (writeEvent.status() != WriteEvent::Status::OK) {
260  std::cerr << "Error in event serialisation" << std::endl;
261  return 1;
262  }
263 
264  const uint32_t sizeInBytes = writeEvent.size() * sizeof(uint32_t);
265  uint32_t writeSizeOnDisk{0};
266  const EventStorage::DWError err_code = writer->putData(sizeInBytes, writeEvent.blob().get(), writeSizeOnDisk);
267  if (err_code != EventStorage::DWError::DWOK or not writer->good()) {
268  std::cerr << "Error writing to file, exiting..." << std::endl;
269  return 1;
270  }
271  if (writeSizeOnDisk != sizeInBytes) {
272  std::cerr << "Error writing to file, wrote " << writeSizeOnDisk << " instead of " << sizeInBytes << " words" << std::endl;
273  return 1;
274  }
275  }
276 
277  std::cout << "Merging successful, read " << read_events << " out of " << total_events << " events and wrote "
278  << written_event_ids.size() << " events" << std::endl;
279 
280  return 0;
281 }
NONE
@ NONE
Definition: sTGCenumeration.h:13
WriteEvent::status
Status status() const
Definition: trigbs_orderedMerge.cxx:108
max
#define max(a, b)
Definition: cfImp.cxx:41
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
make_unique
std::unique_ptr< T > make_unique(Args &&... args)
Definition: SkimmingToolEXOT5.cxx:23
Event
Definition: trigbs_orderedMerge.cxx:42
WriteEvent::Status::OK
@ OK
Event::frag
eformat::read::FullEventFragment frag
Definition: trigbs_orderedMerge.cxx:44
WriteEvent::Status::NOT_OK
@ NOT_OK
trigbs_truncateEvents.readEvent
def readEvent(file)
Definition: trigbs_truncateEvents.py:10
WriteEvent::m_size
uint32_t m_size
Definition: trigbs_orderedMerge.cxx:60
Buffer::sort
void sort()
Definition: trigbs_orderedMerge.cxx:119
WriteEvent::blob
const std::unique_ptr< uint32_t[]> & blob() const
Definition: trigbs_orderedMerge.cxx:109
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment
eformat::FullEventFragment< PointerType > FullEventFragment
Definition: RawEvent.h:26
WriteEvent
Definition: trigbs_orderedMerge.cxx:50
WriteEvent::m_robs
std::vector< eformat::write::ROBFragment > m_robs
Definition: trigbs_orderedMerge.cxx:58
Buffer::eventsInFile
unsigned int eventsInFile()
Definition: trigbs_orderedMerge.cxx:159
WriteEvent::WriteEvent
WriteEvent(const Event &readEvent)
Definition: trigbs_orderedMerge.cxx:63
Buffer::m_reader
std::unique_ptr< EventStorage::DataReader > m_reader
Definition: trigbs_orderedMerge.cxx:116
python.setupRTTAlg.size
int size
Definition: setupRTTAlg.py:39
WriteEvent::Status
Status
Definition: trigbs_orderedMerge.cxx:52
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:12
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
lumiFormat.i
int i
Definition: lumiFormat.py:85
Buffer::peek
std::optional< std::reference_wrapper< const Event > > peek() const
Definition: trigbs_orderedMerge.cxx:148
LArCellNtuple.argv
argv
Definition: LArCellNtuple.py:152
Buffer::next
Event next()
Definition: trigbs_orderedMerge.cxx:152
Buffer::m_size
size_t m_size
Definition: trigbs_orderedMerge.cxx:117
Buffer::reader
const EventStorage::DataReader & reader() const
Definition: trigbs_orderedMerge.cxx:162
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
WriteEvent::m_frag
eformat::write::FullEventFragment m_frag
Definition: trigbs_orderedMerge.cxx:57
DQHistogramMergeRegExp.argc
argc
Definition: DQHistogramMergeRegExp.py:20
WriteEvent::frag
const eformat::write::FullEventFragment & frag() const
Definition: trigbs_orderedMerge.cxx:110
plotBeamSpotMon.b
b
Definition: plotBeamSpotMon.py:77
Buffer::Buffer
Buffer(std::unique_ptr< EventStorage::DataReader > &&reader, size_t size)
Definition: trigbs_orderedMerge.cxx:140
Buffer::fillBuffer
EventStorage::DRError fillBuffer()
Definition: trigbs_orderedMerge.cxx:124
Buffer
Definition: trigbs_orderedMerge.cxx:114
OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment
eformat::ROBFragment< PointerType > ROBFragment
Definition: RawEvent.h:27
WriteEvent::m_blob
std::unique_ptr< uint32_t[]> m_blob
Definition: trigbs_orderedMerge.cxx:56
main
int main(int argc, char **argv)
Definition: trigbs_orderedMerge.cxx:190
Event::Event
Event(std::unique_ptr< const uint32_t[]> &&b)
Definition: trigbs_orderedMerge.cxx:45
WriteEvent::m_status
Status m_status
Definition: trigbs_orderedMerge.cxx:55
a
TList * a
Definition: liststreamerinfos.cxx:10
Buffer::m_events
std::vector< Event > m_events
Definition: trigbs_orderedMerge.cxx:118
WriteEvent::size
uint32_t size() const
Definition: trigbs_orderedMerge.cxx:111
WriteEvent::m_robBlobs
std::vector< std::unique_ptr< uint32_t[]> > m_robBlobs
Definition: trigbs_orderedMerge.cxx:59
top
@ top
Definition: TruthClasses.h:64
calibdata.copy
bool copy
Definition: calibdata.py:27
collisions.reader
reader
read the goodrunslist xml file(s)
Definition: collisions.py:22
rp
ReadCards * rp
Definition: IReadCards.cxx:26
example.writer
writer
show summary of content
Definition: example.py:36
createWriter
std::unique_ptr< EventStorage::DataWriter > createWriter(const EventStorage::DataReader &reader)
Definition: trigbs_orderedMerge.cxx:167
Event::blob
std::unique_ptr< const uint32_t[]> blob
Definition: trigbs_orderedMerge.cxx:43
CaloCondBlobAlgs_fillNoiseFromASCII.blob
blob
Definition: CaloCondBlobAlgs_fillNoiseFromASCII.py:96