ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamEventStorageInputSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include "DumpFrags.h"
11#include "EventStorage/pickDataReader.h"
12#include "EventStorage/DataReader.h"
13
14#include "Gaudi/Property.h"
15
21
22#include "eformat/HeaderMarker.h"
23#include "eformat/SourceIdentifier.h"
24#include "eformat/Issue.h"
25#include "eformat/Problem.h"
26#include "eformat/Version.h"
27#include "eformat/Status.h"
28#include "eformat/old/util.h"
29
30#include <cstdio>
31#include <string>
32#include <vector>
33#include <unistd.h>
34
35
36/******************************************************************************/
37// Constructor.
39 const std::string& name, ISvcLocator* pSvcLocator)
40 : base_class(name, pSvcLocator)
43 , m_reader()
44 , m_evtOffsets()
45 , m_evtInFile(0)
47 , m_fileGUID("")
48 , m_robProvider("ROBDataProviderSvc", name)
49{
50 assert(pSvcLocator != nullptr);
51}
52
53
54/******************************************************************************/
56
57
58/******************************************************************************/
59StatusCode
61{
62 ATH_MSG_INFO("Initializing");
63
64 ATH_CHECK(m_inputMetadata.retrieve());
65 ATH_CHECK(m_storeGate.retrieve());
66 ATH_CHECK(m_robProvider.retrieve());
67
68 return StatusCode::SUCCESS;
69}
70
71
72/******************************************************************************/
73StatusCode
75{
76 // close moved to EventSelector for explicit coupling with incident
77 return StatusCode::SUCCESS;
78}
79
80
81/******************************************************************************/
82StatusCode
84 return StatusCode::SUCCESS;
85}
86
87
88/******************************************************************************/
89// Can't inline this because it is called via pointer to the base class
90long
95
96
97/******************************************************************************/
98StatusCode
100{
101 // default goes into ByteStreamMetadata
102 auto bsmdc = std::make_unique<ByteStreamMetadataContainer>();
103 bsmdc->push_back(std::make_unique<ByteStreamMetadata>(*m_reader));
104 ATH_MSG_DEBUG("ByteStreamMetadata:\n" << *(bsmdc->front()));
105
106 return m_inputMetadata->record(std::move(bsmdc), "ByteStreamMetadata");
107}
108
109
110/******************************************************************************/
111const RawEvent*
113{
114 std::lock_guard<std::mutex> lock(m_readerMutex);
115 const EventContext& context{Gaudi::Hive::currentContext()};
116 EventCache* cache = m_eventsCache.get(context);
117 // initialize before building RawEvent
118 cache->releaseEvent();
119
120 // Load data buffer from file
121 unsigned int eventSize;
122 if (readerReady()) {
123 //get current event position (cast to long long until native tdaq implementation)
124 m_evtInFile--;
126 DRError ecode = m_reader->getData(eventSize, &(cache->data), m_evtOffsets.at(m_evtInFile - 1));
127
128 if (DRWAIT == ecode && m_wait > 0) {
129 do {
130 // wait for n seconds
131 ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
132 int result = usleep(static_cast<int>(m_wait * 1e6));
133 if (result != 0) {
134 ATH_MSG_ERROR("System Error while running sleep");
135 return nullptr;
136 }
137 } while(m_reader->getData(eventSize, &(cache->data), m_evtFileOffset) == DRWAIT);
138 } else if (DROK != ecode) {
139 ATH_MSG_ERROR("Error reading previous event");
141 }
142 ATH_MSG_DEBUG("Event Size " << eventSize);
143 }
144 else {
145 ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
146 return nullptr;
147 }
148
149 // Use buffer to build FullEventFragment
150 try {
151 buildFragment(cache, eventSize, true);
152 }
153 catch (...) {
154 // rethrow any exceptions
155 throw;
156 }
157
158 if (cache->rawEvent == nullptr) {
159 ATH_MSG_ERROR("Failure to build fragment");
160 return nullptr;
161 }
162
163 // Set it for the data provider
164 m_robProvider->setNextEvent(context, cache->rawEvent.get());
165 m_robProvider->setEventStatus(context, cache->eventStatus);
166
167 // dump
168 if (m_dump) {
169 DumpFrags::dump(cache->rawEvent.get());
170 }
171 ATH_MSG_DEBUG( "switched to previous event in slot " << context);
172 return cache->rawEvent.get();
173}
174
175
176/******************************************************************************/
177// Read the next event.
178const RawEvent*
180
181 std::lock_guard<std::mutex> lock( m_readerMutex );
182 const EventContext& context{ Gaudi::Hive::currentContext() };
183 EventCache* cache = m_eventsCache.get(context);
184
185 // initialize before building RawEvent
186 cache->releaseEvent();
187
188 // Load data buffer from file
189 unsigned int eventSize;
190 if (readerReady()) {
191 DRError ecode;
192 // Check if have moved back from high water mark
193 m_evtInFile++; // increment iterator
194 if (m_evtInFile+1 > m_evtOffsets.size()) {
195 //get current event position (cast to long long until native tdaq implementation)
196 ATH_MSG_DEBUG("nextEvent _above_ high water mark");
197 m_evtFileOffset = static_cast<long long>(m_reader->getPosition());
198 m_evtOffsets.push_back(m_evtFileOffset);
199 ecode = m_reader->getData(eventSize, &(cache->data));
200 } else {
201 // Load from previous offset
202 ATH_MSG_DEBUG("nextEvent below high water mark");
204 ecode = m_reader->getData(eventSize, &(cache->data), m_evtFileOffset);
205 }
206
207 if (DRWAIT == ecode && m_wait > 0) {
208 do {
209 // wait for n seconds
210 ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
211 int result = usleep(static_cast<int>(m_wait * 1e6));
212 if (result != 0) {
213 ATH_MSG_ERROR("System Error while running sleep");
214 return nullptr;
215 }
216 } while(m_reader->getData(eventSize, &(cache->data)) == DRWAIT);
217 } else if (DROK != ecode) {
218 ATH_MSG_ERROR("Error reading next event");
220 }
221 ATH_MSG_DEBUG("Event Size " << eventSize);
222
223 } else {
224 ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
225 return nullptr;
226 }
227
228 // Use buffer to build FullEventFragment
229 try {
230 buildFragment(cache, eventSize, true);
231 }
232 catch (...) {
233 // rethrow any exceptions
234 throw;
235 }
236
237 if (cache->rawEvent == nullptr) {
238 ATH_MSG_ERROR("Failure to build fragment");
239 return nullptr;
240 }
241
242 // Set it for the data provider
243 m_robProvider->setNextEvent(context, cache->rawEvent.get());
244 m_robProvider->setEventStatus(context, cache->eventStatus);
245
246 //++m_totalEventCounter;
247
248 // dump
249 if (m_dump) {
250 DumpFrags::dump(cache->rawEvent.get());
251 }
252 ATH_MSG_DEBUG("switched to next event in slot " << context);
253 return cache->rawEvent.get();
254}
255
256
257/******************************************************************************/
258void
260{
261 const EventContext& context{Gaudi::Hive::currentContext()};
262 const RawEvent* const event = m_eventsCache.get(context)->rawEvent.get();
263 m_eventsCache.get(context)->eventStatus = validateEvent(event);
264}
265
266
267/******************************************************************************/
268unsigned
270{
271 unsigned int status = 0;
272 if (m_valEvent) {
273 // check validity
274 std::vector<eformat::FragmentProblem> problems;
275 rawEvent->problems(problems);
276
277 if(!problems.empty()) {
278 status += 0x01000000;
279
280 // bad event
281 ATH_MSG_WARNING("Failed to create FullEventFragment");
282 for(const auto& problem : problems)
283 ATH_MSG_WARNING(eformat::helper::FragmentProblemDictionary.string(problem));
284
286 }
287
288 if(!ROBFragmentCheck(rawEvent)) {
289 status += 0x02000000;
290
291 // bad event
292 ATH_MSG_ERROR("Skipping bad event");
294 }
295 } else {
296 ATH_MSG_DEBUG("Processing event without validating.");
297 }
298 return status;
299}
300
301
302/******************************************************************************/
303void
305 EventCache* cache, uint32_t eventSize, bool validate) const {
308 DataType* fragment = reinterpret_cast<DataType*>(cache->data);
309
310 if (validate) {
311 // check fragment type
312 const DataType headWord = fragment[0];
313 ATH_MSG_DEBUG("First word of the fragment " << MSG::hex << headWord << MSG::dec);
314 // format version
315 const DataType formatVersion = eformat::helper::Version(fragment[3]).major_version();
316 ATH_MSG_DEBUG("Format version" << MSG::hex << formatVersion << MSG::dec);
317 // error message
318 if((formatVersion != eformat::MAJOR_DEFAULT_VERSION) &&
319 (formatVersion != eformat::MAJOR_V24_VERSION) &&
320 (formatVersion != eformat::MAJOR_V30_VERSION) &&
321 (formatVersion != eformat::MAJOR_V40_VERSION) &&
322 (formatVersion != eformat::MAJOR_V31_VERSION) ) {
323 ATH_MSG_ERROR("unsupported Format Version : "
324 << MSG::hex << formatVersion << MSG::dec);
325 }
326
327 if(eformat::FULL_EVENT == headWord || 0xcc1234cc == headWord) { // ROS = 0xcc1234cc
328 try {
329 // convert old fragment
330 if(formatVersion != eformat::MAJOR_DEFAULT_VERSION) {
331 // 100 for increase of data-size due to header conversion
332 const uint32_t newEventSize = eventSize + 1000;
333 DataType* newFragment = new DataType[newEventSize];
334 eformat::old::convert(fragment, newFragment, newEventSize);
335
336 // delete old fragment
337 delete [] fragment; fragment = nullptr;
338
339 // set new pointer
340 fragment = newFragment;
341 cache->data = reinterpret_cast< char* >(fragment);
342 }
343 } catch (const eformat::Issue& ex) {
344 // bad event
345 ATH_MSG_WARNING(ex.what());
346 ATH_MSG_ERROR("Skipping bad event");
348 }
349 } else {
350 // Unknown fragment
351 ATH_MSG_FATAL("Unknown Header work in input fragment " << MSG::hex << headWord);
353 }
354 }
355
356 // This is a FullEventFragment
357 // make a new FEFrag in memory from it
358 cache->eventStatus = 0;
359 if(fragment[5] > 0) {
360 cache->eventStatus += eformat::helper::Status(fragment[6]).specific();
361 cache->eventStatus += (eformat::helper::Status(fragment[6]).generic() & 0x000000ff) << 16;
362 }
363
364 // This is a FullEventFragment
365 // make a new RawEvent in memory from it
366 cache->rawEvent = std::make_unique<RawEvent>(fragment);
367 ATH_MSG_DEBUG("Made an FullEventFragment from ES " << fragment);
368}
369
370
371/******************************************************************************/
372StatusCode
374{
375 std::lock_guard<std::mutex> lock(m_readerMutex);
376
377 // get file GUID
378 m_fileGUID = m_reader->GUID();
379
380 // reader returns -1 when end of the file is reached
381 if(m_evtFileOffset != -1) {
382 ATH_MSG_DEBUG("ByteStream File GUID: " << m_fileGUID);
383 ATH_MSG_DEBUG("ByteStream Event Position in File: " << m_evtFileOffset);
384
385 // To accommodate for skipEvents option in EventSelector
386 // While skipping BS event Selector does not return SUCCESS code,
387 // just advances silently through events. So SG content is not refreshed
388 // Lets do refresh of the event header here
389 std::string key = "ByteStreamDataHeader";
391
392 // Created data header element with BS provenance information
393 std::unique_ptr<DataHeaderElement> dataHeaderElement = makeBSProvenance();
394 // Create data header itself
395 std::unique_ptr<DataHeader> dataHeader = std::make_unique<DataHeader>();
396 // Declare header primary
397 dataHeader->setStatus(DataHeader::Input);
398 // Set processTag
399 dataHeader->setProcessTag(dataHeaderElement->getKey());
400 //add the data header element self reference to the object vector
401 dataHeader->insert(*std::move(dataHeaderElement));
402
403 // Clean up EventInfo from the previous event
404 key = m_eventInfoKey.value();
406 // Now add ref to xAOD::EventInfo
407 std::unique_ptr<IOpaqueAddress> iopx = std::make_unique<ByteStreamAddress>(
409 ATH_CHECK(m_storeGate->recordAddress(key, std::move(iopx)));
410 const SG::DataProxy* ptmpx = m_storeGate->transientProxy(
412 if (ptmpx != nullptr) {
413 DataHeaderElement element(ptmpx, 0, key);
414 dataHeader->insert(element);
415 }
416
417 // Clean up auxiliary EventInfo from the previous event
418 key = m_eventInfoKey.value() + "Aux.";
420 // Now add ref to xAOD::EventAuxInfo
421 std::unique_ptr<IOpaqueAddress> iopaux = std::make_unique<ByteStreamAddress>(
423 ATH_CHECK(m_storeGate->recordAddress(key, std::move(iopaux)));
424 const SG::DataProxy* ptmpaux = m_storeGate->transientProxy(
426 if (ptmpaux !=0) {
427 DataHeaderElement element(ptmpaux, 0, key);
428 dataHeader->insert(element);
429 }
430
431 // Record new data header.Boolean flags will allow it's deletion in case
432 // of skipped events.
433 ATH_CHECK(m_storeGate->record<DataHeader>(dataHeader.release(),
434 "ByteStreamDataHeader", true, false, true));
435 }
436 return StatusCode::SUCCESS;
437}
438
439
440/******************************************************************************/
441void
443{
444 // cleanup parts of previous event and re-init them
445 if (rawEvent) {
446 rawEvent.reset(nullptr);
447 eventStatus = 0;
448 }
449
450 if (data) {
451 delete [] data;
452 data = nullptr;
453 }
454}
455
456
457/******************************************************************************/
462
463
464/******************************************************************************/
465void
467{
468 if (clearMetadata) {
469 ATH_MSG_WARNING("Clearing input metadata store");
470 StatusCode status = m_inputMetadata->clearStore();
471 if (!status.isSuccess()) {
472 ATH_MSG_WARNING("Unable to clear Input MetaData Proxies");
473 }
474 }
475
476 if (!readerReady()) {
477 ATH_MSG_INFO("No more events in this run, high water mark for this file = "
478 << m_evtOffsets.size()-1);
479 }
480
481 m_reader.reset();
482}
483
484
485/******************************************************************************/
486bool
488{
489 // enable SequenceReading
490 m_reader->enableSequenceReading();
491 return true;
492}
493
494
495/******************************************************************************/
496bool
501
502
503/******************************************************************************/
504std::pair<long,std::string>
506{
507 // open the file
508 if(m_reader != 0) closeBlockIterator();
509
510 m_reader = std::unique_ptr<EventStorage::DataReader>(pickDataReader(fileName));
511
512 if(m_reader == nullptr) {
513 ATH_MSG_ERROR("Failed to open file " << fileName);
515 return std::make_pair(-1,"END");
516 }
517
518 // Initialize offset vector
519 m_evtOffsets.resize(m_reader->eventsInFile(), -1);
520 m_evtOffsets.clear();
521
522 // Get ByteStream Metadata into Input MetaData Store
523 // (include begin Input File Incident)
524 if (loadMetadata().isSuccess()) {
525 ATH_MSG_DEBUG("Recorded ByteStreamMetadata in InputMetaDataStore");
526 } else {
527 ATH_MSG_ERROR("Unable to record ByteStreamMetadata in InputMetaDataStore");
528 return std::make_pair(-1, "FAIL");
529 }
530
531 m_evtInFile = 0;
532
533 // enable sequentialReading if multiple files
534 if(m_sequential) {
535 bool test = setSequentialRead();
536 if (!test) return std::make_pair(-1,"SEQ");
537 }
538
539 ATH_MSG_INFO("Picked valid file: " << m_reader->fileName());
540 // initialize offsets and counters
541 m_evtOffsets.push_back(static_cast<long long>(m_reader->getPosition()));
542 return std::make_pair(m_reader->eventsInFile(), m_reader->GUID());
543}
544
545
546/******************************************************************************/
547bool
549{
550 bool eofFlag(false);
551
552 if (m_reader) eofFlag = m_reader->endOfFile();
553 else {
554 ATH_MSG_INFO("eformat reader object not initialized");
555 return false;
556 }
557
558 bool moreEvent = m_reader->good();
559
560 return (!eofFlag) && moreEvent;
561}
562
563
564/******************************************************************************/
565bool
567{
568 bool allOK = true;
569 uint32_t total = re->nchildren();
570 uint32_t lastId = 0;
571 std::vector<eformat::FragmentProblem> problems;
572
573 for (size_t i = 0; i<total; ++i) {
575 re->child(fp, i);
576
578 lastId = f.source_id();
579
580 problems.clear();
581 f.problems(problems);
582 if(!problems.empty()) {
583 allOK = false;
584 for(const auto& problem : problems) {
585 ATH_MSG_WARNING("Failed to create ROBFragment id = " << lastId << ", "
586 << eformat::helper::SourceIdentifier(lastId).human() << " : "
587 << eformat::helper::FragmentProblemDictionary.string(problem));
588 }
589 }
590 }
591
592 return allOK;
593}
594
595
596/******************************************************************************/
597void
598ByteStreamEventStorageInputSvc::setEvent(void* data, unsigned int eventStatus)
599{
600 const EventContext& context{Gaudi::Hive::currentContext()};
601 return setEvent(context, data, eventStatus);
602}
603
604
605/******************************************************************************/
606void
608 void* data, unsigned int eventStatus)
609{
611 EventCache* cache = m_eventsCache.get(context);
612 cache->releaseEvent();
613
614 DataType* fragment = reinterpret_cast<DataType*>(data);
615 cache->rawEvent = std::make_unique<RawEvent>(fragment);
616 cache->eventStatus = eventStatus;
617
618 // Set it for the data provider
619 m_robProvider->setNextEvent(context, cache->rawEvent.get());
620 m_robProvider->setEventStatus(context, cache->eventStatus);
621
622 // Build a DH for use by other components
623 StatusCode rec_sg = generateDataHeader();
624 if (rec_sg != StatusCode::SUCCESS) {
625 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! "
626 << rec_sg);
627 }
628}
629
630
631/******************************************************************************/
632const
634{
635 const EventContext& context{Gaudi::Hive::currentContext()};
636 return m_eventsCache.get(context)->rawEvent.get();
637}
638
639
640/******************************************************************************/
641unsigned int
643{
644 const EventContext& context{Gaudi::Hive::currentContext()};
645 return m_eventsCache.get(context)->eventStatus;
646}
647
648
649/******************************************************************************/
650std::unique_ptr<DataHeaderElement>
652{
653 Token token;
654 token.setDb(m_fileGUID);
655 token.setTechnology(0x00001000);
657
658 return std::make_unique<DataHeaderElement>(ClassID_traits<DataHeader>::ID(),
659 "StreamRAW", std::move(token));
660}
const boost::regex re(r_e)
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the ByteStreamEventStorageInputSvc class.
This file contains the class definition for the ByteStreamMetadataContainer class.
OFFLINE_FRAGMENTS_NAMESPACE::PointerType DataType
This file contains the class definition for the DataHeader and DataHeaderElement classes.
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
This file contains the class definition for the Token class (migrated from POOL).
SG::SlotSpecificObj< EventCache > m_eventsCache
virtual unsigned int currentEventStatus() const override
Return the current event status.
virtual void setEvent(void *data, unsigned int eventStatus) override
void buildFragment(EventCache *cache, uint32_t eventSize, bool validate) const
virtual const RawEvent * currentEvent() const override
Implementation of the IByteStreamInputSvc interface methods.
std::unique_ptr< DataHeaderElement > makeBSProvenance() const
ServiceHandle< StoreGateSvc > m_storeGate
Pointer to StoreGate.
ByteStreamEventStorageInputSvc(const std::string &name, ISvcLocator *pSvcLocator)
Constructors:
StatusCode deleteEntry(const std::string &key)
virtual StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< std::string > m_eventInfoKey
virtual const RawEvent * nextEvent() override
++, new
virtual ~ByteStreamEventStorageInputSvc()
Destructor.
virtual const RawEvent * previousEvent() override
–, old
ServiceHandle< StoreGateSvc > m_inputMetadata
std::unique_ptr< EventStorage::DataReader > m_reader
DataReader from EventStorage.
ServiceHandle< IROBDataProviderSvc > m_robProvider
virtual StatusCode generateDataHeader() override
virtual std::pair< long, std::string > getBlockIterator(const std::string &fileName) override
long long int m_evtFileOffset
last read in event offset within a file, can be -1
std::vector< long long int > m_evtOffsets
offset for event i in that file
virtual void closeBlockIterator(bool clearMetadata=true) override
This class provides a persistent form for the TransientAddress.
Definition DataHeader.h:37
This class provides the layout for summary information stored for data written to POOL.
Definition DataHeader.h:123
static void dump(const RawEvent *re)
dump fragments from FullEventFragment
Definition DumpFrags.h:21
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
Token & setTechnology(int t)
Set technology type.
Definition Token.h:79
const DataType * PointerType
Definition RawEvent.h:25
eformat::ROBFragment< PointerType > ROBFragment
Definition RawEvent.h:27
unsigned int eventStatus
check_tree() status of the current event
std::unique_ptr< RawEvent > rawEvent
current event
char * data
take ownership of RawEvent content
Declarations of methods for working with old eformat versions.