ATLAS Offline Software
Classes | Public Member Functions | Static Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
ByteStreamEventStorageInputSvc Class Reference

This class is the ByteStreamInputSvc for reading events written by EventStorage. More...

#include <ByteStreamEventStorageInputSvc.h>

Inheritance diagram for ByteStreamEventStorageInputSvc:
Collaboration diagram for ByteStreamEventStorageInputSvc:

Classes

struct  EventCache
 

Public Member Functions

 ByteStreamEventStorageInputSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Constructors: More...
 
virtual ~ByteStreamEventStorageInputSvc ()
 Destructor. More...
 
virtual StatusCode initialize () override
 Required of all Gaudi Services. More...
 
virtual StatusCode stop () override
 
virtual StatusCode finalize () override
 
virtual StatusCode queryInterface (const InterfaceID &riid, void **ppvInterface) override
 Required of all Gaudi services: see Gaudi documentation for details. More...
 
virtual const RawEventcurrentEvent () const override
 Implementation of the ByteStreamInputSvc interface methods. More...
 
virtual const RawEventnextEvent () override
 ++, new More...
 
virtual const RawEventpreviousEvent () override
 –, old More...
 
virtual void setEvent (void *data, unsigned int eventStatus) override
 
virtual unsigned int currentEventStatus () const override
 Return the current event status. More...
 
virtual void validateEvent () override
 
virtual long positionInBlock () override
 
virtual std::pair< long, std::string > getBlockIterator (const std::string &fileName) override
 
virtual void closeBlockIterator (bool clearMetadata=true) override
 
virtual bool setSequentialRead ()
 
virtual bool ready () override
 
virtual StatusCode generateDataHeader () override
 
MsgStream & msg () const
 
MsgStream & msg (const MSG::Level lvl) const
 
bool msgLvl (const MSG::Level lvl) const
 

Static Public Member Functions

static const InterfaceID & interfaceID ()
 Retrieve interface ID. More...
 

Private Types

enum  Advance { PREVIOUS = -1, NEXT = 1 }
 

Private Member Functions

StatusCode loadMetadata ()
 
void buildFragment (EventCache *cache, uint32_t eventSize, bool validate) const
 
bool readerReady ()
 
bool ROBFragmentCheck (const RawEvent *) const
 
unsigned validateEvent (const RawEvent *const rawEvent) const
 
void setEvent (const EventContext &context, void *data, unsigned int eventStatus)
 
const RawEventgetEvent (Advance step)
 
std::unique_ptr< DataHeaderElementmakeBSProvenance () const
 
template<typename T >
StatusCode deleteEntry (const std::string &key)
 

Private Attributes

std::mutex m_readerMutex
 
SG::SlotSpecificObj< EventCachem_eventsCache
 
std::unique_ptr< EventStorage::DataReader > m_reader
 DataReader from EventStorage. More...
 
std::vector< long long int > m_evtOffsets
 offset for event i in that file More...
 
unsigned int m_evtInFile
 
long long int m_evtFileOffset
 last read in event offset within a file, can be -1 More...
 
std::string m_fileGUID
 current file GUID More...
 
ServiceHandle< StoreGateSvcm_storeGate
 Pointer to StoreGate. More...
 
ServiceHandle< StoreGateSvcm_inputMetadata
 StoreGateSvc. More...
 
ServiceHandle< IROBDataProviderSvcm_robProvider
 
Gaudi::Property< bool > m_sequential
 enable sequential reading. More...
 
Gaudi::Property< bool > m_dump
 
Gaudi::Property< float > m_wait
 
Gaudi::Property< bool > m_valEvent
 
Gaudi::Property< std::string > m_eventInfoKey
 

Detailed Description

This class is the ByteStreamInputSvc for reading events written by EventStorage.

Definition at line 34 of file ByteStreamEventStorageInputSvc.h.

Member Enumeration Documentation

◆ Advance

Enumerator
PREVIOUS 
NEXT 

Definition at line 116 of file ByteStreamEventStorageInputSvc.h.

116 { PREVIOUS = -1, NEXT = 1 };

Constructor & Destructor Documentation

◆ ByteStreamEventStorageInputSvc()

ByteStreamEventStorageInputSvc::ByteStreamEventStorageInputSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Constructors:

Definition at line 37 of file ByteStreamEventStorageInputSvc.cxx.

39  : ByteStreamInputSvc(name, pSvcLocator)
40  , m_readerMutex()
41  , m_eventsCache()
42  , m_reader()
43  , m_evtOffsets()
44  , m_evtInFile(0)
45  , m_evtFileOffset(0)
46  , m_fileGUID("")
47  , m_storeGate ("StoreGateSvc", name)
48  , m_inputMetadata("StoreGateSvc/InputMetaDataStore", name)
49  , m_robProvider ("ROBDataProviderSvc", name)
50  , m_sequential (this, "EnableSequential", false, "")
51  , m_dump (this, "DumpFlag", false, "Dump fragments")
52  , m_wait (this, "WaitSecs", 0., "Seconds to wait if input is in wait state")
53  , m_valEvent (this, "ValidateEvent", true, "switch on check_tree when reading events")
54  , m_eventInfoKey (this, "EventInfoKey", "EventInfo", "Key of EventInfo in metadata store")
55 {
56  assert(pSvcLocator != nullptr);
57 
58  declareProperty("EventStore", m_storeGate);
59  declareProperty("MetaDataStore", m_inputMetadata);
60 }

