ATLAS Offline Software
Classes | Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
ByteStreamEventStorageInputSvc Class Reference

This class is the ByteStreamInputSvc for reading events written by EventStorage. More...

#include <ByteStreamEventStorageInputSvc.h>

Inheritance diagram for ByteStreamEventStorageInputSvc:
Collaboration diagram for ByteStreamEventStorageInputSvc:

Classes

struct  EventCache
 

Public Member Functions

 ByteStreamEventStorageInputSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Constructors: More...
 
virtual ~ByteStreamEventStorageInputSvc ()
 Destructor. More...
 
virtual StatusCode initialize () override
 Required of all Gaudi Services. More...
 
virtual StatusCode stop () override
 
virtual StatusCode finalize () override
 
virtual const RawEventcurrentEvent () const override
 Implementation of the IByteStreamInputSvc interface methods. More...
 
virtual const RawEventnextEvent () override
 ++, new More...
 
virtual const RawEventpreviousEvent () override
 –, old More...
 
virtual void setEvent (void *data, unsigned int eventStatus) override
 
virtual unsigned int currentEventStatus () const override
 Return the current event status. More...
 
virtual void validateEvent () override
 
virtual long positionInBlock () override
 
virtual std::pair< long, std::string > getBlockIterator (const std::string &fileName) override
 
virtual void closeBlockIterator (bool clearMetadata=true) override
 
virtual bool setSequentialRead ()
 
virtual bool ready () override
 
virtual StatusCode generateDataHeader () override
 

Private Types

enum  Advance { PREVIOUS = -1, NEXT = 1 }
 

Private Member Functions

StatusCode loadMetadata ()
 
void buildFragment (EventCache *cache, uint32_t eventSize, bool validate) const
 
bool readerReady ()
 
bool ROBFragmentCheck (const RawEvent *) const
 
unsigned validateEvent (const RawEvent *const rawEvent) const
 
void setEvent (const EventContext &context, void *data, unsigned int eventStatus)
 
const RawEventgetEvent (Advance step)
 
std::unique_ptr< DataHeaderElementmakeBSProvenance () const
 
template<typename T >
StatusCode deleteEntry (const std::string &key)
 

Private Attributes

std::mutex m_readerMutex
 
SG::SlotSpecificObj< EventCachem_eventsCache
 
std::unique_ptr< EventStorage::DataReader > m_reader
 DataReader from EventStorage. More...
 
std::vector< long long int > m_evtOffsets
 offset for event i in that file More...
 
unsigned int m_evtInFile
 
long long int m_evtFileOffset
 last read in event offset within a file, can be -1 More...
 
std::string m_fileGUID
 current file GUID More...
 
ServiceHandle< StoreGateSvcm_storeGate
 Pointer to StoreGate. More...
 
ServiceHandle< StoreGateSvcm_inputMetadata
 StoreGateSvc. More...
 
ServiceHandle< IROBDataProviderSvcm_robProvider
 
Gaudi::Property< bool > m_sequential {this, "EnableSequential", false, "enable sequential reading"}
 
Gaudi::Property< bool > m_dump {this, "DumpFlag", false, "Dump fragments"}
 
Gaudi::Property< float > m_wait {this, "WaitSecs", 0.0f, "Seconds to wait if input is in wait state"}
 
Gaudi::Property< bool > m_valEvent {this, "ValidateEvent", false, "switch on check_tree when reading events"}
 
Gaudi::Property< std::string > m_eventInfoKey {this, "EventInfoKey", "EventInfo", "Key of EventInfo in metadata store"}
 

Detailed Description

This class is the ByteStreamInputSvc for reading events written by EventStorage.

Definition at line 35 of file ByteStreamEventStorageInputSvc.h.

Member Enumeration Documentation

◆ Advance

Enumerator
PREVIOUS 
NEXT 

Definition at line 113 of file ByteStreamEventStorageInputSvc.h.

113 { PREVIOUS = -1, NEXT = 1 };

Constructor & Destructor Documentation

◆ ByteStreamEventStorageInputSvc()

ByteStreamEventStorageInputSvc::ByteStreamEventStorageInputSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Constructors:

Definition at line 38 of file ByteStreamEventStorageInputSvc.cxx.

40  : base_class(name, pSvcLocator)
41  , m_readerMutex()
42  , m_eventsCache()
43  , m_reader()
44  , m_evtOffsets()
45  , m_evtInFile(0)
46  , m_evtFileOffset(0)
47  , m_fileGUID("")
48  , m_storeGate ("StoreGateSvc", name)
49  , m_inputMetadata("StoreGateSvc/InputMetaDataStore", name)
50  , m_robProvider ("ROBDataProviderSvc", name)
51 {
52  assert(pSvcLocator != nullptr);
53 
54  declareProperty("EventStore", m_storeGate);
55  declareProperty("MetaDataStore", m_inputMetadata);
56 }

◆ ~ByteStreamEventStorageInputSvc()

