ATLAS Offline Software
Classes | Functions
trigbs_orderedMerge.cxx File Reference
#include "eformat/FullEventFragmentNoTemplates.h"
#include "eformat/ROBFragmentNoTemplates.h"
#include "eformat/write/FullEventFragment.h"
#include "eformat/write/ROBFragment.h"
#include "eformat/compression.h"
#include "EventStorage/DataReader.h"
#include "EventStorage/pickDataReader.h"
#include "EventStorage/DataWriter.h"
#include "EventStorage/DRError.h"
#include "EventStorage/DWError.h"
#include "EventStorage/EventStorageRecords.h"
#include "EventStorage/ESCompression.h"
#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <optional>
#include <functional>
#include <unordered_set>

Go to the source code of this file.

Classes

struct  Event
 
class  WriteEvent
 
class  Buffer
 

Functions

std::unique_ptr< EventStorage::DataWriter > createWriter (const EventStorage::DataReader &reader)
 
int main (int argc, char **argv)
 

Detailed Description

Standalone executable to order and decompress events from multiple files and write them into a single file.

This is intended mainly for merging files from the same run and LB but from different streams. The reordering ensures a uniform distribution of the different kinds of events from the different streams and removes duplicates. Using this executable with files from different LBs is pointless, since all events in LB N+1 always come after all events from LB N. The reordering is not strict, and the level of out-of-order events is tied to the buffer size: the smaller the buffer, the larger the chance that lower-ID events are still waiting to be read from the input files.

Definition in file trigbs_orderedMerge.cxx.

Function Documentation

◆ createWriter()

std::unique_ptr<EventStorage::DataWriter> createWriter ( const EventStorage::DataReader &  reader)

Definition at line 167 of file trigbs_orderedMerge.cxx.

167  { // Creates the output DataWriter, taking the run parameters and free metadata strings from the given reader.
168  EventStorage::run_parameters_record rp{};
169  rp.run_number = reader.runNumber();
170  rp.max_events = 0;
171  rp.rec_enable = 0;
172  rp.trigger_type = reader.triggerType();
173  constexpr static std::bitset<128> bitMask64{0xffffffffffffffff}; // selects the low 64 bits of the 128-bit detector mask
174  rp.detector_mask_LS = (reader.detectorMask() & bitMask64).to_ullong(); // lower 64 bits; to_ullong() is at least 64 bits wide, unlike to_ulong()
175  rp.detector_mask_MS = ((reader.detectorMask() >> 64) & bitMask64).to_ullong(); // upper 64 bits, shifted down before masking
176  rp.beam_type = reader.beamType();
177  rp.beam_energy = reader.beamEnergy();
178  // The output goes to the current directory with the file name core "orderedMerge" and no compression.
179  return std::make_unique<EventStorage::DataWriter>(
180  /* writingPath = */ ".",
181  /* fileNameCore = */ "orderedMerge",
182  /* rPar = */ rp,
183  /* fmdStrings = */ reader.freeMetaDataStrings(),
184  /* startIndex = */ 1,
185  /* compression = */ EventStorage::CompressionType::NONE,
186  /* compLevel = */ 0
187  );
188 }

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 190 of file trigbs_orderedMerge.cxx.