◆ ~ByteStreamEventStorageInputSvc()

ByteStreamEventStorageInputSvc::~ByteStreamEventStorageInputSvc ( )
virtual

Destructor.

Definition at line 64 of file ByteStreamEventStorageInputSvc.cxx.

65 {}

Member Function Documentation

◆ buildFragment()

void ByteStreamEventStorageInputSvc::buildFragment ( EventCache cache,
uint32_t  eventSize,
bool  validate 
) const
private

Definition at line 322 of file ByteStreamEventStorageInputSvc.cxx.

323  {
326  DataType* fragment = reinterpret_cast<DataType*>(cache->data);
327 
328  if (validate) {
329  // check fragment type
330  const DataType headWord = fragment[0];
331  ATH_MSG_DEBUG("First word of the fragment " << MSG::hex << headWord << MSG::dec);
332  // format version
333  const DataType formatVersion = eformat::helper::Version(fragment[3]).major_version();
334  ATH_MSG_DEBUG("Format version" << MSG::hex << formatVersion << MSG::dec);
335  // error message
336  if((formatVersion != eformat::MAJOR_DEFAULT_VERSION) &&
337  (formatVersion != eformat::MAJOR_V24_VERSION) &&
338  (formatVersion != eformat::MAJOR_V30_VERSION) &&
339  (formatVersion != eformat::MAJOR_V40_VERSION) &&
340  (formatVersion != eformat::MAJOR_V31_VERSION) ) {
341  ATH_MSG_ERROR("unsupported Format Version : "
342  << MSG::hex << formatVersion << MSG::dec);
343  }
344 
345  if(eformat::FULL_EVENT == headWord || 0xcc1234cc == headWord) { // ROS = 0xcc1234cc
346  try {
347  // convert old fragment
348  if(formatVersion != eformat::MAJOR_DEFAULT_VERSION) {
349  // 100 for increase of data-size due to header conversion
350  const uint32_t newEventSize = eventSize + 1000;
351  DataType* newFragment = new DataType[newEventSize];
352  eformat::old::convert(fragment, newFragment, newEventSize);
353 
354  // delete old fragment
355  delete [] fragment; fragment = nullptr;
356 
357  // set new pointer
358  fragment = newFragment;
359  cache->data = reinterpret_cast< char* >(fragment);
360  }
361  } catch (const eformat::Issue& ex) {
362  // bad event
363  ATH_MSG_WARNING(ex.what());
364  ATH_MSG_ERROR("Skipping bad event");
366  }
367  } else {
368  // Unknown fragment
369  ATH_MSG_FATAL("Unknown Header work in input fragment " << MSG::hex << headWord);
371  }
372  }
373 
374  // This is a FullEventFragment
375  // make a new FEFrag in memory from it
376  cache->eventStatus = 0;
377  if(fragment[5] > 0) {
378  cache->eventStatus += eformat::helper::Status(fragment[6]).specific();
379  cache->eventStatus += (eformat::helper::Status(fragment[6]).generic() & 0x000000ff) << 16;
380  }
381 
382  // This is a FullEventFragment
383  // make a new RawEvent in memory from it
384  cache->rawEvent = std::make_unique<RawEvent>(fragment);
385  ATH_MSG_DEBUG("Made an FullEventFragment from ES " << fragment);
386 }

◆ closeBlockIterator()

void ByteStreamEventStorageInputSvc::closeBlockIterator ( bool  clearMetadata = true)
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 482 of file ByteStreamEventStorageInputSvc.cxx.

483 {
484  if (clearMetadata) {
485  ATH_MSG_WARNING("Clearing input metadata store");
486  StatusCode status = m_inputMetadata->clearStore();
487  if (!status.isSuccess()) {
488  ATH_MSG_WARNING("Unable to clear Input MetaData Proxies");
489  }
490  }
491 
492  if (!readerReady()) {
493  ATH_MSG_INFO("No more events in this run, high water mark for this file = "
494  << m_evtOffsets.size()-1);
495  }
496 
497  m_reader.reset();
498 }

◆ currentEvent()

const RawEvent * ByteStreamEventStorageInputSvc::currentEvent ( ) const
overridevirtual

Implementation of the ByteStreamInputSvc interface methods.

Implements ByteStreamInputSvc.

Definition at line 649 of file ByteStreamEventStorageInputSvc.cxx.

650 {
651  const EventContext& context{Gaudi::Hive::currentContext()};
652  return m_eventsCache.get(context)->rawEvent.get();
653 }

◆ currentEventStatus()

unsigned int ByteStreamEventStorageInputSvc::currentEventStatus ( ) const
overridevirtual

Return the current event status.

Reimplemented from ByteStreamInputSvc.

Definition at line 658 of file ByteStreamEventStorageInputSvc.cxx.

659 {
660  const EventContext& context{Gaudi::Hive::currentContext()};
661  return m_eventsCache.get(context)->eventStatus;
662 }

◆ deleteEntry()

template<typename T >
StatusCode ByteStreamEventStorageInputSvc::deleteEntry ( const std::string &  key)
inlineprivate

Definition at line 121 of file ByteStreamEventStorageInputSvc.h.

122  {
123  if (m_storeGate->contains<T>(key)) {
124  const T* tmp = m_storeGate->tryConstRetrieve<T>(key);
125  if (tmp != nullptr) ATH_CHECK(m_storeGate->remove<T>(tmp));
126  }
127  return StatusCode::SUCCESS;
128  }

