ATLAS Offline Software
Loading...
Searching...
No Matches
ByteStreamEventStorageInputSvc Class Reference

This class is the ByteStreamInputSvc for reading events written by EventStorage. More...

#include <ByteStreamEventStorageInputSvc.h>

Inheritance diagram for ByteStreamEventStorageInputSvc:
Collaboration diagram for ByteStreamEventStorageInputSvc:

Classes

struct  EventCache

Public Member Functions

 ByteStreamEventStorageInputSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Constructors:
virtual ~ByteStreamEventStorageInputSvc ()
 Destructor.
virtual StatusCode initialize () override
 Required of all Gaudi Services.
virtual StatusCode stop () override
virtual StatusCode finalize () override
virtual const RawEventcurrentEvent () const override
 Implementation of the IByteStreamInputSvc interface methods.
virtual const RawEventnextEvent () override
 ++, new
virtual const RawEventpreviousEvent () override
 –, old
virtual void setEvent (void *data, unsigned int eventStatus) override
virtual unsigned int currentEventStatus () const override
 Return the current event status.
virtual void validateEvent () override
virtual long positionInBlock () override
virtual std::pair< long, std::string > getBlockIterator (const std::string &fileName) override
virtual void closeBlockIterator (bool clearMetadata=true) override
virtual bool setSequentialRead ()
virtual bool ready () override
virtual StatusCode generateDataHeader () override

Private Types

enum  Advance { PREVIOUS = -1 , NEXT = 1 }

Private Member Functions

StatusCode loadMetadata ()
void buildFragment (EventCache *cache, uint32_t eventSize, bool validate) const
bool readerReady ()
bool ROBFragmentCheck (const RawEvent *) const
unsigned validateEvent (const RawEvent *const rawEvent) const
void setEvent (const EventContext &context, void *data, unsigned int eventStatus)
const RawEventgetEvent (Advance step)
std::unique_ptr< DataHeaderElementmakeBSProvenance () const
template<typename T>
StatusCode deleteEntry (const std::string &key)

Private Attributes

std::mutex m_readerMutex
SG::SlotSpecificObj< EventCachem_eventsCache
std::unique_ptr< EventStorage::DataReader > m_reader
 DataReader from EventStorage.
std::vector< long long int > m_evtOffsets
 offset for event i in that file
unsigned int m_evtInFile
long long int m_evtFileOffset
 last read in event offset within a file, can be -1
std::string m_fileGUID
 current file GUID
ServiceHandle< StoreGateSvcm_storeGate {this, "EventStore", "StoreGateSvc"}
 Pointer to StoreGate.
ServiceHandle< StoreGateSvcm_inputMetadata {this, "MetaDataStore", "StoreGateSvc/InputMetaDataStore"}
ServiceHandle< IROBDataProviderSvcm_robProvider
Gaudi::Property< bool > m_sequential {this, "EnableSequential", false, "enable sequential reading"}
Gaudi::Property< bool > m_dump {this, "DumpFlag", false, "Dump fragments"}
Gaudi::Property< float > m_wait {this, "WaitSecs", 0.0f, "Seconds to wait if input is in wait state"}
Gaudi::Property< bool > m_valEvent {this, "ValidateEvent", false, "switch on check_tree when reading events"}
Gaudi::Property< std::string > m_eventInfoKey {this, "EventInfoKey", "EventInfo", "Key of EventInfo in metadata store"}

Detailed Description

This class is the ByteStreamInputSvc for reading events written by EventStorage.

Definition at line 35 of file ByteStreamEventStorageInputSvc.h.

Member Enumeration Documentation

◆ Advance

Constructor & Destructor Documentation

◆ ByteStreamEventStorageInputSvc()

ByteStreamEventStorageInputSvc::ByteStreamEventStorageInputSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Constructors:

Definition at line 38 of file ByteStreamEventStorageInputSvc.cxx.

