ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamEventStorageOutputSvc.cxx
Go to the documentation of this file.
1/* Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration */
3
4#include <stdexcept>
5#include <stdlib.h>
6#include <sstream>
7
8#include <boost/shared_ptr.hpp>
9
11
13
16
17#include "EventStorage/EventStorageRecords.h"
18#include "EventStorage/RawFileName.h"
19#include "EventStorage/SimpleFileName.h"
20
22
23#include "GaudiKernel/ServiceHandle.h"
24#include "GaudiKernel/IIoComponentMgr.h"
25
27
29
30
32 const std::string& name, ISvcLocator* pSvcLocator)
33 : base_class(name, pSvcLocator) {
34}
35
36
37StatusCode
39 ATH_MSG_INFO("Initializing");
40
41 ATH_CHECK(m_eventInfoKey.initialize());
43 ATH_CHECK(m_tagInfoMgr.retrieve());
44 ATH_CHECK(m_metaDataStore.retrieve());
45
46 // register this service for 'I/O' events
47 ATH_CHECK(m_ioMgr.retrieve());
48 ATH_CHECK(m_ioMgr->io_register(this));
49
50 // Register output file's name with the I/O manager
51 if (!m_simpleFileName.empty()) {
52 ATH_CHECK(m_ioMgr->io_register(this, IIoComponentMgr::IoMode::WRITE,
54 ATH_MSG_VERBOSE("io_register[" << this->name() << "]("
55 << m_simpleFileName << ") [ok]");
56 }
57
58 // validate m_eformatVersion
59 const std::vector< std::string > choices_ef{"current", "v40", "run1"};
60 if (std::find(choices_ef.begin(), choices_ef.end(), m_eformatVersion)
61 == choices_ef.end()) {
62 ATH_MSG_FATAL("Unexpected value for EformatVersion property: "
64 return StatusCode::FAILURE;
65 }
66 ATH_MSG_INFO("eformat version to use: \"" << m_eformatVersion << "\"");
67
68 // validate m_eventStorageVersion
69 const std::vector< std::string > choices_es{"current", "v5", "run1"};
70 if (std::find(choices_es.begin(), choices_es.end(), m_eventStorageVersion)
71 == choices_es.end()) {
72 ATH_MSG_FATAL("Unexpected value for EventStorageVersion property: "
74 return StatusCode::FAILURE;
75 }
76 ATH_MSG_INFO("event storage (BS) version to use: \""
77 << m_eventStorageVersion << "\"");
78
79 m_isRun1 = (m_eformatVersion == "v40" or m_eformatVersion == "run1");
80
82
83 return StatusCode::SUCCESS;
84}
85
86
87StatusCode
89 ATH_MSG_INFO("Reinitialization...");
90 return StatusCode::SUCCESS;
91}
92
93
94StatusCode
96 // Check whether anything has been written and whether the user wants metadata
97 // only files
98 if (m_dataWriter == 0 and m_writeEventless) {
99 const ByteStreamMetadata* metaData = getByteStreamMetadata();
100
101 // Try to write metadata to eventless file
102 bool dWok = initDataWriterContents(nullptr, metaData);
103 if (!dWok) ATH_MSG_WARNING("Could not write Metadata for eventless file");
104 }
105
106 return StatusCode::SUCCESS;
107}
108
109
110StatusCode
112 // clean up
113 ATH_MSG_DEBUG("deleting DataWriter");
114 m_dataWriter.reset();
115 std::lock_guard< std::mutex > lock(m_dataWriterMutex);
116 ATH_MSG_INFO("number of events written: " << m_totalEventCounter);
117 return StatusCode::SUCCESS;
118}
119
120
121bool
123 // Called on first event. Reads run parameters first event and/or first event
124 const xAOD::EventInfo* eventInfo = ctx == nullptr
126 : SG::get(m_eventInfoKey, *ctx);
127 if (eventInfo == nullptr) ATH_MSG_WARNING("failed to retrieve EventInfo");
128
129 const ByteStreamMetadata* metaData = ctx == nullptr ? getByteStreamMetadata() : getByteStreamMetadata(*ctx);
130 if (metaData == nullptr)
131 ATH_MSG_WARNING("failed to retrieve ByteStreamMetaData");
132
133 // Now open a file for writing from retrieved parameters
134 return initDataWriterContents(eventInfo, metaData);
135}
136
137
138bool
140 const xAOD::EventInfo* evtInfo,
141 const ByteStreamMetadata* metaData) {
142 // check that we have sufficient information to do what we need
143 if (evtInfo or metaData)
144 ATH_MSG_DEBUG("Looking up data writer parameters");
145 else
146 throw std::runtime_error("Cannot write data without run parameters");
147
148 // The heirarchy of run/lumiblock number, GNARR
149 //
150 // 1) User override
151 // 2) Event data
152 // 3) File metadata
153 // 4) default = unknown = 0
154 //
155 // Go from 4 to 1 and overwrite
156 DataWriterParameters params;
157 if (metaData != nullptr) updateDataWriterParameters(params, *metaData);
158 if (evtInfo != nullptr) updateDataWriterParameters(params, *evtInfo);
160
162
163 bool result = m_dataWriter->good();
164 if (result)
165 ATH_MSG_DEBUG("initialized output stream to file with name "
166 << params.fileNameCore);
167 else
168 ATH_MSG_ERROR("Unable to initialize file");
169
170 return result;
171}
172
173
174bool
176 // Read the next event.
177 return putEvent(re, Gaudi::Hive::currentContext());
178}
179
180
181bool
183 const RawEvent* re, const EventContext& ctx) {
184 // Read the next event.
187
188 EventCache* cache = m_eventCache.get(ctx);
189 cache->releaseEvent();
190
191 // we need the size and the start of the event to give to the data writer
192 cache->size = re->fragment_size_word();
193 ATH_MSG_DEBUG("event size = " << cache->size << ", start = " << re->start());
194
195 if (m_isRun1) {
196 // convert to current eformat
197 // allocate some extra space just in case
198 ATH_MSG_DEBUG("converting Run 1 format ");
199
200 cache->size += 128;
201 cache->buffer = std::make_unique< DataType[] >(cache->size);
202 ATH_MSG_DEBUG("created buffer 0x"
203 << std::hex << cache->buffer.get() << std::dec);
204
205 // This builds no-checksum headers, should use the same
206 // checksum type as original event
208 re->start(), cache->buffer.get(), cache->size);
209 ATH_MSG_DEBUG("filled buffer");
210
211 if (cache->size == 0) {
212 // not enough space in buffer
213 ATH_MSG_ERROR("Failed to convert event, buffer is too small");
214 return false;
215 }
216
217 ATH_MSG_DEBUG("event size after conversion = " << cache->size
218 << " version = " << cache->buffer.get()[3]);
219
220 } else {
221 cache->buffer = std::make_unique< DataType[] >(cache->size);
222 std::copy(re->start(), re->start() + cache->size, cache->buffer.get());
223 }
224
225 {
226 // multiple data writers concurrently sounds like a bad idea
227 std::lock_guard< std::mutex > lock(m_dataWriterMutex);
228
229 // make sure the data writer is ready
230 ATH_MSG_DEBUG("looking up data writer");
231 if (!m_dataWriter) {
232 if (!initDataWriter(&ctx)) {
233 ATH_MSG_ERROR("Failed to initialize DataWriter");
234 return false;
235 }
236 }
237
238 // write event to disk
239 EventStorage::DWError write_result = m_dataWriter->putData(
240 sizeof(DataType) * cache->size,
241 reinterpret_cast< void* >(cache->buffer.get()));
242
243 // Report success or failure
244 if (write_result != EventStorage::DWOK) {
245 ATH_MSG_ERROR("Failed to write event to DataWriter");
246 return false;
247 }
249 }
250
251 return true;
252}
253
254
255StatusCode
257 ATH_MSG_INFO("I/O reinitialization...");
258
259 if (!m_ioMgr->io_hasitem(this)) {
260 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
261 return StatusCode::FAILURE;
262 }
263
264 if (!m_simpleFileName.empty()) {
265 std::string outputFile = m_simpleFileName;
266 ATH_MSG_INFO("I/O reinitialization, file = " << outputFile);
267 std::string &fname = outputFile;
268 if (!m_ioMgr->io_contains(this, fname)) {
269 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
270 return StatusCode::FAILURE;
271 }
272 ATH_CHECK(m_ioMgr->io_retrieve(this, fname));
273 // all good... copy over.
274 // modify directory
275 m_inputDir.setValue(outputFile.substr(0, outputFile.find_last_of("/")));
276 // FIXME: modify file name, not done for now because of
277 // IoUtils.update_io_registry vs. merge conflict.
278 //m_simpleFileName.setValue(
279 // outputFile.substr(outputFile.find_last_of("/") + 1));
280 }
281 ATH_MSG_DEBUG("Deleting DataWriter");
282 m_dataWriter.reset();
283
284 ATH_CHECK(reinit());
285
286 return StatusCode::SUCCESS;
287}
288
289
290const ByteStreamMetadata *
292{
294
295 if (!metaDataCont.isValid()) return nullptr;
296
297 if (metaDataCont->size() > 1)
298 ATH_MSG_WARNING("Multiple run parameters in MetaDataStore. "
299 "Bytestream format only supports one. Arbitrarily "
300 "choosing first.");
301
302 return metaDataCont->front();
303}
304
305
306const ByteStreamMetadata *
308 const EventContext& ctx)
309{
311
312 if (!metaDataCont.isValid()) return nullptr;
313
314 if (metaDataCont->size() > 1)
315 ATH_MSG_WARNING("Multiple run parameters in MetaDataStore. "
316 "Bytestream format only supports one. Arbitrarily "
317 "choosing first.");
318
319 return metaDataCont->front();
320}
321
322
323void
325 DataWriterParameters& params) const {
326
327 if (m_eventStorageVersion == "v5" or m_eventStorageVersion == "run1")
328 params.version = 5;
329 else params.version = 0;
330
331 params.writingPath = m_inputDir;
332
333 if (m_run != 0) params.rPar.run_number = m_run;
334 ATH_MSG_DEBUG("Run number: " << params.rPar.run_number);
335
336 if (m_lumiBlockNumber != 0) params.lumiBlockNumber = m_lumiBlockNumber;
337 ATH_MSG_DEBUG("LB number: " << params.lumiBlockNumber);
338
339 if (!m_streamType.empty()) params.streamType = m_streamType;
340 if (!m_streamName.empty()) params.streamName = m_streamName;
341
342 if (params.streamType.empty()) params.streamType = "Single";
343 if (params.streamName.empty()) params.streamName = "Stream";
344
345 params.stream = params.streamType + "_" + params.streamName;
346
347 if (!m_projectTag.empty()) params.project = m_projectTag;
348
349 params.applicationName = m_appName;
350
351 if (!m_simpleFileName.empty()) {
352 // set up for simple file name
353 boost::shared_ptr<EventStorage::SimpleFileName> simple_file_name(
354 new EventStorage::SimpleFileName(m_simpleFileName));
355 params.theFNCB = simple_file_name;
356 } else {
357 // set up for production file name
358 daq::RawFileName fileNameObj(
359 params.project,
360 params.rPar.run_number,
361 params.streamType,
362 params.streamName,
363 params.lumiBlockNumber,
364 params.applicationName);
365 params.fileNameCore = fileNameObj.fileNameCore();
366 }
367
368 params.compression = m_compressEvents
369 ? EventStorage::ZLIB
370 : EventStorage::NONE;
371
372 params.maxFileMB = m_maxFileMB;
373 params.maxFileNE = params.rPar.max_events = m_maxFileNE;
374}
375
376
377void
379 DataWriterParameters& params, const xAOD::EventInfo& eventInfo) const {
380 ATH_MSG_DEBUG("Parsing run parameters from EventInfo" << eventInfo);
381
382 params.rPar.run_number = eventInfo.runNumber();
383 params.lumiBlockNumber = eventInfo.lumiBlock();
384
385 for (const xAOD::EventInfo::StreamTag& tag : eventInfo.streamTags())
386 if(!tag.type().empty()) {
387 params.streamType = tag.type();
388 break;
389 }
390
391 for (const xAOD::EventInfo::StreamTag& tag : eventInfo.streamTags())
392 if (!tag.name().empty()) {
393 params.streamName = tag.name();
394 break;
395 }
396
397 for (const auto& tag : eventInfo.detDescrTags())
398 params.fmdStrings.push_back(tag.first + ' ' + tag.second);
399
400 // Get beam metadata from TagInfo
401 std::string beamTypeStr = m_tagInfoMgr->findInputTag("beam_type");
402 if (!beamTypeStr.empty()) {
403 ATH_MSG_DEBUG("Got beam_type from input file metadata: " << beamTypeStr);
404 if (beamTypeStr == "collisions") {
405 params.rPar.beam_type = 1;
406 } else if (beamTypeStr == "cosmics") {
407 params.rPar.beam_type = 0;
408 }
409 }
410
411 std::string beamEnergyStr = m_tagInfoMgr->findInputTag("beam_energy");
412 if (!beamEnergyStr.empty()) {
413 try {
414 params.rPar.beam_energy = std::stoul(beamEnergyStr);
415 ATH_MSG_DEBUG("Got beam_energy from input file metadata: " << params.rPar.beam_energy);
416 } catch (const std::exception& e) {
417 ATH_MSG_WARNING("Could not convert beam_energy '" << beamEnergyStr << "' to number: " << e.what());
418 }
419 }
420
421 // Get IOV metadata strings from MetaDataStore
422 const std::vector<std::string>* iovMetaStrings = m_metaDataStore->tryRetrieve<std::vector<std::string>>("IOVMetaDataStrings");
423 if (iovMetaStrings != nullptr) {
424 ATH_MSG_DEBUG("Retrieved " << iovMetaStrings->size() << " IOV metadata strings from MetaDataStore");
425 for (const std::string& str : *iovMetaStrings) {
426 params.fmdStrings.push_back(str);
427 }
428 } else {
429 ATH_MSG_DEBUG("No IOV metadata strings found in MetaDataStore (this is normal if not configured)");
430 }
431
432 params.rPar.trigger_type = eventInfo.level1TriggerType();
433 params.rPar.detector_mask_LS = eventInfo.detectorMask();
434 params.rPar.detector_mask_MS = eventInfo.detectorMaskExt();
435
436 std::string event_type = "Event type: sim/data - ";
438 event_type += "is sim";
439 else event_type += "is data";
440
441 event_type += " , testbeam/atlas - ";
443 event_type += "is testbeam";
444 else event_type += "is atlas";
445
446 event_type += " , calibration/physics - ";
448 event_type += "is calibration";
449 else event_type += "is physics";
450
451 params.fmdStrings.push_back(std::move(event_type));
452}
453
454
455void
457 DataWriterParameters& params, const ByteStreamMetadata& metaData) const {
458 ATH_MSG_DEBUG("Parsing run parameters from metadata:\n" << metaData);
459
460 params.rPar.run_number = metaData.getRunNumber();
461 params.lumiBlockNumber = metaData.getLumiBlock();
462
463 const std::string stream = metaData.getStream();
464 const std::string::size_type split = stream.find('_');
465
466 if (split != std::string::npos and params.streamType.empty())
467 params.streamType = stream.substr(0,split);
468
469 if (split != std::string::npos and params.streamName.empty())
470 params.streamName = stream.substr(split+1);
471
472 params.project = metaData.getProject();
473 params.maxFileNE = params.rPar.max_events = metaData.getMaxEvents();
474
475 params.rPar.rec_enable = metaData.getRecEnable();
476 params.rPar.trigger_type = metaData.getTriggerType();
477 params.rPar.beam_type = metaData.getBeamType();
478 if (metaData.getBeamEnergy() != 0)
479 params.rPar.beam_energy = metaData.getBeamEnergy();
480
481 params.rPar.detector_mask_LS = metaData.getDetectorMask();
482 params.rPar.detector_mask_MS = metaData.getDetectorMask2();
483
484 for (const std::string& fmd : metaData.getFreeMetaDataStrings())
485 params.fmdStrings.push_back(fmd);
486 // if(fmd.find("Compression=") == std::string::npos)
487}
const boost::regex re(r_e)
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the ByteStreamDataWriter class.
This file contains the class definition for the ByteStreamEventStorageOutputSvc class.
OFFLINE_FRAGMENTS_NAMESPACE::PointerType DataType
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
Handle class for reading from StoreGate.
static std::unique_ptr< ByteStreamDataWriter > makeWriter(int version, const std::string &writingPath, const std::string &fileNameCore, const EventStorage::run_parameters_record &rPar, const std::vector< std::string > &fmdStrings, unsigned int maxFileNE=0, unsigned int maxFileMB=0, unsigned int startIndex=1, EventStorage::CompressionType compression=EventStorage::NONE, unsigned int compLevel=1)
Factory method returning data writer instance for specified version.
Gaudi::Property< unsigned int > m_maxFileMB
number of events per file
StatusCode reinit()
reinitialize the service when a fork() occurred/was-issued
Gaudi::Property< std::string > m_projectTag
Application Name.
Gaudi::Property< bool > m_writeEventless
Compress events.
Gaudi::Property< std::string > m_streamType
stream name
StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< unsigned int > m_maxFileNE
Gaudi::Property< std::string > m_eventStorageVersion
stream name for multiple output
std::mutex m_dataWriterMutex
mutex to lock data writer during initialization or writing
Gaudi::Property< std::string > m_simpleFileName
use this string for filename, not from the "AgreedFileName"
bool initDataWriter(const EventContext *ctx=nullptr)
initialize EventStorage's DataWriter
Gaudi::Property< int > m_lumiBlockNumber
run number
virtual bool putEvent(const RawEvent *re) override
Implementation of the IByteStreamOutputSvc interface method putEvent.
SG::SlotSpecificObj< EventCache > m_eventCache
Cache of event data for each slot.
SG::ReadHandleKey< ByteStreamMetadataContainer > m_byteStreamMetadataKey
Gaudi::Property< std::string > m_eformatVersion
EventStorage BS version to produce, "v5" for run1, or "current".
ByteStreamEventStorageOutputSvc(const std::string &name, ISvcLocator *pSvcLocator)
Constructors:
Gaudi::Property< int > m_run
Dump fragments.
Gaudi::Property< std::string > m_appName
File Tag.
void updateDataWriterParameters(DataWriterParameters &) const
Create DataWriter parameters from job properties.
SG::ReadHandleKey< xAOD::EventInfo > m_eventInfoKey
Gaudi::Property< std::string > m_streamName
eformat event version to produce, "v40" for run1, or "current"
Gaudi::Property< bool > m_compressEvents
number of MB per file
std::unique_ptr< ByteStreamDataWriter > m_dataWriter
pointer to DataWriter
Gaudi::Property< std::string > m_inputDir
< directory for the data files
bool initDataWriterContents(const xAOD::EventInfo *, const ByteStreamMetadata *)
This class is the StoreGate data object for bytestream metadata.
uint64_t getDetectorMask() const
unsigned int getBeamType() const
const std::string & getStream() const
unsigned int getMaxEvents() const
unsigned int getRecEnable() const
unsigned int getRunNumber() const
unsigned int getTriggerType() const
unsigned int getBeamEnergy() const
unsigned int getLumiBlock() const
const std::vector< std::string > & getFreeMetaDataStrings() const
const std::string & getProject() const
uint64_t getDetectorMask2() const
virtual bool isValid() override final
Can the handle be successfully dereferenced?
Class describing a stream tag on the event.
uint32_t lumiBlock() const
The current event's luminosity block number.
bool eventType(EventType type) const
Check for one particular bitmask value.
uint16_t level1TriggerType() const
The Level-1 trigger type.
uint64_t detectorMaskExt() const
Bit field indicating which TTC zones are present in the event.
const std::vector< StreamTag > & streamTags() const
Get the streams that the event was put in.
@ IS_CALIBRATION
true: calibration, false: physics
@ IS_SIMULATION
true: simulation, false: data
@ IS_TESTBEAM
true: testbeam, false: full detector
uint32_t runNumber() const
The current event's run number.
uint64_t detectorMask() const
Bit field indicating which TTC zones are present in the event.
const DetDescrTags_t & detDescrTags() const
The detector description tags.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
const DataType * PointerType
Definition RawEvent.h:25
const T * get(const ReadCondHandleKey< T > &key, const EventContext &ctx)
Convenience function to retrieve an object given a ReadCondHandleKey.
uint32_t convert_to_40(const uint32_t *src, uint32_t *dest, uint32_t max, eformat::CheckSum event_checksum=eformat::NO_CHECKSUM, eformat::CheckSum rob_checksum=eformat::NO_CHECKSUM)
Converts a full event fragment or a ROS fragment, from some format to v4.0 format,...
Definition util.cxx:21
EventInfo_v1 EventInfo
Definition of the latest event info version.
std::unique_ptr< uint32_t[] > buffer
Underlying data structure.
Class containing parameters needed to initiate DataWriter.
Declarations of methods for working with old eformat versions.