◆ finalize()

StatusCode ByteStreamEventStorageInputSvc::finalize ( )
overridevirtual

Definition at line 96 of file ByteStreamEventStorageInputSvc.cxx.

96  {
97 
98  ATH_CHECK(m_storeGate.release());
99  ATH_CHECK(m_robProvider.release());
100  ATH_CHECK(m_inputMetadata.release());
101 
103 }

◆ generateDataHeader()

StatusCode ByteStreamEventStorageInputSvc::generateDataHeader ( )
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 391 of file ByteStreamEventStorageInputSvc.cxx.

392 {
393  // get file GUID
394  m_fileGUID = m_reader->GUID();
395 
396  // reader returns -1 when end of the file is reached
397  if(m_evtFileOffset != -1) {
398  ATH_MSG_DEBUG("ByteStream File GUID: " << m_fileGUID);
399  ATH_MSG_DEBUG("ByteStream Event Position in File: " << m_evtFileOffset);
400 
401  // To accommodate for skipEvents option in EventSelector
402  // While skipping BS event Selector does not return SUCCESS code,
403  // just advances silently through events. So SG content is not refreshed
404  // Lets do refresh of the event header here
405  std::string key = "ByteStreamDataHeader";
406  ATH_CHECK(deleteEntry<DataHeader>(key));
407 
408  // Created data header element with BS provenance information
409  std::unique_ptr<DataHeaderElement> dataHeaderElement = makeBSProvenance();
410  // Create data header itself
411  std::unique_ptr<DataHeader> dataHeader = std::make_unique<DataHeader>();
412  // Declare header primary
413  dataHeader->setStatus(DataHeader::Input);
414  //add the data header element self reference to the object vector
415  dataHeader->insert(*std::move(dataHeaderElement));
416  // Set processTag
417  dataHeader->setProcessTag(dataHeaderElement->getKey());
418 
419  // Clean up EventInfo from the previous event
420  key = m_eventInfoKey.value();
421  ATH_CHECK(deleteEntry<xAOD::EventInfo>(key));
422  // Now add ref to xAOD::EventInfo
423  std::unique_ptr<IOpaqueAddress> iopx = std::make_unique<ByteStreamAddress>(
425  ATH_CHECK(m_storeGate->recordAddress(key, iopx.release()));
426  const SG::DataProxy* ptmpx = m_storeGate->transientProxy(
428  if (ptmpx != nullptr) {
429  DataHeaderElement element(ptmpx, 0, key);
430  dataHeader->insert(element);
431  }
432 
433  // Clean up auxiliary EventInfo from the previous event
434  key = m_eventInfoKey.value() + "Aux.";
435  ATH_CHECK(deleteEntry<xAOD::EventAuxInfo>(key));
436  // Now add ref to xAOD::EventAuxInfo
437  std::unique_ptr<IOpaqueAddress> iopaux = std::make_unique<ByteStreamAddress>(
439  ATH_CHECK(m_storeGate->recordAddress(key, iopaux.release()));
440  const SG::DataProxy* ptmpaux = m_storeGate->transientProxy(
442  if (ptmpaux !=0) {
443  DataHeaderElement element(ptmpaux, 0, key);
444  dataHeader->insert(element);
445  }
446 
447  // Record new data header.Boolean flags will allow it's deletion in case
448  // of skipped events.
449  ATH_CHECK(m_storeGate->record<DataHeader>(dataHeader.release(),
450  "ByteStreamDataHeader", true, false, true));
451  }
452  return StatusCode::SUCCESS;
453 }

◆ getBlockIterator()

std::pair< long, std::string > ByteStreamEventStorageInputSvc::getBlockIterator ( const std::string &  fileName)
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 521 of file ByteStreamEventStorageInputSvc.cxx.

522 {
523  // open the file
524  if(m_reader != 0) closeBlockIterator();
525 
526  m_reader = std::unique_ptr<EventStorage::DataReader>(pickDataReader(fileName));
527 
528  if(m_reader == nullptr) {
529  ATH_MSG_ERROR("Failed to open file " << fileName);
531  return std::make_pair(-1,"END");
532  }
533 
534  // Initialize offset vector
535  m_evtOffsets.resize(m_reader->eventsInFile(), -1);
536  m_evtOffsets.clear();
537 
538  // Get ByteStream Metadata into Input MetaData Store
539  // (include begin Input File Incident)
540  if (loadMetadata().isSuccess()) {
541  ATH_MSG_DEBUG("Recorded ByteStreamMetadata in InputMetaDataStore");
542  } else {
543  ATH_MSG_ERROR("Unable to record ByteStreamMetadata in InputMetaDataStore");
544  return std::make_pair(-1, "FAIL");
545  }
546 
547  m_evtInFile = 0;
548 
549  // enable sequentialReading if multiple files
550  if(m_sequential) {
551  bool test = setSequentialRead();
552  if (!test) return std::make_pair(-1,"SEQ");
553  }
554 
555  ATH_MSG_INFO("Picked valid file: " << m_reader->fileName());
556  // initialize offsets and counters
557  m_evtOffsets.push_back(static_cast<long long>(m_reader->getPosition()));
558  return std::make_pair(m_reader->eventsInFile(), m_reader->GUID());
559 }

◆ getEvent()

const RawEvent* ByteStreamEventStorageInputSvc::getEvent ( Advance  step)
private

◆ initialize()

StatusCode ByteStreamEventStorageInputSvc::initialize ( )
overridevirtual

Required of all Gaudi Services.

Definition at line 70 of file ByteStreamEventStorageInputSvc.cxx.

71 {
72  ATH_MSG_INFO("Initializing");
73 
75  ATH_CHECK(m_inputMetadata.retrieve());
76  ATH_CHECK(m_storeGate.retrieve());
77  ATH_CHECK(m_robProvider.retrieve());
78 
79  return(StatusCode::SUCCESS);
80 }

◆ interfaceID()

const InterfaceID & ByteStreamInputSvc::interfaceID ( )
inlinestaticinherited

Retrieve interface ID.

Declaration of the interface ID ( interface id, major version, minor version)

Definition at line 49 of file ByteStreamInputSvc.h.

49  {
51  static const InterfaceID IID_ByteStreamInputSvc("ByteStreamInputSvc", 1, 0);
52  return(IID_ByteStreamInputSvc);
53 }

◆ loadMetadata()

StatusCode ByteStreamEventStorageInputSvc::loadMetadata ( )
private

Definition at line 117 of file ByteStreamEventStorageInputSvc.cxx.

118 {
119  // default goes into ByteStreamMetadata
120  auto bsmdc = std::make_unique<ByteStreamMetadataContainer>();
121  bsmdc->push_back(std::make_unique<ByteStreamMetadata>(*m_reader));
122  ATH_MSG_DEBUG("ByteStreamMetadata:\n" << *(bsmdc->front()));
123 
124  return m_inputMetadata->record(std::move(bsmdc), "ByteStreamMetadata");
125 }

◆ makeBSProvenance()

std::unique_ptr< DataHeaderElement > ByteStreamEventStorageInputSvc::makeBSProvenance ( ) const
private

Definition at line 682 of file ByteStreamEventStorageInputSvc.cxx.

683 {
684  std::unique_ptr<Token> token = std::make_unique<Token>();
685  token->setDb(m_fileGUID);
686  token->setTechnology(0x00001000);
687  token->setOid(Token::OID_t(0LL, m_evtFileOffset));
688 
689  // note: passing ownership of token to DataHeaderElement
690  return std::make_unique<DataHeaderElement>(ClassID_traits<DataHeader>::ID(),
691  "StreamRAW", token.release());
692 }

◆ msg() [1/2]

MsgStream& AthCommonMsg< Service >::msg ( ) const
inlineinherited

Definition at line 24 of file AthCommonMsg.h.

24  {
25  return this->msgStream();
26  }

◆ msg() [2/2]

MsgStream& AthCommonMsg< Service >::msg ( const MSG::Level  lvl) const
inlineinherited

Definition at line 27 of file AthCommonMsg.h.

27  {
28  return this->msgStream(lvl);
29  }

◆ msgLvl()

bool AthCommonMsg< Service >::msgLvl ( const MSG::Level  lvl) const
inlineinherited

Definition at line 30 of file AthCommonMsg.h.

30  {
31  return this->msgLevel(lvl);
32  }

◆ nextEvent()

const RawEvent * ByteStreamEventStorageInputSvc::nextEvent ( )
overridevirtual

++, new

Implements ByteStreamInputSvc.

Definition at line 197 of file ByteStreamEventStorageInputSvc.cxx.

197  {
198 
199  std::lock_guard<std::mutex> lock( m_readerMutex );
200  const EventContext& context{ Gaudi::Hive::currentContext() };
201  EventCache* cache = m_eventsCache.get(context);
202 
203  // initialize before building RawEvent
204  cache->releaseEvent();
205 
206  // Load data buffer from file
207  unsigned int eventSize;
208  if (readerReady()) {
209  DRError ecode;
210  // Check if have moved back from high water mark
211  m_evtInFile++; // increment iterator
212  if (m_evtInFile+1 > m_evtOffsets.size()) {
213  //get current event position (cast to long long until native tdaq implementation)
214  ATH_MSG_DEBUG("nextEvent _above_ high water mark");
215  m_evtFileOffset = static_cast<long long>(m_reader->getPosition());
216  m_evtOffsets.push_back(m_evtFileOffset);
217  ecode = m_reader->getData(eventSize, &(cache->data));
218  } else {
219  // Load from previous offset
220  ATH_MSG_DEBUG("nextEvent below high water mark");
222  ecode = m_reader->getData(eventSize, &(cache->data), m_evtFileOffset);
223  }
224 
225  if (DRWAIT == ecode && m_wait > 0) {
226  do {
227  // wait for n seconds
228  ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
229  int result = usleep(static_cast<int>(m_wait * 1e6));
230  if (result != 0) {
231  ATH_MSG_ERROR("System Error while running sleep");
232  return 0;
233  }
234  } while(m_reader->getData(eventSize, &(cache->data)) == DRWAIT);
235  } else if (DROK != ecode) {
236  ATH_MSG_ERROR("Error reading next event");
238  }
239  ATH_MSG_DEBUG("Event Size " << eventSize);
240 
241  } else {
242  ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
243  return 0;
244  }
245 
246  // Use buffer to build FullEventFragment
247  try {
248  buildFragment(cache, eventSize, true);
249  }
250  catch (...) {
251  // rethrow any exceptions
252  throw;
253  }
254 
255  if (cache->rawEvent == nullptr) {
256  ATH_MSG_ERROR("Failure to build fragment");
257  return nullptr;
258  }
259 
260  // Set it for the data provider
261  m_robProvider->setNextEvent(context, cache->rawEvent.get());
262  m_robProvider->setEventStatus(context, cache->eventStatus);
263 
264  //++m_totalEventCounter;
265 
266  // dump
267  if (m_dump) {
268  DumpFrags::dump(cache->rawEvent.get());
269  }
270  ATH_MSG_DEBUG("switched to next event in slot " << context);
271  return(cache->rawEvent.get());
272 }

◆ positionInBlock()

long ByteStreamEventStorageInputSvc::positionInBlock ( )
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 109 of file ByteStreamEventStorageInputSvc.cxx.

110 {
111  return m_evtOffsets.size();
112 }

◆ previousEvent()

const RawEvent * ByteStreamEventStorageInputSvc::previousEvent ( )
overridevirtual

–, old

Implements ByteStreamInputSvc.

Definition at line 130 of file ByteStreamEventStorageInputSvc.cxx.

131 {
132  std::lock_guard<std::mutex> lock(m_readerMutex);
133  const EventContext& context{Gaudi::Hive::currentContext()};
134  EventCache* cache = m_eventsCache.get(context);
135  // initialize before building RawEvent
136  cache->releaseEvent();
137 
138  // Load data buffer from file
139  unsigned int eventSize;
140  if (readerReady()) {
141  //get current event position (cast to long long until native tdaq implementation)
142  m_evtInFile--;
144  DRError ecode = m_reader->getData(eventSize, &(cache->data), m_evtOffsets.at(m_evtInFile - 1));
145 
146  if (DRWAIT == ecode && m_wait > 0) {
147  do {
148  // wait for n seconds
149  ATH_MSG_DEBUG("Waiting for input: " << m_wait << " seconds");
150  int result = usleep(static_cast<int>(m_wait * 1e6));
151  if (result != 0) {
152  ATH_MSG_ERROR("System Error while running sleep");
153  return nullptr;
154  }
155  } while(m_reader->getData(eventSize, &(cache->data), m_evtFileOffset) == DRWAIT);
156  } else if (DROK != ecode) {
157  ATH_MSG_ERROR("Error reading previous event");
159  }
160  ATH_MSG_DEBUG("Event Size " << eventSize);
161  }
162  else {
163  ATH_MSG_ERROR("DataReader not ready. Need to getBlockIterator first");
164  return 0;
165  }
166 
167  // Use buffer to build FullEventFragment
168  try {
169  buildFragment(cache, eventSize, true);
170  }
171  catch (...) {
172  // rethrow any exceptions
173  throw;
174  }
175 
176  if (cache->rawEvent == nullptr) {
177  ATH_MSG_ERROR("Failure to build fragment");
178  return nullptr;
179  }
180 
181  // Set it for the data provider
182  m_robProvider->setNextEvent(context, cache->rawEvent.get());
183  m_robProvider->setEventStatus(context, cache->eventStatus);
184 
185  // dump
186  if (m_dump) {
187  DumpFrags::dump(cache->rawEvent.get());
188  }
189  ATH_MSG_DEBUG( "switched to previous event in slot " << context);
190  return(cache->rawEvent.get());
191 }

◆ queryInterface()

StatusCode ByteStreamEventStorageInputSvc::queryInterface ( const InterfaceID &  riid,
void **  ppvInterface 
)
overridevirtual

Required of all Gaudi services: see Gaudi documentation for details.

Definition at line 667 of file ByteStreamEventStorageInputSvc.cxx.

669 {
670  if(ByteStreamInputSvc::interfaceID().versionMatch(riid))
671  *ppvInterface = dynamic_cast<ByteStreamInputSvc*>(this);
672  else // Interface is not directly available: try out a base class
673  return(::AthService::queryInterface(riid, ppvInterface));
674 
675  addRef();
676  return(StatusCode::SUCCESS);
677 }

◆ readerReady()

bool ByteStreamEventStorageInputSvc::readerReady ( )
private

Definition at line 564 of file ByteStreamEventStorageInputSvc.cxx.

565 {
566  bool eofFlag(false);
567 
568  if (m_reader) eofFlag = m_reader->endOfFile();
569  else {
570  ATH_MSG_INFO("eformat reader object not initialized");
571  return false;
572  }
573 
574  bool moreEvent = m_reader->good();
575 
576  return (!eofFlag) && moreEvent;
577 }

◆ ready()

bool ByteStreamEventStorageInputSvc::ready ( )
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 513 of file ByteStreamEventStorageInputSvc.cxx.

514 {
515  return readerReady();
516 }

◆ ROBFragmentCheck()

bool ByteStreamEventStorageInputSvc::ROBFragmentCheck ( const RawEvent re) const
private

Definition at line 582 of file ByteStreamEventStorageInputSvc.cxx.

583 {
584  bool allOK = true;
585  uint32_t total = re->nchildren();
586  uint32_t lastId = 0;
587  std::vector<eformat::FragmentProblem> problems;
588 
589  for (size_t i = 0; i<total; ++i) {
591  re->child(fp, i);
592 
594  lastId = f.source_id();
595 
596  problems.clear();
597  f.problems(problems);
598  if(!problems.empty()) {
599  allOK = false;
600  for(const auto& problem : problems) {
601  ATH_MSG_WARNING("Failed to create ROBFragment id = " << lastId << ", "
602  << eformat::helper::SourceIdentifier(lastId).human() << " : "
603  << eformat::helper::FragmentProblemDictionary.string(problem));
604  }
605  }
606  }
607 
608  return allOK;
609 }

◆ setEvent() [1/2]

void ByteStreamEventStorageInputSvc::setEvent ( const EventContext &  context,
void *  data,
unsigned int  eventStatus 
)
private

Definition at line 623 of file ByteStreamEventStorageInputSvc.cxx.

625 {
627  EventCache* cache = m_eventsCache.get(context);
628  cache->releaseEvent();
629 
630  DataType* fragment = reinterpret_cast<DataType*>(data);
631  cache->rawEvent = std::make_unique<RawEvent>(fragment);
632  cache->eventStatus = eventStatus;
633 
634  // Set it for the data provider
635  m_robProvider->setNextEvent(context, cache->rawEvent.get());
636  m_robProvider->setEventStatus(context, cache->eventStatus);
637 
638  // Build a DH for use by other components
639  StatusCode rec_sg = generateDataHeader();
640  if (rec_sg != StatusCode::SUCCESS) {
641  ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! "
642  << rec_sg);
643  }
644 }

◆ setEvent() [2/2]

void ByteStreamEventStorageInputSvc::setEvent ( void *  data,
unsigned int  eventStatus 
)
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 614 of file ByteStreamEventStorageInputSvc.cxx.

615 {
616  const EventContext& context{Gaudi::Hive::currentContext()};
617  return setEvent(context, data, eventStatus);
618 }

◆ setSequentialRead()

bool ByteStreamEventStorageInputSvc::setSequentialRead ( )
virtual

Definition at line 503 of file ByteStreamEventStorageInputSvc.cxx.

504 {
505  // enable SequenceReading
506  m_reader->enableSequenceReading();
507  return true;
508 }

◆ stop()

StatusCode ByteStreamEventStorageInputSvc::stop ( )
overridevirtual

Definition at line 85 of file ByteStreamEventStorageInputSvc.cxx.

86 {
87  ATH_MSG_DEBUG("Calling ByteStreamInputSvc::stop()");
88  // close moved to EventSelector for explicit coupling with incident
89  //if (m_reader != 0) closeBlockIterator(false);
90  return(StatusCode::SUCCESS);
91 }

◆ validateEvent() [1/2]

void ByteStreamEventStorageInputSvc::validateEvent ( )
overridevirtual

Reimplemented from ByteStreamInputSvc.

Definition at line 277 of file ByteStreamEventStorageInputSvc.cxx.

278 {
279  const EventContext& context{Gaudi::Hive::currentContext()};
280  const RawEvent* const event = m_eventsCache.get(context)->rawEvent.get();
281  m_eventsCache.get(context)->eventStatus = validateEvent(event);
282 }

◆ validateEvent() [2/2]

unsigned ByteStreamEventStorageInputSvc::validateEvent ( const RawEvent *const  rawEvent) const
private

Definition at line 287 of file ByteStreamEventStorageInputSvc.cxx.

288 {
289  unsigned int status = 0;
290  if (m_valEvent) {
291  // check validity
292  std::vector<eformat::FragmentProblem> problems;
293  rawEvent->problems(problems);
294 
295  if(!problems.empty()) {
296  status += 0x01000000;
297 
298  // bad event
299  ATH_MSG_WARNING("Failed to create FullEventFragment");
300  for(const auto& problem : problems)
301  ATH_MSG_WARNING(eformat::helper::FragmentProblemDictionary.string(problem));
302 
304  }
305 
306  if(!ROBFragmentCheck(rawEvent)) {
307  status += 0x02000000;
308 
309  // bad event
310  ATH_MSG_ERROR("Skipping bad event");
312  }
313  } else {
314  ATH_MSG_DEBUG("Processing event without validating.");
315  }
316  return status;
317 }

Member Data Documentation

◆ m_dump

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_dump
private

Definition at line 102 of file ByteStreamEventStorageInputSvc.h.

◆ m_eventInfoKey

Gaudi::Property<std::string> ByteStreamEventStorageInputSvc::m_eventInfoKey
private

Definition at line 105 of file ByteStreamEventStorageInputSvc.h.

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> ByteStreamEventStorageInputSvc::m_eventsCache
private

Definition at line 84 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtFileOffset

long long int ByteStreamEventStorageInputSvc::m_evtFileOffset
private

last read in event offset within a file, can be -1

Definition at line 90 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtInFile

unsigned int ByteStreamEventStorageInputSvc::m_evtInFile
private

Definition at line 89 of file ByteStreamEventStorageInputSvc.h.

◆ m_evtOffsets

std::vector<long long int> ByteStreamEventStorageInputSvc::m_evtOffsets
private

offset for event i in that file

Definition at line 88 of file ByteStreamEventStorageInputSvc.h.

◆ m_fileGUID

std::string ByteStreamEventStorageInputSvc::m_fileGUID
private

current file GUID

Definition at line 92 of file ByteStreamEventStorageInputSvc.h.

◆ m_inputMetadata

ServiceHandle<StoreGateSvc> ByteStreamEventStorageInputSvc::m_inputMetadata
private

StoreGateSvc.

Definition at line 99 of file ByteStreamEventStorageInputSvc.h.

◆ m_reader

std::unique_ptr<EventStorage::DataReader> ByteStreamEventStorageInputSvc::m_reader
private

DataReader from EventStorage.

Definition at line 86 of file ByteStreamEventStorageInputSvc.h.

◆ m_readerMutex

std::mutex ByteStreamEventStorageInputSvc::m_readerMutex
private

Definition at line 73 of file ByteStreamEventStorageInputSvc.h.

◆ m_robProvider

ServiceHandle<IROBDataProviderSvc> ByteStreamEventStorageInputSvc::m_robProvider
private

Definition at line 100 of file ByteStreamEventStorageInputSvc.h.

◆ m_sequential

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_sequential
private

enable sequential reading.

Definition at line 101 of file ByteStreamEventStorageInputSvc.h.

◆ m_storeGate

ServiceHandle<StoreGateSvc> ByteStreamEventStorageInputSvc::m_storeGate
private

Pointer to StoreGate.

StoreGateSvc

Definition at line 98 of file ByteStreamEventStorageInputSvc.h.

◆ m_valEvent

Gaudi::Property<bool> ByteStreamEventStorageInputSvc::m_valEvent
private

Definition at line 104 of file ByteStreamEventStorageInputSvc.h.

◆ m_wait

Gaudi::Property<float> ByteStreamEventStorageInputSvc::m_wait
private

Definition at line 103 of file ByteStreamEventStorageInputSvc.h.


The documentation for this class was generated from the following files:
ByteStreamEventStorageInputSvc::m_evtFileOffset
long long int m_evtFileOffset
last read in event offset within a file, can be -1
Definition: ByteStreamEventStorageInputSvc.h:90
DataHeader::setProcessTag
void setProcessTag(const std::string &processTag)
Set ProcessTag for DataHeader.
Definition: DataHeader.cxx:243
data
char data[hepevt_bytes_allocation_ATLAS]
Definition: HepEvt.cxx:11
python.CaloRecoConfig.f
f
Definition: CaloRecoConfig.py:127
python.tests.PyTestsLib.finalize
def finalize(self)
_info( "content of StoreGate..." ) self.sg.dump()
Definition: PyTestsLib.py:53
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
validation.validate
def validate(testSampleDir, thisSampleName, testSamplePath, weight_database, outputSamples)
Definition: validation.py:26
get_generator_info.result
result
Definition: get_generator_info.py:21
ByteStreamEventStorageInputSvc::NEXT
@ NEXT
Definition: ByteStreamEventStorageInputSvc.h:116
ByteStreamEventStorageInputSvc::PREVIOUS
@ PREVIOUS
Definition: ByteStreamEventStorageInputSvc.h:116
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OFFLINE_FRAGMENTS_NAMESPACE::DataType
uint32_t DataType
Definition: RawEvent.h:24
ByteStreamEventStorageInputSvc::m_readerMutex
std::mutex m_readerMutex
Definition: ByteStreamEventStorageInputSvc.h:73
ByteStreamInputSvc
This class provides the base class to services to read bytestream data. The concrete class can provid...
Definition: ByteStreamInputSvc.h:23
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
DataHeader::Input
@ Input
Definition: DataHeader.h:126
ByteStreamEventStorageInputSvc::m_evtOffsets
std::vector< long long int > m_evtOffsets
offset for event i in that file
Definition: ByteStreamEventStorageInputSvc.h:88
ByteStreamEventStorageInputSvc::m_eventsCache
SG::SlotSpecificObj< EventCache > m_eventsCache
Definition: ByteStreamEventStorageInputSvc.h:84
ByteStreamEventStorageInputSvc::m_storeGate
ServiceHandle< StoreGateSvc > m_storeGate
Pointer to StoreGate.
Definition: ByteStreamEventStorageInputSvc.h:98
Issue
Configuration Issue
Definition: PscIssues.h:31
ByteStreamEventStorageInputSvc::setSequentialRead
virtual bool setSequentialRead()
Definition: ByteStreamEventStorageInputSvc.cxx:503
initialize
void initialize()
Definition: run_EoverP.cxx:894
DataType
OFFLINE_FRAGMENTS_NAMESPACE::PointerType DataType
Definition: RoIBResultByteStreamTool.cxx:25
ByteStreamEventStorageInputSvc::validateEvent
virtual void validateEvent() override
Definition: ByteStreamEventStorageInputSvc.cxx:277
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
ByteStreamEventStorageInputSvc::m_eventInfoKey
Gaudi::Property< std::string > m_eventInfoKey
Definition: ByteStreamEventStorageInputSvc.h:105
ByteStreamEventStorageInputSvc::m_robProvider
ServiceHandle< IROBDataProviderSvc > m_robProvider
Definition: ByteStreamEventStorageInputSvc.h:100
ByteStreamInputSvc::ByteStreamInputSvc
ByteStreamInputSvc(const std::string &name, ISvcLocator *svcloc)
constructor
Definition: ByteStreamInputSvc.cxx:8
TrigInDetValidation_Base.test
test
Definition: TrigInDetValidation_Base.py:144
ByteStreamEventStorageInputSvc::generateDataHeader
virtual StatusCode generateDataHeader() override
Definition: ByteStreamEventStorageInputSvc.cxx:391
ByteStreamEventStorageInputSvc::m_evtInFile
unsigned int m_evtInFile
Definition: ByteStreamEventStorageInputSvc.h:89
ByteStreamEventStorageInputSvc::m_dump
Gaudi::Property< bool > m_dump
Definition: ByteStreamEventStorageInputSvc.h:102
ByteStreamEventStorageInputSvc::closeBlockIterator
virtual void closeBlockIterator(bool clearMetadata=true) override
Definition: ByteStreamEventStorageInputSvc.cxx:482
ByteStreamEventStorageInputSvc::m_valEvent
Gaudi::Property< bool > m_valEvent
Definition: ByteStreamEventStorageInputSvc.h:104
ByteStreamEventStorageInputSvc::loadMetadata
StatusCode loadMetadata()
Definition: ByteStreamEventStorageInputSvc.cxx:117
ByteStreamEventStorageInputSvc::m_wait
Gaudi::Property< float > m_wait
Definition: ByteStreamEventStorageInputSvc.h:103
ByteStreamEventStorageInputSvc::ROBFragmentCheck
bool ROBFragmentCheck(const RawEvent *) const
Definition: ByteStreamEventStorageInputSvc.cxx:582
DataHeaderElement::getKey
const std::string & getKey() const
Definition: DataHeader.cxx:127
Token::OID_t
Definition: Token.h:24
OFFLINE_FRAGMENTS_NAMESPACE::PointerType
const DataType * PointerType
Definition: RawEvent.h:25
DataHeaderElement
This class provides a persistent form for the TransientAddress.
Definition: DataHeader.h:36
FortranAlgorithmOptions.fileName
fileName
Definition: FortranAlgorithmOptions.py:13
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
ByteStreamExceptions::readError
Definition: ByteStreamExceptions.h:22
ByteStreamEventStorageInputSvc::m_reader
std::unique_ptr< EventStorage::DataReader > m_reader
DataReader from EventStorage.
Definition: ByteStreamEventStorageInputSvc.h:86
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
eformat::ROBFragment
Definition: L1CaloBsDecoderUtil.h:12
DataHeader
This class provides the layout for summary information stored for data written to POOL.
Definition: DataHeader.h:124
lumiFormat.i
int i
Definition: lumiFormat.py:92
trigmenu_modify_prescale_json.fp
fp
Definition: trigmenu_modify_prescale_json.py:53
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:40
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
Token::setDb
Token & setDb(const Guid &db)
Set database name.
Definition: Token.h:64
DeMoUpdate.tmp
string tmp
Definition: DeMoUpdate.py:1167
Token::setTechnology
Token & setTechnology(int t)
Set technology type.
Definition: Token.h:77
DataHeader::insert
void insert(const SG::TransientAddress *sgAddress, IOpaqueAddress *tokAddress=0, const std::string &pTag="")
Insert a new element into the "DataObject" vector.
Definition: DataHeader.cxx:267
ByteStreamExceptions::badFragmentData
Definition: ByteStreamExceptions.h:34
DataHeader::setStatus
void setStatus(statusFlag status)
Set StatusFlag enum for DataHeader.
Definition: DataHeader.cxx:235
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
ByteStreamEventStorageInputSvc::readerReady
bool readerReady()
Definition: ByteStreamEventStorageInputSvc.cxx:564
ByteStreamEventStorageInputSvc::buildFragment
void buildFragment(EventCache *cache, uint32_t eventSize, bool validate) const
Definition: ByteStreamEventStorageInputSvc.cxx:322
Token::setOid
Token & setOid(const OID_t &oid)
Set object identifier.
Definition: Token.h:83
TMVAToMVAUtils::convert
std::unique_ptr< MVAUtils::BDT > convert(TMVA::MethodBDT *bdt, bool isRegression=true, bool useYesNoLeaf=false)
Definition: TMVAToMVAUtils.h:114
ByteStreamExceptions::badFragment
Definition: ByteStreamExceptions.h:28
Athena::Status
Status
Athena specific StatusCode values.
Definition: AthStatusCode.h:22
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
ByteStreamEventStorageInputSvc::m_sequential
Gaudi::Property< bool > m_sequential
enable sequential reading.
Definition: ByteStreamEventStorageInputSvc.h:101
re
const boost::regex re(r_e)
ByteStreamEventStorageInputSvc::m_inputMetadata
ServiceHandle< StoreGateSvc > m_inputMetadata
StoreGateSvc.
Definition: ByteStreamEventStorageInputSvc.h:99
declareProperty
#define declareProperty(n, p, h)
Definition: BaseFakeBkgTool.cxx:15
merge.status
status
Definition: merge.py:17
ByteStreamInputSvc::interfaceID
static const InterfaceID & interfaceID()
Retrieve interface ID.
Definition: ByteStreamInputSvc.h:49
DumpFrags::dump
static void dump(const RawEvent *re)
dump fragments from FullEventFragment
Definition: DumpFrags.h:23
SG::DataProxy
Definition: DataProxy.h:44
ByteStreamEventStorageInputSvc::makeBSProvenance
std::unique_ptr< DataHeaderElement > makeBSProvenance() const
Definition: ByteStreamEventStorageInputSvc.cxx:682
TSU::T
unsigned long long T
Definition: L1TopoDataTypes.h:35
ByteStreamEventStorageInputSvc::m_fileGUID
std::string m_fileGUID
current file GUID
Definition: ByteStreamEventStorageInputSvc.h:92
ByteStreamEventStorageInputSvc::setEvent
virtual void setEvent(void *data, unsigned int eventStatus) override
Definition: ByteStreamEventStorageInputSvc.cxx:614
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37