40 : base_class(name, pSvcLocator)
43 , m_reader()
44 , m_evtOffsets()
45 , m_evtInFile(0)
47 , m_fileGUID("")
48 , m_robProvider("ROBDataProviderSvc", name)
49{
50 assert(pSvcLocator != nullptr);
51}
SG::SlotSpecificObj< EventCache > m_eventsCache
std::unique_ptr< EventStorage::DataReader > m_reader
DataReader from EventStorage.
ServiceHandle< IROBDataProviderSvc > m_robProvider
long long int m_evtFileOffset
last read in event offset within a file, can be -1
std::vector< long long int > m_evtOffsets
offset for event i in that file

◆ ~ByteStreamEventStorageInputSvc()

ByteStreamEventStorageInputSvc::~ByteStreamEventStorageInputSvc ( )
virtualdefault

Destructor.

Member Function Documentation

◆ buildFragment()

void ByteStreamEventStorageInputSvc::buildFragment ( EventCache * cache,
uint32_t eventSize,
bool validate ) const
private

Definition at line 304 of file ByteStreamEventStorageInputSvc.cxx.

305 {
308 DataType* fragment = reinterpret_cast<DataType*>(cache->data);
309
310 if (validate) {
311 // check fragment type
312 const DataType headWord = fragment[0];
313 ATH_MSG_DEBUG("First word of the fragment " << MSG::hex << headWord << MSG::dec);
314 // format version
315 const DataType formatVersion = eformat::helper::Version(fragment[3]).major_version();
316 ATH_MSG_DEBUG("Format version" << MSG::hex << formatVersion << MSG::dec);
317 // error message
318 if((formatVersion != eformat::MAJOR_DEFAULT_VERSION) &&
319 (formatVersion != eformat::MAJOR_V24_VERSION) &&
320 (formatVersion != eformat::MAJOR_V30_VERSION) &&
321 (formatVersion != eformat::MAJOR_V40_VERSION) &&
322 (formatVersion != eformat::MAJOR_V31_VERSION) ) {
323 ATH_MSG_ERROR("unsupported Format Version : "
324 << MSG::hex << formatVersion << MSG::dec);
325 }
326
327 if(eformat::FULL_EVENT == headWord || 0xcc1234cc == headWord) { // ROS = 0xcc1234cc
328 try {
329 // convert old fragment
330 if(formatVersion != eformat::MAJOR_DEFAULT_VERSION) {
331 // 100 for increase of data-size due to header conversion
332 const uint32_t newEventSize = eventSize + 1000;
333 DataType* newFragment = new DataType[newEventSize];
334 eformat::old::convert(fragment, newFragment, newEventSize);
335
336 // delete old fragment
337 delete [] fragment; fragment = nullptr;
338
339 // set new pointer
340 fragment = newFragment;
341 cache->data = reinterpret_cast< char* >(fragment);
342 }
343 } catch (const eformat::Issue& ex) {
344 // bad event
345 ATH_MSG_WARNING(ex.what());
346 ATH_MSG_ERROR("Skipping bad event");
347 throw ByteStreamExceptions::badFragment();
348 }
349 } else {
350 // Unknown fragment
351 ATH_MSG_FATAL("Unknown Header work in input fragment " << MSG::hex << headWord);
352 throw ByteStreamExceptions::badFragment();
353 }
354 }
355 if (!fragment){
356 ATH_MSG_ERROR("fragment is nullptr!");
357 throw ByteStreamExceptions::badFragment();
358 }
359 // This is a FullEventFragment
360 // make a new FEFrag in memory from it
361 cache->eventStatus = 0;
362 if(fragment[5] > 0) {
363 cache->eventStatus += eformat::helper::Status(fragment[6]).specific();
364 cache->eventStatus += (eformat::helper::Status(fragment[6]).generic() & 0x000000ff) << 16;
365 }
366
367 // This is a FullEventFragment
368 // make a new RawEvent in memory from it
369 cache->rawEvent = std::make_unique<RawEvent>(fragment);
370 ATH_MSG_DEBUG("Made an FullEventFragment from ES " << fragment);
371}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
OFFLINE_FRAGMENTS_NAMESPACE::PointerType DataType
const DataType * PointerType
Definition RawEvent.h:25
const IIntersectionCache * cache() const
Retrieve the associated cache block, if it exists.
setEventNumber uint32_t