ByteStreamEventStorageInputSvc::~ByteStreamEventStorageInputSvc ( )
virtual

Destructor.

Definition at line 60 of file ByteStreamEventStorageInputSvc.cxx.

61 {}

Member Function Documentation

◆ buildFragment()

void ByteStreamEventStorageInputSvc::buildFragment ( EventCache cache,
uint32_t  eventSize,
bool  validate 
) const
private

Definition at line 316 of file ByteStreamEventStorageInputSvc.cxx.

317  {
320  DataType* fragment = reinterpret_cast<DataType*>(cache->data);
321 
322  if (validate) {
323  // check fragment type
324  const DataType headWord = fragment[0];
325  ATH_MSG_DEBUG("First word of the fragment " << MSG::hex << headWord << MSG::dec);
326  // format version
327  const DataType formatVersion = eformat::helper::Version(fragment[3]).major_version();
328  ATH_MSG_DEBUG("Format version" << MSG::hex << formatVersion << MSG::dec);
329  // error message
330  if((formatVersion != eformat::MAJOR_DEFAULT_VERSION) &&
331  (formatVersion != eformat::MAJOR_V24_VERSION) &&
332  (formatVersion != eformat::MAJOR_V30_VERSION) &&
333  (formatVersion != eformat::MAJOR_V40_VERSION) &&
334  (formatVersion != eformat::MAJOR_V31_VERSION) ) {
335  ATH_MSG_ERROR("unsupported Format Version : "
336  << MSG::hex << formatVersion << MSG::dec);
337  }
338 
339  if(eformat::FULL_EVENT == headWord || 0xcc1234cc == headWord) { // ROS = 0xcc1234cc
340  try {
341  // convert old fragment
342  if(formatVersion != eformat::MAJOR_DEFAULT_VERSION) {
343  // 100 for increase of data-size due to header conversion
344  const uint32_t newEventSize = eventSize + 1000;
345  DataType* newFragment = new DataType[newEventSize];
346  eformat::old::convert(fragment, newFragment, newEventSize);
347 
348  // delete old fragment
349  delete [] fragment; fragment = nullptr;
350 
351  // set new pointer
352  fragment = newFragment;
353  cache->data = reinterpret_cast< char* >(fragment);
354  }
355  } catch (const eformat::Issue& ex) {
356  // bad event
357  ATH_MSG_WARNING(ex.what());
358  ATH_MSG_ERROR("Skipping bad event");
360  }
361  } else {
362  // Unknown fragment
363  ATH_MSG_FATAL("Unknown Header work in input fragment " << MSG::hex << headWord);
365  }
366  }
367 
368  // This is a FullEventFragment
369  // make a new FEFrag in memory from it
370  cache->eventStatus = 0;
371  if(fragment[5] > 0) {
372  cache->eventStatus += eformat::helper::Status(fragment[6]).specific();
373  cache->eventStatus += (eformat::helper::Status(fragment[6]).generic() & 0x000000ff) << 16;
374  }
375 
376  // This is a FullEventFragment
377  // make a new RawEvent in memory from it
378  cache->rawEvent = std::make_unique<RawEvent>(fragment);
379  ATH_MSG_DEBUG("Made an FullEventFragment from ES " << fragment);
380 }

◆ closeBlockIterator()

void ByteStreamEventStorageInputSvc::closeBlockIterator ( bool  clearMetadata = true)
overridevirtual

Definition at line 478 of file ByteStreamEventStorageInputSvc.cxx.

479 {
480  if (clearMetadata) {
481  ATH_MSG_WARNING("Clearing input metadata store");
482  StatusCode status = m_inputMetadata->clearStore();
483  if (!status.isSuccess()) {
484  ATH_MSG_WARNING("Unable to clear Input MetaData Proxies");
485  }
486  }
487 
488  if (!readerReady()) {
489  ATH_MSG_INFO("No more events in this run, high water mark for this file = "
490  << m_evtOffsets.size()-1);
491  }
492 
493  m_reader.reset();
494 }

◆ currentEvent()

const RawEvent * ByteStreamEventStorageInputSvc::currentEvent ( ) const
overridevirtual

Implementation of the IByteStreamInputSvc interface methods.

Definition at line 645 of file ByteStreamEventStorageInputSvc.cxx.

646 {
647  const EventContext& context{Gaudi::Hive::currentContext()};
648  return m_eventsCache.get(context)->rawEvent.get();
649 }

◆ currentEventStatus()

unsigned int ByteStreamEventStorageInputSvc::currentEventStatus ( ) const
overridevirtual

Return the current event status.

Definition at line 654 of file ByteStreamEventStorageInputSvc.cxx.

655 {
656  const EventContext& context{Gaudi::Hive::currentContext()};
657  return m_eventsCache.get(context)->eventStatus;
658 }

◆ deleteEntry()

template<typename T >
StatusCode ByteStreamEventStorageInputSvc::deleteEntry ( const std::string &  key)
inlineprivate

