ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamEventStorageOutputSvc.cxx
Go to the documentation of this file.
1/* Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration */
3
4#include <stdexcept>
5#include <stdlib.h>
6#include <sstream>
7
8#include <boost/shared_ptr.hpp>
9
11
13
15
16#include "EventStorage/EventStorageRecords.h"
17#include "EventStorage/RawFileName.h"
18#include "EventStorage/SimpleFileName.h"
19
21
22#include "GaudiKernel/ServiceHandle.h"
23#include "GaudiKernel/IIoComponentMgr.h"
24
26
28
29
31 const std::string& name, ISvcLocator* pSvcLocator)
32 : base_class(name, pSvcLocator) {
// Empty body: construction only forwards name and pSvcLocator to base_class.
33}
34
35
// Service start-up. Initializes the EventInfo read-handle key, retrieves the
// tag-info manager, the metadata store and the I/O component manager, and
// registers this service with the I/O manager. Each step is wrapped in
// ATH_CHECK. Returns StatusCode::SUCCESS at the end of the sequence.
36StatusCode
38 ATH_MSG_INFO("Initializing");
39
40 ATH_CHECK(m_eventInfoKey.initialize());
42 ATH_CHECK(m_tagInfoMgr.retrieve());
43 ATH_CHECK(m_metaDataStore.retrieve());
44
45 // register this service for 'I/O' events
46 ATH_CHECK(m_ioMgr.retrieve());
47 ATH_CHECK(m_ioMgr->io_register(this));
48
49 // Register output file's name with the I/O manager
// (only done when m_simpleFileName is non-empty; registered in WRITE mode)
50 if (!m_simpleFileName.empty()) {
51 ATH_CHECK(m_ioMgr->io_register(this, IIoComponentMgr::IoMode::WRITE,
53 ATH_MSG_VERBOSE("io_register[" << this->name() << "]("
54 << m_simpleFileName << ") [ok]");
55 }
56
58
59 return StatusCode::SUCCESS;
60}
61
62
// Re-initialization hook: only logs a message and reports success; no member
// state is modified in this body.
63StatusCode
65 ATH_MSG_INFO("Reinitialization...");
66 return StatusCode::SUCCESS;
67}
68
69
// If no data writer exists yet and m_writeEventless is set, tries to set up
// the output from the ByteStream metadata alone (no EventInfo is passed).
// Always returns StatusCode::SUCCESS; a false result from the writer set-up
// only produces a warning.
70StatusCode
72 // Check whether anything has been written and whether the user wants metadata
73 // only files
74 if (m_dataWriter == 0 and m_writeEventless) {
75 const ByteStreamMetadata* metaData = getByteStreamMetadata();
76
// NOTE(review): getByteStreamMetadata() can return nullptr, and
// initDataWriterContents() throws std::runtime_error when both of its
// arguments are null. That exception is not caught here - confirm this is
// the intended behaviour when no metadata is available.
77 // Try to write metadata to eventless file
78 bool dWok = initDataWriterContents(nullptr, metaData);
79 if (!dWok) ATH_MSG_WARNING("Could not write Metadata for eventless file");
80 }
81
82 return StatusCode::SUCCESS;
83}
84
85
// Releases the data writer while holding m_dataWriterMutex and logs the
// value of m_totalEventCounter. Always returns StatusCode::SUCCESS.
86StatusCode
88 // clean up
89 ATH_MSG_DEBUG("deleting DataWriter");
// The lock is held until the end of this function.
90 std::lock_guard< std::mutex > lock(m_dataWriterMutex);
91 m_dataWriter.reset();
92 ATH_MSG_INFO("number of events written: " << m_totalEventCounter);
93 return StatusCode::SUCCESS;
94}
95
96
// Collects the inputs needed to open the output and delegates to
// initDataWriterContents(). ctx may be null; the context-free metadata
// lookup is used in that case. Returns the result of
// initDataWriterContents().
97bool
99 // Called on first event. Reads run parameters from the EventInfo of the first
// event and/or from the ByteStream metadata. A missing source only triggers
// a warning here; the null pointer is passed on.
100 const xAOD::EventInfo* eventInfo = ctx == nullptr
102 : SG::get(m_eventInfoKey, *ctx);
103 if (eventInfo == nullptr) ATH_MSG_WARNING("failed to retrieve EventInfo");
104
105 const ByteStreamMetadata* metaData = ctx == nullptr ? getByteStreamMetadata() : getByteStreamMetadata(*ctx);
106 if (metaData == nullptr)
107 ATH_MSG_WARNING("failed to retrieve ByteStreamMetaData");
108
109 // Now open a file for writing from retrieved parameters
110 return initDataWriterContents(eventInfo, metaData);
111}
112
113
// Builds the DataWriterParameters from the available sources and reports
// whether the data writer is usable.
//   evtInfo  - event information, may be null
//   metaData - ByteStream metadata, may be null
// Returns the value of m_dataWriter->good().
// Throws std::runtime_error when both evtInfo and metaData are null.
114bool
116 const xAOD::EventInfo* evtInfo,
117 const ByteStreamMetadata* metaData) {
118 // check that we have sufficient information to do what we need
119 if (evtInfo or metaData)
120 ATH_MSG_DEBUG("Looking up data writer parameters");
121 else
122 throw std::runtime_error("Cannot write data without run parameters");
123
124 // The hierarchy of run/lumiblock number, GNARR
125 //
126 // 1) User override
127 // 2) Event data
128 // 3) File metadata
129 // 4) default = unknown = 0
130 //
131 // Go from 4 to 1 and overwrite
132 DataWriterParameters params;
// Metadata is applied first so that event data, applied second, can
// overwrite it.
133 if (metaData != nullptr) updateDataWriterParameters(params, *metaData);
134 if (evtInfo != nullptr) updateDataWriterParameters(params, *evtInfo);
136
138
139 bool result = m_dataWriter->good();
140 if (result)
141 ATH_MSG_DEBUG("initialized output stream to file with name "
142 << params.fileNameCore);
143 else
144 ATH_MSG_ERROR("Unable to initialize file");
145
146 return result;
147}
148
149
// Convenience overload: forwards to the context-aware putEvent using the
// context returned by Gaudi::Hive::currentContext().
150bool
152 // Write the event using the current event context.
153 return putEvent(re, Gaudi::Hive::currentContext());
154}
155
156
// Writes one raw event. The fragment is first copied into the EventCache
// obtained from m_eventCache for ctx, then handed to the data writer while
// m_dataWriterMutex is held. The writer is set up on first use.
// Returns false when the writer cannot be initialized or when putData does
// not report EventStorage::DWOK; returns true after a successful write.
157bool
159 const RawEvent* re, const EventContext& ctx) {
160 // Copy the event into the cache for this context, then write it out.
163
164 EventCache* cache = m_eventCache.get(ctx);
// Drop whatever the cache still holds before filling it again.
165 cache->releaseEvent();
166
167 // we need the size and the start of the event to give to the data writer
168 cache->size = re->fragment_size_word();
169 ATH_MSG_DEBUG("event size = " << cache->size << ", start = " << re->start());
170
// cache->size counts DataType elements; the byte count passed to putData
// below is sizeof(DataType) * cache->size.
171 cache->buffer = std::make_unique< DataType[] >(cache->size);
172 std::copy(re->start(), re->start() + cache->size, cache->buffer.get());
173
174 {
175 // multiple data writers concurrently sounds like a bad idea
176 std::lock_guard< std::mutex > lock(m_dataWriterMutex);
177
178 // make sure the data writer is ready
179 ATH_MSG_DEBUG("looking up data writer");
180 if (!m_dataWriter) {
181 if (!initDataWriter(&ctx)) {
182 ATH_MSG_ERROR("Failed to initialize DataWriter");
183 return false;
184 }
185 }
186
187 // write event to disk
188 EventStorage::DWError write_result = m_dataWriter->putData(
189 sizeof(DataType) * cache->size,
190 reinterpret_cast< void* >(cache->buffer.get()));
191
192 // Report success or failure
193 if (write_result != EventStorage::DWOK) {
194 ATH_MSG_ERROR("Failed to write event to DataWriter");
195 return false;
196 }
198 }
199
200 return true;
201}
202
203
// I/O re-initialization. Verifies that this service (and, when set, the
// simple file name) is known to the I/O component manager, retrieves the
// file name through io_retrieve, stores its directory part in m_inputDir,
// releases the current data writer and finally calls reinit().
// Returns FAILURE when the manager does not know this service or the file
// name; otherwise SUCCESS is returned at the end.
204StatusCode
206 ATH_MSG_INFO("I/O reinitialization...");
207
208 if (!m_ioMgr->io_hasitem(this)) {
209 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
210 return StatusCode::FAILURE;
211 }
212
213 if (!m_simpleFileName.empty()) {
214 std::string outputFile = m_simpleFileName;
215 ATH_MSG_INFO("I/O reinitialization, file = " << outputFile);
// fname is an alias of outputFile, so anything io_retrieve writes into fname
// is visible through outputFile below.
// NOTE(review): presumably io_retrieve rewrites the name in place - verify.
216 std::string &fname = outputFile;
217 if (!m_ioMgr->io_contains(this, fname)) {
218 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
219 return StatusCode::FAILURE;
220 }
221 ATH_CHECK(m_ioMgr->io_retrieve(this, fname));
222 // all good... copy over.
223 // modify directory
// NOTE(review): when outputFile contains no slash, find_last_of returns npos
// and substr(0, npos) yields the whole file name, so m_inputDir would be set
// to the file name itself - confirm callers always supply a path.
224 m_inputDir.setValue(outputFile.substr(0, outputFile.find_last_of("/")));
225 // FIXME: modify file name, not done for now because of
226 // IoUtils.update_io_registry vs. merge conflict.
227 //m_simpleFileName.setValue(
228 // outputFile.substr(outputFile.find_last_of("/") + 1));
229 }
230 ATH_MSG_DEBUG("Deleting DataWriter");
// NOTE(review): this reset is done without holding m_dataWriterMutex, unlike
// the other places in this file that touch m_dataWriter - confirm that no
// concurrent writer access is possible at this point.
231 m_dataWriter.reset();
232
233 ATH_CHECK(reinit());
234
235 return StatusCode::SUCCESS;
236}
237
238
// Returns the first entry of the metadata container, or nullptr when the
// container handle is not valid. A warning is logged when the container
// holds more than one entry.
239const ByteStreamMetadata *
241{
243
244 if (!metaDataCont.isValid()) return nullptr;
245
246 if (metaDataCont->size() > 1)
247 ATH_MSG_WARNING("Multiple run parameters in MetaDataStore. "
248 "Bytestream format only supports one. Arbitrarily "
249 "choosing first.");
250
// NOTE(review): front() is called without an emptiness check - confirm that
// a valid container can never be empty.
251 return metaDataCont->front();
252}
253
254
// Context-aware variant: returns the first entry of the metadata container,
// or nullptr when the container handle is not valid. A warning is logged
// when the container holds more than one entry.
255const ByteStreamMetadata *
257 const EventContext& ctx)
258{
260
261 if (!metaDataCont.isValid()) return nullptr;
262
263 if (metaDataCont->size() > 1)
264 ATH_MSG_WARNING("Multiple run parameters in MetaDataStore. "
265 "Bytestream format only supports one. Arbitrarily "
266 "choosing first.");
267
// NOTE(review): front() is called without an emptiness check - confirm that
// a valid container can never be empty.
268 return metaDataCont->front();
269}
270
271
// Applies the job properties of this service to params.
// Run number, lumi block, stream type, stream name and project are only
// overwritten when the corresponding property is set (non-zero or
// non-empty). Version, writing path, application name, compression and the
// file size/event limits are always taken from the properties.
272void
274 DataWriterParameters& params) const {
275
276 params.version = 0;
277 params.writingPath = m_inputDir;
278
// A property value of 0 leaves the value already present in params.
279 if (m_run != 0) params.rPar.run_number = m_run;
280 ATH_MSG_DEBUG("Run number: " << params.rPar.run_number);
281
282 if (m_lumiBlockNumber != 0) params.lumiBlockNumber = m_lumiBlockNumber;
283 ATH_MSG_DEBUG("LB number: " << params.lumiBlockNumber);
284
285 if (!m_streamType.empty()) params.streamType = m_streamType;
286 if (!m_streamName.empty()) params.streamName = m_streamName;
287
// Fallbacks used when neither the properties nor earlier sources set a value.
288 if (params.streamType.empty()) params.streamType = "Single";
289 if (params.streamName.empty()) params.streamName = "Stream";
290
291 params.stream = params.streamType + "_" + params.streamName;
292
293 if (!m_projectTag.empty()) params.project = m_projectTag;
294
295 params.applicationName = m_appName;
296
// Either a file-name callback built from m_simpleFileName is installed, or
// fileNameCore is derived from project, run, stream, lumi block and
// application name. Only one of the two is set here.
297 if (!m_simpleFileName.empty()) {
298 // set up for simple file name
299 boost::shared_ptr<EventStorage::SimpleFileName> simple_file_name(
300 new EventStorage::SimpleFileName(m_simpleFileName));
301 params.theFNCB = simple_file_name;
302 } else {
303 // set up for production file name
304 daq::RawFileName fileNameObj(
305 params.project,
306 params.rPar.run_number,
307 params.streamType,
308 params.streamName,
309 params.lumiBlockNumber,
310 params.applicationName);
311 params.fileNameCore = fileNameObj.fileNameCore();
312 }
313
314 params.compression = m_compressEvents
315 ? EventStorage::ZLIB
316 : EventStorage::NONE;
317
318 params.maxFileMB = m_maxFileMB;
// The same event limit is written to both maxFileNE and rPar.max_events.
319 params.maxFileNE = params.rPar.max_events = m_maxFileNE;
320}
321
322
// Fills params from an xAOD::EventInfo object, from input tags looked up via
// m_tagInfoMgr, and from the IOVMetaDataStrings entry of m_metaDataStore.
// Run number, lumi block, trigger type and both detector masks are always
// overwritten. Stream type and name, beam type and beam energy are only
// overwritten when a usable value is found.
323void
325 DataWriterParameters& params, const xAOD::EventInfo& eventInfo) const {
326 ATH_MSG_DEBUG("Parsing run parameters from EventInfo" << eventInfo);
327
328 params.rPar.run_number = eventInfo.runNumber();
329 params.lumiBlockNumber = eventInfo.lumiBlock();
330
// Stream type and stream name are searched independently: each takes the
// first non-empty value, so they may come from different stream tags.
331 for (const xAOD::EventInfo::StreamTag& tag : eventInfo.streamTags())
332 if(!tag.type().empty()) {
333 params.streamType = tag.type();
334 break;
335 }
336
337 for (const xAOD::EventInfo::StreamTag& tag : eventInfo.streamTags())
338 if (!tag.name().empty()) {
339 params.streamName = tag.name();
340 break;
341 }
342
// Each detector description tag becomes one free metadata string of the
// form first, a single space, second.
343 for (const auto& tag : eventInfo.detDescrTags())
344 params.fmdStrings.push_back(tag.first + ' ' + tag.second);
345
346 // Get beam metadata from TagInfo
347 std::string beamTypeStr = m_tagInfoMgr->findInputTag("beam_type");
348 if (!beamTypeStr.empty()) {
349 ATH_MSG_DEBUG("Got beam_type from input file metadata: " << beamTypeStr);
// Only these two values are mapped; any other string leaves beam_type as is.
350 if (beamTypeStr == "collisions") {
351 params.rPar.beam_type = 1;
352 } else if (beamTypeStr == "cosmics") {
353 params.rPar.beam_type = 0;
354 }
355 }
356
357 std::string beamEnergyStr = m_tagInfoMgr->findInputTag("beam_energy");
358 if (!beamEnergyStr.empty()) {
// std::stoul throws on malformed or out-of-range input; that case is
// downgraded to a warning and beam_energy keeps its previous value.
359 try {
360 params.rPar.beam_energy = std::stoul(beamEnergyStr);
361 ATH_MSG_DEBUG("Got beam_energy from input file metadata: " << params.rPar.beam_energy);
362 } catch (const std::exception& e) {
363 ATH_MSG_WARNING("Could not convert beam_energy '" << beamEnergyStr << "' to number: " << e.what());
364 }
365 }
366
367 // Get IOV metadata strings from MetaDataStore
368 const std::vector<std::string>* iovMetaStrings = m_metaDataStore->tryRetrieve<std::vector<std::string>>("IOVMetaDataStrings");
369 if (iovMetaStrings != nullptr) {
370 ATH_MSG_DEBUG("Retrieved " << iovMetaStrings->size() << " IOV metadata strings from MetaDataStore");
371 for (const std::string& str : *iovMetaStrings) {
372 params.fmdStrings.push_back(str);
373 }
374 } else {
375 ATH_MSG_DEBUG("No IOV metadata strings found in MetaDataStore (this is normal if not configured)");
376 }
377
378 params.rPar.trigger_type = eventInfo.level1TriggerType();
379 params.rPar.detector_mask_LS = eventInfo.detectorMask();
380 params.rPar.detector_mask_MS = eventInfo.detectorMaskExt();
381
// Build one human-readable summary string with three either/or parts and
// append it to the free metadata strings.
382 std::string event_type = "Event type: sim/data - ";
384 event_type += "is sim";
385 else event_type += "is data";
386
387 event_type += " , testbeam/atlas - ";
389 event_type += "is testbeam";
390 else event_type += "is atlas";
391
392 event_type += " , calibration/physics - ";
394 event_type += "is calibration";
395 else event_type += "is physics";
396
397 params.fmdStrings.push_back(std::move(event_type));
398}
399
400
// Fills params from a ByteStreamMetadata object.
// Run number, lumi block, project, event limit, rec_enable, trigger type,
// beam type and both detector masks are always overwritten. Stream type and
// name are only filled while still empty, and beam energy only when the
// metadata value is non-zero. All free metadata strings are appended.
401void
403 DataWriterParameters& params, const ByteStreamMetadata& metaData) const {
404 ATH_MSG_DEBUG("Parsing run parameters from metadata:\n" << metaData);
405
406 params.rPar.run_number = metaData.getRunNumber();
407 params.lumiBlockNumber = metaData.getLumiBlock();
408
// The stream string is split at its first underscore: the part before it is
// the stream type, everything after it is the stream name. Without an
// underscore neither field is touched.
409 const std::string stream = metaData.getStream();
410 const std::string::size_type split = stream.find('_');
411
412 if (split != std::string::npos and params.streamType.empty())
413 params.streamType = stream.substr(0,split);
414
415 if (split != std::string::npos and params.streamName.empty())
416 params.streamName = stream.substr(split+1);
417
418 params.project = metaData.getProject();
// The same event limit is written to both maxFileNE and rPar.max_events.
419 params.maxFileNE = params.rPar.max_events = metaData.getMaxEvents();
420
421 params.rPar.rec_enable = metaData.getRecEnable();
422 params.rPar.trigger_type = metaData.getTriggerType();
423 params.rPar.beam_type = metaData.getBeamType();
// A beam energy of 0 in the metadata leaves the current value untouched.
424 if (metaData.getBeamEnergy() != 0)
425 params.rPar.beam_energy = metaData.getBeamEnergy();
426
427 params.rPar.detector_mask_LS = metaData.getDetectorMask();
428 params.rPar.detector_mask_MS = metaData.getDetectorMask2();
429
430 for (const std::string& fmd : metaData.getFreeMetaDataStrings())
431 params.fmdStrings.push_back(fmd);
432 // if(fmd.find("Compression=") == std::string::npos)
433}
const boost::regex re(r_e)
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the ByteStreamDataWriter class.
This file contains the class definition for the ByteStreamEventStorageOutputSvc class.
OFFLINE_FRAGMENTS_NAMESPACE::PointerType DataType
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
Handle class for reading from StoreGate.
static std::unique_ptr< ByteStreamDataWriter > makeWriter(int version, const std::string &writingPath, const std::string &fileNameCore, const EventStorage::run_parameters_record &rPar, const std::vector< std::string > &fmdStrings, unsigned int maxFileNE=0, unsigned int maxFileMB=0, unsigned int startIndex=1, EventStorage::CompressionType compression=EventStorage::NONE, unsigned int compLevel=1)
Factory method returning data writer instance for specified version.
Gaudi::Property< unsigned int > m_maxFileMB
number of events per file
StatusCode reinit()
reinitialize the service when a fork() occurred/was-issued
Gaudi::Property< std::string > m_projectTag
Application Name.
Gaudi::Property< bool > m_writeEventless
Compress events.
Gaudi::Property< std::string > m_streamType
stream name
StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< unsigned int > m_maxFileNE
std::mutex m_dataWriterMutex
mutex to lock data writer during initialization or writing
Gaudi::Property< std::string > m_simpleFileName
use this string for filename, not from the "AgreedFileName"
bool initDataWriter(const EventContext *ctx=nullptr)
initialize EventStorage's DataWriter
Gaudi::Property< int > m_lumiBlockNumber
run number
virtual bool putEvent(const RawEvent *re) override
Implementation of the IByteStreamOutputSvc interface method putEvent.
SG::SlotSpecificObj< EventCache > m_eventCache
Cache of event data for each slot.
SG::ReadHandleKey< ByteStreamMetadataContainer > m_byteStreamMetadataKey
ByteStreamEventStorageOutputSvc(const std::string &name, ISvcLocator *pSvcLocator)
Constructors:
Gaudi::Property< int > m_run
Dump fragments.
Gaudi::Property< std::string > m_appName
File Tag.
void updateDataWriterParameters(DataWriterParameters &) const
Create DataWriter parameters from job properties.
SG::ReadHandleKey< xAOD::EventInfo > m_eventInfoKey
Gaudi::Property< std::string > m_streamName
EventStorage BS version to produce, "v5" for run1, or "current".
Gaudi::Property< bool > m_compressEvents
number of MB per file
std::unique_ptr< ByteStreamDataWriter > m_dataWriter
pointer to DataWriter
Gaudi::Property< std::string > m_inputDir
directory for the data files
bool initDataWriterContents(const xAOD::EventInfo *, const ByteStreamMetadata *)
This class is the StoreGate data object for bytestream metadata.
uint64_t getDetectorMask() const
unsigned int getBeamType() const
const std::string & getStream() const
unsigned int getMaxEvents() const
unsigned int getRecEnable() const
unsigned int getRunNumber() const
unsigned int getTriggerType() const
unsigned int getBeamEnergy() const
unsigned int getLumiBlock() const
const std::vector< std::string > & getFreeMetaDataStrings() const
const std::string & getProject() const
uint64_t getDetectorMask2() const
virtual bool isValid() override final
Can the handle be successfully dereferenced?
Class describing a stream tag on the event.
uint32_t lumiBlock() const
The current event's luminosity block number.
bool eventType(EventType type) const
Check for one particular bitmask value.
uint16_t level1TriggerType() const
The Level-1 trigger type.
uint64_t detectorMaskExt() const
Bit field indicating which TTC zones are present in the event.
const std::vector< StreamTag > & streamTags() const
Get the streams that the event was put in.
@ IS_CALIBRATION
true: calibration, false: physics
@ IS_SIMULATION
true: simulation, false: data
@ IS_TESTBEAM
true: testbeam, false: full detector
uint32_t runNumber() const
The current event's run number.
uint64_t detectorMask() const
Bit field indicating which TTC zones are present in the event.
const DetDescrTags_t & detDescrTags() const
The detector description tags.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
const DataType * PointerType
Definition RawEvent.h:25
const T * get(const ReadCondHandleKey< T > &key, const EventContext &ctx)
Convenience function to retrieve an object given a ReadCondHandleKey.
EventInfo_v1 EventInfo
Definition of the latest event info version.
std::unique_ptr< uint32_t[] > buffer
Underlying data structure.
Class containing parameters needed to initiate DataWriter.