◆ closeBlockIterator()

void ByteStreamEventStorageInputSvc::closeBlockIterator ( bool clearMetadata = true)
overridevirtual

Definition at line 469 of file ByteStreamEventStorageInputSvc.cxx.

470{
471 if (clearMetadata) {
472 ATH_MSG_WARNING("Clearing input metadata store");
473 StatusCode status = m_inputMetadata->clearStore();
474 if (!status.isSuccess()) {
475 ATH_MSG_WARNING("Unable to clear Input MetaData Proxies");
476 }
477 }
478
479 if (!readerReady()) {
480 ATH_MSG_INFO("No more events in this run, high water mark for this file = "
481 << m_evtOffsets.size()-1);
482 }
483
484 m_reader.reset();
485}
#define ATH_MSG_INFO(x)
ServiceHandle< StoreGateSvc > m_inputMetadata
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16

◆ currentEvent()

const RawEvent * ByteStreamEventStorageInputSvc::currentEvent ( ) const
overridevirtual

Implementation of the IByteStreamInputSvc interface methods.

Definition at line 636 of file ByteStreamEventStorageInputSvc.cxx.

637{
638 const EventContext& context{Gaudi::Hive::currentContext()};
639 return m_eventsCache.get(context)->rawEvent.get();
640}

◆ currentEventStatus()

unsigned int ByteStreamEventStorageInputSvc::currentEventStatus ( ) const
overridevirtual

Return the current event status.

Definition at line 645 of file ByteStreamEventStorageInputSvc.cxx.

646{
647 const EventContext& context{Gaudi::Hive::currentContext()};
648 return m_eventsCache.get(context)->eventStatus;
649}

◆ deleteEntry()

template<typename T>
StatusCode ByteStreamEventStorageInputSvc::deleteEntry ( const std::string & key)
inlineprivate

Definition at line 118 of file ByteStreamEventStorageInputSvc.h.

119 {
120 if (m_storeGate->contains<T>(key)) {
121 const T* tmp = m_storeGate->tryConstRetrieve<T>(key);
122 if (tmp != nullptr) ATH_CHECK(m_storeGate->remove<T>(tmp));
123 }
124 return StatusCode::SUCCESS;
125 }
#define ATH_CHECK
Evaluate an expression and check for errors.
ServiceHandle< StoreGateSvc > m_storeGate
Pointer to StoreGate.
unsigned long long T

◆ finalize()

StatusCode ByteStreamEventStorageInputSvc::finalize ( )
overridevirtual

Definition at line 83 of file ByteStreamEventStorageInputSvc.cxx.

83 {
84 return StatusCode::SUCCESS;
85}

◆ generateDataHeader()

StatusCode ByteStreamEventStorageInputSvc::generateDataHeader ( )
overridevirtual

Definition at line 376 of file ByteStreamEventStorageInputSvc.cxx.