190  {
191 
192  if (argc < 3) {
193  std::cout << "Usage: " << argv[0] << " FILE FILE [FILE]" << std::endl
194  << "At least 2 input files are required" << std::endl;
195  return 1;
196  }
197 
198  std::vector<Buffer> buffers;
199  size_t total_events{0}, read_events{0};
200  std::unordered_set<uint64_t> written_event_ids;
201  uint64_t max_event_id{0};
202 
203  std::cout << "Reading " << argc-1 << " files:" << std::endl;
204  for (int i=1; i<argc; ++i) {
205  std::cout << " - " << argv[i] << std::endl;
206  buffers.emplace_back(std::unique_ptr<EventStorage::DataReader>(pickDataReader(argv[i])), s_defaultBufferSize);
207  total_events += buffers.back().eventsInFile();
208  }
209 
210  std::cout << "Input files contain " << total_events << " events in total" << std::endl;
211 
212  std::unique_ptr<EventStorage::DataWriter> writer = createWriter(buffers[0].reader());
213 
214  while (read_events < total_events) {
215  std::optional<std::reference_wrapper<Buffer>> next_buffer;
216  uint64_t lowest_event_id{std::numeric_limits<uint64_t>::max()};
217  int buffer_id{-1}, next_buffer_id{-1};
218  for (Buffer& buffer : buffers) {
219  ++buffer_id;
220  std::optional<std::reference_wrapper<const Event>> event = buffer.peek();
221  if (not event.has_value()) {continue;}
222  const uint64_t event_id{event.value().get().frag.global_id()};
223  if (event_id < lowest_event_id) {
224  lowest_event_id = event_id;
225  next_buffer = buffer;
226  if (s_debugLogging) {
227  next_buffer_id = buffer_id;
228  }
229  }
230  }
231  if (not next_buffer.has_value()) {
232  std::cout << "End of inputs reached, read " << read_events << " out of " << total_events << " events" << std::endl;
233  break;
234  }
235 
236  const Event event = next_buffer.value().get().next();
237  ++read_events;
238  const uint64_t event_id{event.frag.global_id()};
239  if (not written_event_ids.insert(event_id).second) {
240  std::cout << "Duplicate event with global_id " << event_id << ", skipping" << std::endl;
241  continue;
242  }
243  if (event_id > max_event_id) {
244  max_event_id = event_id;
245  } else if (s_debugLogging) {
246  std::cout << "The current event global_id " << event_id << " is lower than previously written event "
247  << max_event_id << " - event ordering not strictly preserved" << std::endl;
248  }
249 
250  if (s_debugLogging) {
251  std::cout << "Writing event " << written_event_ids.size() << " with global_id " << event_id
252  << " read from file " << next_buffer_id << std::endl;
253  } else if (read_events % s_printInterval == 0) {
254  std::cout << "Read " << read_events << " out of " << total_events << " events and wrote "
255  << written_event_ids.size() << " events" << std::endl;
256  }
257 
258  const WriteEvent writeEvent{event};
259  if (writeEvent.status() != WriteEvent::Status::OK) {
260  std::cerr << "Error in event serialisation" << std::endl;
261  return 1;
262  }
263 
264  const uint32_t sizeInBytes = writeEvent.size() * sizeof(uint32_t);
265  uint32_t writeSizeOnDisk{0};
266  const EventStorage::DWError err_code = writer->putData(sizeInBytes, writeEvent.blob().get(), writeSizeOnDisk);
267  if (err_code != EventStorage::DWError::DWOK or not writer->good()) {
268  std::cerr << "Error writing to file, exiting..." << std::endl;
269  return 1;
270  }
271  if (writeSizeOnDisk != sizeInBytes) {
272  std::cerr << "Error writing to file, wrote " << writeSizeOnDisk << " instead of " << sizeInBytes << " words" << std::endl;
273  return 1;
274  }
275  }
276 
277  std::cout << "Merging successful, read " << read_events << " out of " << total_events << " events and wrote "
278  << written_event_ids.size() << " events" << std::endl;
279 
280  return 0;
281 }
NONE
@ NONE
Definition: sTGCenumeration.h:13
max
#define max(a, b)
Definition: cfImp.cxx:41
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
Event
Definition: trigbs_orderedMerge.cxx:42
WriteEvent::Status::OK
@ OK
WriteEvent
Definition: trigbs_orderedMerge.cxx:50
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:12
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
lumiFormat.i
int i
Definition: lumiFormat.py:85
LArCellNtuple.argv
argv
Definition: LArCellNtuple.py:152
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
DQHistogramMergeRegExp.argc
argc
Definition: DQHistogramMergeRegExp.py:20
Buffer
Definition: trigbs_orderedMerge.cxx:114
collisions.reader
reader
read the goodrunslist xml file(s)
Definition: collisions.py:22
rp
ReadCards * rp
Definition: IReadCards.cxx:26
example.writer
writer
show summary of content
Definition: example.py:36
createWriter
std::unique_ptr< EventStorage::DataWriter > createWriter(const EventStorage::DataReader &reader)
Definition: trigbs_orderedMerge.cxx:167