Definition at line 118 of file ByteStreamEventStorageInputSvc.h.

119  {
120  if (m_storeGate->contains<T>(key)) {
121  const T* tmp = m_storeGate->tryConstRetrieve<T>(key);
122  if (tmp != nullptr) ATH_CHECK(m_storeGate->remove<T>(tmp));
123  }
124  return StatusCode::SUCCESS;
125  }

◆ finalize()

StatusCode ByteStreamEventStorageInputSvc::finalize ( )
overridevirtual

Definition at line 90 of file ByteStreamEventStorageInputSvc.cxx.

90  {
91 
92  ATH_CHECK(m_storeGate.release());
93  ATH_CHECK(m_robProvider.release());
94  ATH_CHECK(m_inputMetadata.release());
95 
96  return(StatusCode::SUCCESS);
97 }

◆ generateDataHeader()

StatusCode ByteStreamEventStorageInputSvc::generateDataHeader ( )
overridevirtual

Definition at line 385 of file ByteStreamEventStorageInputSvc.cxx.

386 {
387  std::lock_guard<std::mutex> lock(m_readerMutex);
388 
389  // get file GUID
390  m_fileGUID = m_reader->GUID();
391 
392  // reader returns -1 when end of the file is reached
393  if(m_evtFileOffset != -1) {
394  ATH_MSG_DEBUG("ByteStream File GUID: " << m_fileGUID);
395  ATH_MSG_DEBUG("ByteStream Event Position in File: " << m_evtFileOffset);
396 
397  // To accommodate for skipEvents option in EventSelector
398  // While skipping BS event Selector does not return SUCCESS code,
399  // just advances silently through events. So SG content is not refreshed
400  // Lets do refresh of the event header here
401  std::string key = "ByteStreamDataHeader";
402  ATH_CHECK(deleteEntry<DataHeader>(key));
403 
404  // Created data header element with BS provenance information
405  std::unique_ptr<DataHeaderElement> dataHeaderElement = makeBSProvenance();
406  // Create data header itself
407  std::unique_ptr<DataHeader> dataHeader = std::make_unique<DataHeader>();
408  // Declare header primary
409  dataHeader->setStatus(DataHeader::Input);
410  // Set processTag
411  dataHeader->setProcessTag(dataHeaderElement->getKey());
412  //add the data header element self reference to the object vector
413  dataHeader->insert(*std::move(dataHeaderElement));
414 
415  // Clean up EventInfo from the previous event
416  key = m_eventInfoKey.value();
417  ATH_CHECK(deleteEntry<xAOD::EventInfo>(key));
418  // Now add ref to xAOD::EventInfo
419  std::unique_ptr<IOpaqueAddress> iopx = std::make_unique<ByteStreamAddress>(
421  ATH_CHECK(m_storeGate->recordAddress(key, iopx.release()));
422  const SG::DataProxy* ptmpx = m_storeGate->transientProxy(
424  if (ptmpx != nullptr) {
425  DataHeaderElement element(ptmpx, 0, key);
426  dataHeader->insert(element);
427  }
428 
429  // Clean up auxiliary EventInfo from the previous event
430  key = m_eventInfoKey.value() + "Aux.";
431  ATH_CHECK(deleteEntry<xAOD::EventAuxInfo>(key));
432  // Now add ref to xAOD::EventAuxInfo
433  std::unique_ptr<IOpaqueAddress> iopaux = std::make_unique<ByteStreamAddress>(
435  ATH_CHECK(m_storeGate->recordAddress(key, iopaux.release()));
436  const SG::DataProxy* ptmpaux = m_storeGate->transientProxy(
438  if (ptmpaux !=0) {
439  DataHeaderElement element(ptmpaux, 0, key);
440  dataHeader->insert(element);
441  }
442 
443  // Record new data header.Boolean flags will allow it's deletion in case
444  // of skipped events.
445  ATH_CHECK(m_storeGate->record<DataHeader>(dataHeader.release(),
446  "ByteStreamDataHeader", true, false, true));
447  }
448  return StatusCode::SUCCESS;
449 }

◆ getBlockIterator()

std::pair< long, std::string > ByteStreamEventStorageInputSvc::getBlockIterator ( const std::string &  fileName)
overridevirtual

Definition at line 517 of file ByteStreamEventStorageInputSvc.cxx.

518 {
519  // open the file
520  if(m_reader != 0) closeBlockIterator();
521 
522  m_reader = std::unique_ptr<EventStorage::DataReader>(pickDataReader(fileName));
523 
524  if(m_reader == nullptr) {
525  ATH_MSG_ERROR("Failed to open file " << fileName);
527  return std::make_pair(-1,"END");
528  }
529 
530  // Initialize offset vector
531  m_evtOffsets.resize(m_reader->eventsInFile(), -1);
532  m_evtOffsets.clear();
533 
534  // Get ByteStream Metadata into Input MetaData Store
535  // (include begin Input File Incident)
536  if (loadMetadata().isSuccess()) {
537  ATH_MSG_DEBUG("Recorded ByteStreamMetadata in InputMetaDataStore");
538  } else {
539  ATH_MSG_ERROR("Unable to record ByteStreamMetadata in InputMetaDataStore");
540  return std::make_pair(-1, "FAIL");
541  }
542 
543  m_evtInFile = 0;
544 
545  // enable sequentialReading if multiple files
546  if(m_sequential) {
547  bool test = setSequentialRead();
548  if (!test) return std::make_pair(-1,"SEQ");
549  }
550 
551  ATH_MSG_INFO("Picked valid file: " << m_reader->fileName());
552  // initialize offsets and counters
553  m_evtOffsets.push_back(static_cast<long long>(m_reader->getPosition()));
554  return std::make_pair(m_reader->eventsInFile(), m_reader->GUID());
555 }

◆ getEvent()

const RawEvent* ByteStreamEventStorageInputSvc::getEvent ( Advance  step)
private

◆ initialize()

StatusCode ByteStreamEventStorageInputSvc::initialize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 66 of file ByteStreamEventStorageInputSvc.cxx.

67 {
68  ATH_MSG_INFO("Initializing");
69 
70  ATH_CHECK(m_inputMetadata.retrieve());
71  ATH_CHECK(m_storeGate.retrieve());
72  ATH_CHECK(m_robProvider.retrieve());
73 
74  return(StatusCode::SUCCESS);
75 }

◆ loadMetadata()

StatusCode ByteStreamEventStorageInputSvc::loadMetadata ( )
private

Definition at line 111 of file ByteStreamEventStorageInputSvc.cxx.

112 {
113  // default goes into ByteStreamMetadata
114  auto bsmdc = std::make_unique<ByteStreamMetadataContainer>();
115  bsmdc->push_back(std::make_unique<ByteStreamMetadata>(*m_reader));
116  ATH_MSG_DEBUG("ByteStreamMetadata:\n" << *(bsmdc->front()));
117 
118  return m_inputMetadata->record(std::move(bsmdc), "ByteStreamMetadata");
119 }

◆ makeBSProvenance()

std::unique_ptr< DataHeaderElement > ByteStreamEventStorageInputSvc::makeBSProvenance ( ) const
private

Definition at line 663 of file ByteStreamEventStorageInputSvc.cxx.

664 {
665  Token token;
666  token.setDb(m_fileGUID);
667  token.setTechnology(0x00001000);
668  token.setOid(Token::OID_t(0LL, m_evtFileOffset));
669 
670  return std::make_unique<DataHeaderElement>(ClassID_traits<DataHeader>::ID(),
671  "StreamRAW", std::move(token));
672 }

◆ nextEvent()

const RawEvent * ByteStreamEventStorageInputSvc::nextEvent ( )
overridevirtual

++, new

Definition at line 191 of file ByteStreamEventStorageInputSvc.cxx.

191  {
192 
193  std::lock_guard<std::mutex> lock( m_readerMutex );
194  const EventContext& context{ Gaudi::Hive::currentContext() };
195  EventCache* cache = m_eventsCache.get(context);
196 
197  // initialize before building RawEvent
198  cache->releaseEvent();
199 
200  // Load data buffer from file
201  unsigned int eventSize;
202  if (readerReady()) {
203  DRError ecode;
204  // Check if have moved back from high water mark
205  m_evtInFile++; // increment iterator
206  if (m_evtInFile+1 > m_evtOffsets.size()) {
207  //get current event position (cast to long long until native tdaq implementation)
208  ATH_MSG_DEBUG("nextEvent _above_ high water mark");
209  m_evtFileOffset = static_cast<long long>(m_reader->getPosition());
210  m_evtOffsets.push_back(m_evtFileOffset);
211  ecode = m_reader->getData(eventSize, &(cache->data));
212  } else {
213  // Load from previous offset
214  ATH_MSG_DEBUG("nextEvent below high water mark");
216  ecode = m_reader->getData(eventSize, &(cache->data), m_evtFileOffset);
217  }
218 
219  if (DRWAIT == ecode && m_wait > 0) {
220  do {
221  // wait for n seconds
222  ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
223  int result = usleep(static_cast<int>(m_wait * 1e6));
224  if (result != 0) {
225  ATH_MSG_ERROR("System Error while running sleep");
226  return 0;
227  }
228  } while(m_reader->getData(eventSize, &(cache->data)) == DRWAIT);
229  } else if (DROK != ecode) {
230  ATH_MSG_ERROR("Error reading next event");
232  }
233  ATH_MSG_DEBUG("Event Size " << eventSize);
234 
235  } else {
236  ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
237  return 0;
238  }
239 
240  // Use buffer to build FullEventFragment
241  try {
242  buildFragment(cache, eventSize, true);
243  }
244  catch (...) {
245  // rethrow any exceptions
246  throw;
247  }
248 
249  if (cache->rawEvent == nullptr) {
250  ATH_MSG_ERROR("Failure to build fragment");
251  return nullptr;
252  }
253 
254  // Set it for the data provider
255  m_robProvider->setNextEvent(context, cache->rawEvent.get());
256  m_robProvider->setEventStatus(context, cache->eventStatus);
257 
258  //++m_totalEventCounter;
259 
260  // dump
261  if (m_dump) {
262  DumpFrags::dump(cache->rawEvent.get());
263  }
264  ATH_MSG_DEBUG("switched to next event in slot " << context);
265  return(cache->rawEvent.get());
266 }

◆ positionInBlock()

long ByteStreamEventStorageInputSvc::positionInBlock ( )
overridevirtual

Definition at line 103 of file ByteStreamEventStorageInputSvc.cxx.

104 {
105  return m_evtOffsets.size();
106 }

◆ previousEvent()

const RawEvent * ByteStreamEventStorageInputSvc::previousEvent ( )
overridevirtual

–, old

Definition at line 124 of file ByteStreamEventStorageInputSvc.cxx.

125 {
126  std::lock_guard<std::mutex> lock(m_readerMutex);
127  const EventContext& context{Gaudi::Hive::currentContext()};
128  EventCache* cache = m_eventsCache.get(context);
129  // initialize before building RawEvent
130  cache->releaseEvent();
131 
132  // Load data buffer from file
133  unsigned int eventSize;
134  if (readerReady()) {
135  //get current event position (cast to long long until native tdaq implementation)
136  m_evtInFile--;
138  DRError ecode = m_reader->getData(eventSize, &(cache->data), m_evtOffsets.at(m_evtInFile - 1));
139 
140  if (DRWAIT == ecode && m_wait > 0) {
141  do {
142  // wait for n seconds
143  ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
144  int result = usleep(static_cast<int>(m_wait * 1e6));
145  if (result != 0) {
146  ATH_MSG_ERROR("System Error while running sleep");
147  return nullptr;
148  }
149  } while(m_reader->getData(eventSize, &(cache->data), m_evtFileOffset) == DRWAIT);
150  } else if (DROK != ecode) {
151  ATH_MSG_ERROR("Error reading previous event");
153  }
154  ATH_MSG_DEBUG("Event Size " << eventSize);
155  }
156  else {
157  ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
158  return 0;
159  }
160 
161  // Use buffer to build FullEventFragment
162  try {
163  buildFragment(cache, eventSize, true);
164  }
165  catch (...) {
166  // rethrow any exceptions
167  throw;
168  }
169 
170  if (cache->rawEvent == nullptr) {
171  ATH_MSG_ERROR("Failure to build fragment");
172  return nullptr;
173  }
174 
175  // Set it for the data provider
176  m_robProvider->setNextEvent(context, cache->rawEvent.get());
177  m_robProvider->setEventStatus(context, cache->eventStatus);
178 
179  // dump
180  if (m_dump) {
181  DumpFrags::dump(cache->rawEvent.get());
182  }
183  ATH_MSG_DEBUG( "switched to previous event in slot " << context);
184  return(cache->rawEvent.get());
185 }

◆ readerReady()

bool ByteStreamEventStorageInputSvc::readerReady ( )
private

Definition at line 560 of file ByteStreamEventStorageInputSvc.cxx.

561 {
562  bool eofFlag(false);
563 
564  if (m_reader) eofFlag = m_reader->endOfFile();
565  else {
566  ATH_MSG_INFO("eformat reader object not initialized");
567  return false;
568  }
569 
570  bool moreEvent = m_reader->good();
571 
572  return (!eofFlag) && moreEvent;
573 }

◆ ready()

bool ByteStreamEventStorageInputSvc::ready ( )
overridevirtual

Definition at line 509 of file ByteStreamEventStorageInputSvc.cxx.

510 {
511  return readerReady();
512 }

◆ ROBFragmentCheck()

bool ByteStreamEventStorageInputSvc::ROBFragmentCheck ( const RawEvent re) const
private

Definition at line 578 of file ByteStreamEventStorageInputSvc.cxx.

579 {
580  bool allOK = true;
581  uint32_t total = re->nchildren();
582  uint32_t lastId = 0;
583  std::vector<eformat::FragmentProblem> problems;
584 
585  for (size_t i = 0; i<total; ++i) {
587  re->child(fp, i);
588 
590  lastId = f.source_id();
591 
592  problems.clear();
593  f.problems(problems);
594  if(!problems.empty()) {
595  allOK = false;
596  for(const auto& problem : problems) {
597  ATH_MSG_WARNING("Failed to create ROBFragment id = " << lastId << ", "
598  << eformat::helper::SourceIdentifier(lastId).human() << " : "
599  << eformat::helper::FragmentProblemDictionary.string(problem));
600  }
601  }
602  }
603 
604  return allOK;
605 }

◆ setEvent() [1/2]

void ByteStreamEventStorageInputSvc::setEvent ( const EventContext &  context,
void *  data,
unsigned int  eventStatus 
)
private

Definition at line 619 of file ByteStreamEventStorageInputSvc.cxx.

621 {
623  EventCache* cache = m_eventsCache.get(context);
624  cache->releaseEvent();
625 
626  DataType* fragment = reinterpret_cast<DataType*>(data);
627  cache->rawEvent = std::make_unique<RawEvent>(fragment);
628  cache->eventStatus = eventStatus;
629 
630  // Set it for the data provider
631  m_robProvider->setNextEvent(context, cache->rawEvent.get());
632  m_robProvider->setEventStatus(context, cache->eventStatus);
633 
634  // Build a DH for use by other components
635  StatusCode rec_sg = generateDataHeader();
636  if (rec_sg != StatusCode::SUCCESS) {
637  ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! "
638  << rec_sg);
639  }
640 }

◆ setEvent() [2/2]

void ByteStreamEventStorageInputSvc::setEvent ( void *  data,
unsigned int  eventStatus 
)
overridevirtual

Definition at line 610 of file ByteStreamEventStorageInputSvc.cxx.

611 {
612  const EventContext& context{Gaudi::Hive::currentContext()};
613  return setEvent(context, data, eventStatus);
614 }

◆ setSequentialRead()

bool ByteStreamEventStorageInputSvc::setSequentialRead ( )
virtual

Definition at line 499 of file ByteStreamEventStorageInputSvc.cxx.

500 {
501  // enable SequenceReading
502  m_reader->enableSequenceReading();
503  return true;
504 }

◆ stop()

StatusCode ByteStreamEventStorageInputSvc::stop ( )
overridevirtual

Definition at line 80 of file ByteStreamEventStorageInputSvc.cxx.

81 {
82  // close moved to EventSelector for explicit coupling with incident
83  //if (m_reader != 0) closeBlockIterator(false);
84  return(StatusCode::SUCCESS);
85 }

◆ validateEvent() [1/2]

void ByteStreamEventStorageInputSvc::validateEvent ( )
overridevirtual

Definition at line 271 of file ByteStreamEventStorageInputSvc.cxx.

272 {
273  const EventContext& context{Gaudi::Hive::currentContext()};
274  const RawEvent* const event = m_eventsCache.get(context)->rawEvent.get();
275  m_eventsCache.get(context)->eventStatus = validateEvent(event);
276 }

◆ validateEvent() [2/2]

unsigned ByteStreamEventStorageInputSvc::validateEvent ( const RawEvent *const  rawEvent) const
private

Definition at line 281 of file ByteStreamEventStorageInputSvc.cxx.

282 {
283  unsigned int status = 0;
284  if (m_valEvent) {
285  // check validity
286  std::vector<eformat::FragmentProblem> problems;
287  rawEvent->problems(problems);
288 
289  if(!problems.empty()) {
290  status += 0x01000000;
291 
292  // bad event
293  ATH_MSG_WARNING("Failed to create FullEventFragment");
294  for(const auto& problem : problems)
295  ATH_MSG_WARNING(eformat::helper::FragmentProblemDictionary.string(problem));
296 
298  }
299 
300  if(!ROBFragmentCheck(rawEvent)) {
301  status += 0x02000000;
302 
303  // bad event
304  ATH_MSG_ERROR("Skipping bad event");
306  }
307  } else {
308  ATH_MSG_DEBUG("Processing event without validating.");
309  }
310  return status;
311 }

Member Data Documentation

◆ m_dump

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_dump {this, "DumpFlag", false, "Dump fragments"}
private

Definition at line 99 of file ByteStreamEventStorageInputSvc.h.

◆ m_eventInfoKey

Gaudi::Property<std::string> ByteStreamEventStorageInputSvc::m_eventInfoKey {this, "EventInfoKey", "EventInfo", "Key of EventInfo in metadata store"}
private

Definition at line 102 of file ByteStreamEventStorageInputSvc.h.

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> ByteStreamEventStorageInputSvc::m_eventsCache
private

Definition at line 81 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtFileOffset

long long int ByteStreamEventStorageInputSvc::m_evtFileOffset
private

last read in event offset within a file, can be -1

Definition at line 87 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtInFile

unsigned int ByteStreamEventStorageInputSvc::m_evtInFile
private

Definition at line 86 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtOffsets

std::vector<long long int> ByteStreamEventStorageInputSvc::m_evtOffsets
private

offset for event i in that file

Definition at line 85 of file ByteStreamEventStorageInputSvc.h.

◆ m_fileGUID

std::string ByteStreamEventStorageInputSvc::m_fileGUID
private

current file GUID

Definition at line 89 of file ByteStreamEventStorageInputSvc.h.

◆ m_inputMetadata

ServiceHandle<StoreGateSvc> ByteStreamEventStorageInputSvc::m_inputMetadata
private

StoreGateSvc.

Definition at line 96 of file ByteStreamEventStorageInputSvc.h.

◆ m_reader

std::unique_ptr<EventStorage::DataReader> ByteStreamEventStorageInputSvc::m_reader
private

DataReader from EventStorage.

Definition at line 83 of file ByteStreamEventStorageInputSvc.h.

◆ m_readerMutex

std::mutex ByteStreamEventStorageInputSvc::m_readerMutex
private

Definition at line 70 of file ByteStreamEventStorageInputSvc.h.

◆ m_robProvider

ServiceHandle<IROBDataProviderSvc> ByteStreamEventStorageInputSvc::m_robProvider
private

Definition at line 97 of file ByteStreamEventStorageInputSvc.h.

◆ m_sequential

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_sequential {this, "EnableSequential", false, "enable sequential reading"}
private

Definition at line 98 of file ByteStreamEventStorageInputSvc.h.

◆ m_storeGate

ServiceHandle<StoreGateSvc> ByteStreamEventStorageInputSvc::m_storeGate
private

Pointer to StoreGate.

StoreGateSvc

Definition at line 95 of file ByteStreamEventStorageInputSvc.h.

◆ m_valEvent

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_valEvent {this, "ValidateEvent", false, "switch on check_tree when reading events"}
private

Definition at line 101 of file ByteStreamEventStorageInputSvc.h.

◆ m_wait

Gaudi::Property<float> ByteStreamEventStorageInputSvc::m_wait {this, "WaitSecs", 0.0f, "Seconds to wait if input is in wait state"}
private

Definition at line 100 of file ByteStreamEventStorageInputSvc.h.


The documentation for this class was generated from the following files:
ByteStreamEventStorageInputSvc::m_evtFileOffset
long long int m_evtFileOffset
last read in event offset within a file, can be -1
Definition: ByteStreamEventStorageInputSvc.h:87
DataHeader::setProcessTag
void setProcessTag(const std::string &processTag)
Set ProcessTag for DataHeader.
Definition: DataHeader.cxx:242
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
validation.validate
def validate(testSampleDir, thisSampleName, testSamplePath, weight_database, outputSamples)
Definition: validation.py:26
get_generator_info.result
result
Definition: get_generator_info.py:21
ByteStreamEventStorageInputSvc::NEXT
@ NEXT
Definition: ByteStreamEventStorageInputSvc.h:113
ByteStreamEventStorageInputSvc::PREVIOUS
@ PREVIOUS
Definition: ByteStreamEventStorageInputSvc.h:113
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OFFLINE_FRAGMENTS_NAMESPACE::DataType
uint32_t DataType
Definition: RawEvent.h:24
ByteStreamEventStorageInputSvc::m_readerMutex
std::mutex m_readerMutex
Definition: ByteStreamEventStorageInputSvc.h:70
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
DataHeader::Input
@ Input
Definition: DataHeader.h:128
ByteStreamEventStorageInputSvc::m_evtOffsets
std::vector< long long int > m_evtOffsets
offset for event i in that file
Definition: ByteStreamEventStorageInputSvc.h:85
ByteStreamEventStorageInputSvc::m_eventsCache
SG::SlotSpecificObj< EventCache > m_eventsCache
Definition: ByteStreamEventStorageInputSvc.h:81
ByteStreamEventStorageInputSvc::m_storeGate
ServiceHandle< StoreGateSvc > m_storeGate
Pointer to StoreGate.
Definition: ByteStreamEventStorageInputSvc.h:95
Issue
Configuration Issue
Definition: PscIssues.h:31
ByteStreamEventStorageInputSvc::setSequentialRead
virtual bool setSequentialRead()
Definition: ByteStreamEventStorageInputSvc.cxx:499
DataType
OFFLINE_FRAGMENTS_NAMESPACE::PointerType DataType
Definition: RoIBResultByteStreamTool.cxx:25
ByteStreamEventStorageInputSvc::validateEvent
virtual void validateEvent() override
Definition: ByteStreamEventStorageInputSvc.cxx:271
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
ByteStreamEventStorageInputSvc::m_eventInfoKey
Gaudi::Property< std::string > m_eventInfoKey
Definition: ByteStreamEventStorageInputSvc.h:102
ByteStreamEventStorageInputSvc::m_robProvider
ServiceHandle< IROBDataProviderSvc > m_robProvider
Definition: ByteStreamEventStorageInputSvc.h:97
python.RatesEmulationExample.lock
lock
Definition: RatesEmulationExample.py:148
TrigInDetValidation_Base.test
test
Definition: TrigInDetValidation_Base.py:142
ByteStreamEventStorageInputSvc::generateDataHeader
virtual StatusCode generateDataHeader() override
Definition: ByteStreamEventStorageInputSvc.cxx:385
ByteStreamEventStorageInputSvc::m_evtInFile
unsigned int m_evtInFile
Definition: ByteStreamEventStorageInputSvc.h:86
ByteStreamEventStorageInputSvc::m_dump
Gaudi::Property< bool > m_dump
Definition: ByteStreamEventStorageInputSvc.h:99
ByteStreamEventStorageInputSvc::closeBlockIterator
virtual void closeBlockIterator(bool clearMetadata=true) override
Definition: ByteStreamEventStorageInputSvc.cxx:478
ByteStreamEventStorageInputSvc::m_valEvent
Gaudi::Property< bool > m_valEvent
Definition: ByteStreamEventStorageInputSvc.h:101
ByteStreamEventStorageInputSvc::loadMetadata
StatusCode loadMetadata()
Definition: ByteStreamEventStorageInputSvc.cxx:111
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
ByteStreamEventStorageInputSvc::m_wait
Gaudi::Property< float > m_wait
Definition: ByteStreamEventStorageInputSvc.h:100
ByteStreamEventStorageInputSvc::ROBFragmentCheck
bool ROBFragmentCheck(const RawEvent *) const
Definition: ByteStreamEventStorageInputSvc.cxx:578
DataHeaderElement::getKey
const std::string & getKey() const
Definition: DataHeader.cxx:117
Token::OID_t
Definition: Token.h:24
OFFLINE_FRAGMENTS_NAMESPACE::PointerType
const DataType * PointerType
Definition: RawEvent.h:25
DataHeaderElement
This class provides a persistent form for the TransientAddress.
Definition: DataHeader.h:37
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
ByteStreamExceptions::readError
Definition: ByteStreamExceptions.h:21
ByteStreamEventStorageInputSvc::m_reader
std::unique_ptr< EventStorage::DataReader > m_reader
DataReader from EventStorage.
Definition: ByteStreamEventStorageInputSvc.h:83
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
eformat::ROBFragment
Definition: L1CaloBsDecoderUtil.h:12
DataHeader
This class provides the layout for summary information stored for data written to POOL.
Definition: DataHeader.h:126
lumiFormat.i
int i
Definition: lumiFormat.py:85
trigmenu_modify_prescale_json.fp
fp
Definition: trigmenu_modify_prescale_json.py:53
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Token::setDb
Token & setDb(const Guid &db)
Set database name.
Definition: Token.h:66
hist_file_dump.f
f
Definition: hist_file_dump.py:140
DeMoUpdate.tmp
string tmp
Definition: DeMoUpdate.py:1167
Token::setTechnology
Token & setTechnology(int t)
Set technology type.
Definition: Token.h:79
DataHeader::insert
void insert(const SG::TransientAddress *sgAddress, IOpaqueAddress *tokAddress=0, const std::string &pTag="")
Insert a new element into the "DataObject" vector.
Definition: DataHeader.cxx:266
ByteStreamExceptions::badFragmentData
Definition: ByteStreamExceptions.h:33
DataHeader::setStatus
void setStatus(statusFlag status)
Set StatusFlag enum for DataHeader.
Definition: DataHeader.cxx:234
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
ByteStreamEventStorageInputSvc::readerReady
bool readerReady()
Definition: ByteStreamEventStorageInputSvc.cxx:560
ByteStreamEventStorageInputSvc::buildFragment
void buildFragment(EventCache *cache, uint32_t eventSize, bool validate) const
Definition: ByteStreamEventStorageInputSvc.cxx:316
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:85
TMVAToMVAUtils::convert
std::unique_ptr< MVAUtils::BDT > convert(TMVA::MethodBDT *bdt, bool isRegression=true, bool useYesNoLeaf=false)
Definition: TMVAToMVAUtils.h:114
ByteStreamExceptions::badFragment
Definition: ByteStreamExceptions.h:27
Athena::Status
Status
Athena specific StatusCode values.
Definition: AthStatusCode.h:22
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
ByteStreamEventStorageInputSvc::m_sequential
Gaudi::Property< bool > m_sequential
Definition: ByteStreamEventStorageInputSvc.h:98
re
const boost::regex re(r_e)
ByteStreamEventStorageInputSvc::m_inputMetadata
ServiceHandle< StoreGateSvc > m_inputMetadata
StoreGateSvc.
Definition: ByteStreamEventStorageInputSvc.h:96
merge.status
status
Definition: merge.py:16
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
DumpFrags::dump
static void dump(const RawEvent *re)
dump fragments from FullEventFragment
Definition: DumpFrags.h:23
SG::DataProxy
Definition: DataProxy.h:45
ByteStreamEventStorageInputSvc::makeBSProvenance
std::unique_ptr< DataHeaderElement > makeBSProvenance() const
Definition: ByteStreamEventStorageInputSvc.cxx:663
TSU::T
unsigned long long T
Definition: L1TopoDataTypes.h:35
ByteStreamEventStorageInputSvc::m_fileGUID
std::string m_fileGUID
current file GUID
Definition: ByteStreamEventStorageInputSvc.h:89
ByteStreamEventStorageInputSvc::setEvent
virtual void setEvent(void *data, unsigned int eventStatus) override
Definition: ByteStreamEventStorageInputSvc.cxx:610
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37