377{
378 std::lock_guard<std::mutex> lock(m_readerMutex);
379
380 // get file GUID
381 m_fileGUID = m_reader->GUID();
382
383 // reader returns -1 when end of the file is reached
384 if(m_evtFileOffset != -1) {
385 ATH_MSG_DEBUG("ByteStream File GUID: " << m_fileGUID);
386 ATH_MSG_DEBUG("ByteStream Event Position in File: " << m_evtFileOffset);
387
388 // To accommodate for skipEvents option in EventSelector
389 // While skipping BS event Selector does not return SUCCESS code,
390 // just advances silently through events. So SG content is not refreshed
391 // Lets do refresh of the event header here
392 std::string key = "ByteStreamDataHeader";
394
395 // Created data header element with BS provenance information
396 std::unique_ptr<DataHeaderElement> dataHeaderElement = makeBSProvenance();
397 // Create data header itself
398 std::unique_ptr<DataHeader> dataHeader = std::make_unique<DataHeader>();
399 // Declare header primary
400 dataHeader->setStatus(DataHeader::Input);
401 // Set processTag
402 dataHeader->setProcessTag(dataHeaderElement->getKey());
403 //add the data header element self reference to the object vector
404 dataHeader->insert(*std::move(dataHeaderElement));
405
406 // Clean up EventInfo from the previous event
407 key = m_eventInfoKey.value();
409 // Now add ref to xAOD::EventInfo
410 std::unique_ptr<IOpaqueAddress> iopx = std::make_unique<ByteStreamAddress>(
412 ATH_CHECK(m_storeGate->recordAddress(key, std::move(iopx)));
413 const SG::DataProxy* ptmpx = m_storeGate->transientProxy(
415 if (ptmpx != nullptr) {
416 DataHeaderElement element(ptmpx, 0, key);
417 dataHeader->insert(element);
418 }
419
420 // Clean up auxiliary EventInfo from the previous event
421 key = m_eventInfoKey.value() + "Aux.";
423 // Now add ref to xAOD::EventAuxInfo
424 std::unique_ptr<IOpaqueAddress> iopaux = std::make_unique<ByteStreamAddress>(
426 ATH_CHECK(m_storeGate->recordAddress(key, std::move(iopaux)));
427 const SG::DataProxy* ptmpaux = m_storeGate->transientProxy(
429 if (ptmpaux !=0) {
430 DataHeaderElement element(ptmpaux, 0, key);
431 dataHeader->insert(element);
432 }
433
434 // Record new data header.Boolean flags will allow it's deletion in case
435 // of skipped events.
436 ATH_CHECK(m_storeGate->record<DataHeader>(dataHeader.release(),
437 "ByteStreamDataHeader", true, false, true));
438 }
439 return StatusCode::SUCCESS;
440}
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
std::unique_ptr< DataHeaderElement > makeBSProvenance() const
StatusCode deleteEntry(const std::string &key)
Gaudi::Property< std::string > m_eventInfoKey

◆ getBlockIterator()

std::pair< long, std::string > ByteStreamEventStorageInputSvc::getBlockIterator ( const std::string & fileName)
overridevirtual

Definition at line 508 of file ByteStreamEventStorageInputSvc.cxx.

509{
510 // open the file
511 if(m_reader != 0) closeBlockIterator();
512
513 m_reader = std::unique_ptr<EventStorage::DataReader>(pickDataReader(fileName));
514
515 if(m_reader == nullptr) {
516 ATH_MSG_ERROR("Failed to open file " << fileName);
518 return std::make_pair(-1,"END");
519 }
520
521 // Initialize offset vector
522 m_evtOffsets.resize(m_reader->eventsInFile(), -1);
523 m_evtOffsets.clear();
524
525 // Get ByteStream Metadata into Input MetaData Store
526 // (include begin Input File Incident)
527 if (loadMetadata().isSuccess()) {
528 ATH_MSG_DEBUG("Recorded ByteStreamMetadata in InputMetaDataStore");
529 } else {
530 ATH_MSG_ERROR("Unable to record ByteStreamMetadata in InputMetaDataStore");
531 return std::make_pair(-1, "FAIL");
532 }
533
534 m_evtInFile = 0;
535
536 // enable sequentialReading if multiple files
537 if(m_sequential) {
538 bool test = setSequentialRead();
539 if (!test) return std::make_pair(-1,"SEQ");
540 }
541
542 ATH_MSG_INFO("Picked valid file: " << m_reader->fileName());
543 // initialize offsets and counters
544 m_evtOffsets.push_back(static_cast<long long>(m_reader->getPosition()));
545 return std::make_pair(m_reader->eventsInFile(), m_reader->GUID());
546}
virtual void closeBlockIterator(bool clearMetadata=true) override

◆ getEvent()

const RawEvent * ByteStreamEventStorageInputSvc::getEvent ( Advance step)
private

◆ initialize()

StatusCode ByteStreamEventStorageInputSvc::initialize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 60 of file ByteStreamEventStorageInputSvc.cxx.

61{
62 ATH_MSG_INFO("Initializing");
63
64 ATH_CHECK(m_inputMetadata.retrieve());
65 ATH_CHECK(m_storeGate.retrieve());
66 ATH_CHECK(m_robProvider.retrieve());
67
68 return StatusCode::SUCCESS;
69}

◆ loadMetadata()

StatusCode ByteStreamEventStorageInputSvc::loadMetadata ( )
private

Definition at line 99 of file ByteStreamEventStorageInputSvc.cxx.

100{
101 // default goes into ByteStreamMetadata
102 auto bsmdc = std::make_unique<ByteStreamMetadataContainer>();
103 bsmdc->push_back(std::make_unique<ByteStreamMetadata>(*m_reader));
104 ATH_MSG_DEBUG("ByteStreamMetadata:\n" << *(bsmdc->front()));
105
106 return m_inputMetadata->record(std::move(bsmdc), "ByteStreamMetadata");
107}

◆ makeBSProvenance()

std::unique_ptr< DataHeaderElement > ByteStreamEventStorageInputSvc::makeBSProvenance ( ) const
private

Definition at line 654 of file ByteStreamEventStorageInputSvc.cxx.

655{
656 Token token;
657 token.setDb(m_fileGUID);
658 token.setTechnology(0x00001000);
659 token.setOid(Token::OID_t(0LL, m_evtFileOffset));
660
661 return std::make_unique<DataHeaderElement>(ClassID_traits<DataHeader>::ID(),
662 "StreamRAW", std::move(token));
663}
Token & setDb(const Guid &db)
Set database name.
Definition Token.h:66
Token & setOid(const OID_t &oid)
Set object identifier.
Definition Token.h:85
Token & setTechnology(int t)
Set technology type.
Definition Token.h:79

◆ nextEvent()

const RawEvent * ByteStreamEventStorageInputSvc::nextEvent ( )
overridevirtual

++, new

Definition at line 179 of file ByteStreamEventStorageInputSvc.cxx.

179 {
180
181 std::lock_guard<std::mutex> lock( m_readerMutex );
182 const EventContext& context{ Gaudi::Hive::currentContext() };
183 EventCache* cache = m_eventsCache.get(context);
184
185 // initialize before building RawEvent
186 cache->releaseEvent();
187
188 // Load data buffer from file
189 unsigned int eventSize;
190 if (readerReady()) {
191 DRError ecode;
192 // Check if have moved back from high water mark
193 m_evtInFile++; // increment iterator
194 if (m_evtInFile+1 > m_evtOffsets.size()) {
195 //get current event position (cast to long long until native tdaq implementation)
196 ATH_MSG_DEBUG("nextEvent _above_ high water mark");
197 m_evtFileOffset = static_cast<long long>(m_reader->getPosition());
198 m_evtOffsets.push_back(m_evtFileOffset);
199 ecode = m_reader->getData(eventSize, &(cache->data));
200 } else {
201 // Load from previous offset
202 ATH_MSG_DEBUG("nextEvent below high water mark");
204 ecode = m_reader->getData(eventSize, &(cache->data), m_evtFileOffset);
205 }
206
207 if (DRWAIT == ecode && m_wait > 0) {
208 do {
209 // wait for n seconds
210 ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
211 int result = usleep(static_cast<int>(m_wait * 1e6));
212 if (result != 0) {
213 ATH_MSG_ERROR("System Error while running sleep");
214 return nullptr;
215 }
216 } while(m_reader->getData(eventSize, &(cache->data)) == DRWAIT);
217 } else if (DROK != ecode) {
218 ATH_MSG_ERROR("Error reading next event");
219 throw ByteStreamExceptions::readError();
220 }
221 ATH_MSG_DEBUG("Event Size " << eventSize);
222
223 } else {
224 ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
225 return nullptr;
226 }
227
228 // Use buffer to build FullEventFragment
229 try {
230 buildFragment(cache, eventSize, true);
231 }
232 catch (...) {
233 // rethrow any exceptions
234 throw;
235 }
236
237 if (cache->rawEvent == nullptr) {
238 ATH_MSG_ERROR("Failure to build fragment");
239 return nullptr;
240 }
241
242 // Set it for the data provider
243 m_robProvider->setNextEvent(context, cache->rawEvent.get());
244 m_robProvider->setEventStatus(context, cache->eventStatus);
245
246 //++m_totalEventCounter;
247
248 // dump
249 if (m_dump) {
250 DumpFrags::dump(cache->rawEvent.get());
251 }
252 ATH_MSG_DEBUG("switched to next event in slot " << context);
253 return cache->rawEvent.get();
254}
void buildFragment(EventCache *cache, uint32_t eventSize, bool validate) const
static void dump(const RawEvent *re)
dump fragments from FullEventFragment
Definition DumpFrags.h:21

◆ positionInBlock()

long ByteStreamEventStorageInputSvc::positionInBlock ( )
overridevirtual

Definition at line 91 of file ByteStreamEventStorageInputSvc.cxx.

92{
93 return m_evtOffsets.size();
94}

◆ previousEvent()

const RawEvent * ByteStreamEventStorageInputSvc::previousEvent ( )
overridevirtual

–, old

Definition at line 112 of file ByteStreamEventStorageInputSvc.cxx.

113{
114 std::lock_guard<std::mutex> lock(m_readerMutex);
115 const EventContext& context{Gaudi::Hive::currentContext()};
116 EventCache* cache = m_eventsCache.get(context);
117 // initialize before building RawEvent
118 cache->releaseEvent();
119
120 // Load data buffer from file
121 unsigned int eventSize;
122 if (readerReady()) {
123 //get current event position (cast to long long until native tdaq implementation)
124 m_evtInFile--;
126 DRError ecode = m_reader->getData(eventSize, &(cache->data), m_evtOffsets.at(m_evtInFile - 1));
127
128 if (DRWAIT == ecode && m_wait > 0) {
129 do {
130 // wait for n seconds
131 ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
132 int result = usleep(static_cast<int>(m_wait * 1e6));
133 if (result != 0) {
134 ATH_MSG_ERROR("System Error while running sleep");
135 return nullptr;
136 }
137 } while(m_reader->getData(eventSize, &(cache->data), m_evtFileOffset) == DRWAIT);
138 } else if (DROK != ecode) {
139 ATH_MSG_ERROR("Error reading previous event");
140 throw ByteStreamExceptions::readError();
141 }
142 ATH_MSG_DEBUG("Event Size " << eventSize);
143 }
144 else {
145 ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
146 return nullptr;
147 }
148
149 // Use buffer to build FullEventFragment
150 try {
151 buildFragment(cache, eventSize, true);
152 }
153 catch (...) {
154 // rethrow any exceptions
155 throw;
156 }
157
158 if (cache->rawEvent == nullptr) {
159 ATH_MSG_ERROR("Failure to build fragment");
160 return nullptr;
161 }
162
163 // Set it for the data provider
164 m_robProvider->setNextEvent(context, cache->rawEvent.get());
165 m_robProvider->setEventStatus(context, cache->eventStatus);
166
167 // dump
168 if (m_dump) {
169 DumpFrags::dump(cache->rawEvent.get());
170 }
171 ATH_MSG_DEBUG( "switched to previous event in slot " << context);
172 return cache->rawEvent.get();
173}

◆ readerReady()

bool ByteStreamEventStorageInputSvc::readerReady ( )
private

Definition at line 551 of file ByteStreamEventStorageInputSvc.cxx.

552{
553 bool eofFlag(false);
554
555 if (m_reader) eofFlag = m_reader->endOfFile();
556 else {
557 ATH_MSG_INFO("eformat reader object not initialized");
558 return false;
559 }
560
561 bool moreEvent = m_reader->good();
562
563 return (!eofFlag) && moreEvent;
564}

◆ ready()

bool ByteStreamEventStorageInputSvc::ready ( )
overridevirtual

Definition at line 500 of file ByteStreamEventStorageInputSvc.cxx.

501{
502 return readerReady();
503}

◆ ROBFragmentCheck()

bool ByteStreamEventStorageInputSvc::ROBFragmentCheck ( const RawEvent * re) const
private

Definition at line 569 of file ByteStreamEventStorageInputSvc.cxx.

570{
571 bool allOK = true;
572 uint32_t total = re->nchildren();
573 uint32_t lastId = 0;
574 std::vector<eformat::FragmentProblem> problems;
575
576 for (size_t i = 0; i<total; ++i) {
578 re->child(fp, i);
579
581 lastId = f.source_id();
582
583 problems.clear();
584 f.problems(problems);
585 if(!problems.empty()) {
586 allOK = false;
587 for(const auto& problem : problems) {
588 ATH_MSG_WARNING("Failed to create ROBFragment id = " << lastId << ", "
589 << eformat::helper::SourceIdentifier(lastId).human() << " : "
590 << eformat::helper::FragmentProblemDictionary.string(problem));
591 }
592 }
593 }
594
595 return allOK;
596}
const boost::regex re(r_e)
eformat::ROBFragment< PointerType > ROBFragment
Definition RawEvent.h:27

◆ setEvent() [1/2]

void ByteStreamEventStorageInputSvc::setEvent ( const EventContext & context,
void * data,
unsigned int eventStatus )
private

Definition at line 610 of file ByteStreamEventStorageInputSvc.cxx.

612{
614 EventCache* cache = m_eventsCache.get(context);
615 cache->releaseEvent();
616
617 DataType* fragment = reinterpret_cast<DataType*>(data);
618 cache->rawEvent = std::make_unique<RawEvent>(fragment);
619 cache->eventStatus = eventStatus;
620
621 // Set it for the data provider
622 m_robProvider->setNextEvent(context, cache->rawEvent.get());
623 m_robProvider->setEventStatus(context, cache->eventStatus);
624
625 // Build a DH for use by other components
627 if (rec_sg != StatusCode::SUCCESS) {
628 ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! "
629 << rec_sg);
630 }
631}
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
virtual StatusCode generateDataHeader() override

◆ setEvent() [2/2]

void ByteStreamEventStorageInputSvc::setEvent ( void * data,
unsigned int eventStatus )
overridevirtual

Definition at line 601 of file ByteStreamEventStorageInputSvc.cxx.

602{
603 const EventContext& context{Gaudi::Hive::currentContext()};
604 return setEvent(context, data, eventStatus);
605}
virtual void setEvent(void *data, unsigned int eventStatus) override

◆ setSequentialRead()

bool ByteStreamEventStorageInputSvc::setSequentialRead ( )
virtual

Definition at line 490 of file ByteStreamEventStorageInputSvc.cxx.

491{
492 // enable SequenceReading
493 m_reader->enableSequenceReading();
494 return true;
495}

◆ stop()

StatusCode ByteStreamEventStorageInputSvc::stop ( )
overridevirtual

Definition at line 74 of file ByteStreamEventStorageInputSvc.cxx.

75{
76 // close moved to EventSelector for explicit coupling with incident
77 return StatusCode::SUCCESS;
78}

◆ validateEvent() [1/2]

void ByteStreamEventStorageInputSvc::validateEvent ( )
overridevirtual

Definition at line 259 of file ByteStreamEventStorageInputSvc.cxx.

260{
261 const EventContext& context{Gaudi::Hive::currentContext()};
262 const RawEvent* const event = m_eventsCache.get(context)->rawEvent.get();
263 m_eventsCache.get(context)->eventStatus = validateEvent(event);
264}
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37

◆ validateEvent() [2/2]

unsigned ByteStreamEventStorageInputSvc::validateEvent ( const RawEvent *const rawEvent) const
private

Definition at line 269 of file ByteStreamEventStorageInputSvc.cxx.

270{
271 unsigned int status = 0;
272 if (m_valEvent) {
273 // check validity
274 std::vector<eformat::FragmentProblem> problems;
275 rawEvent->problems(problems);
276
277 if(!problems.empty()) {
278 status += 0x01000000;
279
280 // bad event
281 ATH_MSG_WARNING("Failed to create FullEventFragment");
282 for(const auto& problem : problems)
283 ATH_MSG_WARNING(eformat::helper::FragmentProblemDictionary.string(problem));
284
285 throw ByteStreamExceptions::badFragmentData();
286 }
287
288 if(!ROBFragmentCheck(rawEvent)) {
289 status += 0x02000000;
290
291 // bad event
292 ATH_MSG_ERROR("Skipping bad event");
293 throw ByteStreamExceptions::badFragmentData();
294 }
295 } else {
296 ATH_MSG_DEBUG("Processing event without validating.");
297 }
298 return status;
299}

Member Data Documentation

◆ m_dump

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_dump {this, "DumpFlag", false, "Dump fragments"}
private

Definition at line 99 of file ByteStreamEventStorageInputSvc.h.

99{this, "DumpFlag", false, "Dump fragments"};

◆ m_eventInfoKey

Gaudi::Property<std::string> ByteStreamEventStorageInputSvc::m_eventInfoKey {this, "EventInfoKey", "EventInfo", "Key of EventInfo in metadata store"}
private

Definition at line 102 of file ByteStreamEventStorageInputSvc.h.

102{this, "EventInfoKey", "EventInfo", "Key of EventInfo in metadata store"};

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> ByteStreamEventStorageInputSvc::m_eventsCache
private

Definition at line 81 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtFileOffset

long long int ByteStreamEventStorageInputSvc::m_evtFileOffset
private

last read in event offset within a file, can be -1

Definition at line 87 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtInFile

unsigned int ByteStreamEventStorageInputSvc::m_evtInFile
private

Definition at line 86 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtOffsets

std::vector<long long int> ByteStreamEventStorageInputSvc::m_evtOffsets
private

offset for event i in that file

Definition at line 85 of file ByteStreamEventStorageInputSvc.h.

◆ m_fileGUID

std::string ByteStreamEventStorageInputSvc::m_fileGUID
private

current file GUID

Definition at line 89 of file ByteStreamEventStorageInputSvc.h.

◆ m_inputMetadata

ServiceHandle<StoreGateSvc> ByteStreamEventStorageInputSvc::m_inputMetadata {this, "MetaDataStore", "StoreGateSvc/InputMetaDataStore"}
private

Definition at line 96 of file ByteStreamEventStorageInputSvc.h.

96{this, "MetaDataStore", "StoreGateSvc/InputMetaDataStore"};

◆ m_reader

std::unique_ptr<EventStorage::DataReader> ByteStreamEventStorageInputSvc::m_reader
private

DataReader from EventStorage.

Definition at line 83 of file ByteStreamEventStorageInputSvc.h.

◆ m_readerMutex

std::mutex ByteStreamEventStorageInputSvc::m_readerMutex
private

Definition at line 70 of file ByteStreamEventStorageInputSvc.h.

◆ m_robProvider

ServiceHandle<IROBDataProviderSvc> ByteStreamEventStorageInputSvc::m_robProvider
private

Definition at line 97 of file ByteStreamEventStorageInputSvc.h.

◆ m_sequential

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_sequential {this, "EnableSequential", false, "enable sequential reading"}
private

Definition at line 98 of file ByteStreamEventStorageInputSvc.h.

98{this, "EnableSequential", false, "enable sequential reading"};

◆ m_storeGate

ServiceHandle<StoreGateSvc> ByteStreamEventStorageInputSvc::m_storeGate {this, "EventStore", "StoreGateSvc"}
private

Pointer to StoreGate.

Definition at line 95 of file ByteStreamEventStorageInputSvc.h.

95{this, "EventStore", "StoreGateSvc"};

◆ m_valEvent

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_valEvent {this, "ValidateEvent", false, "switch on check_tree when reading events"}
private

Definition at line 101 of file ByteStreamEventStorageInputSvc.h.

101{this, "ValidateEvent", false, "switch on check_tree when reading events"};

◆ m_wait

Gaudi::Property<float> ByteStreamEventStorageInputSvc::m_wait {this, "WaitSecs", 0.0f, "Seconds to wait if input is in wait state"}
private

Definition at line 100 of file ByteStreamEventStorageInputSvc.h.

100{this, "WaitSecs", 0.0f, "Seconds to wait if input is in wait state"};

The documentation for this class was generated from the following files: