ATLAS Offline Software
TrigByteStreamInputSvc Class Reference

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector.

#include <TrigByteStreamInputSvc.h>


Classes

struct  EventCache
 

Public Member Functions

 TrigByteStreamInputSvc (const std::string &name, ISvcLocator *svcLoc)
 Standard constructor.
 
virtual ~TrigByteStreamInputSvc ()
 Standard destructor.
 
virtual StatusCode queryInterface (const InterfaceID &riid, void **ppvInterface) override
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual const RawEvent * nextEvent () override
 virtual method for advancing to the next event
 
virtual const RawEvent * previousEvent () override
 
virtual const RawEvent * currentEvent () const override
 virtual method for accessing the current event
 
virtual void setEvent (void *, unsigned int)
 
virtual unsigned int currentEventStatus () const
 virtual method for accessing the current event status
 
virtual std::pair< long, std::string > getBlockIterator (const std::string &)
 
virtual void closeBlockIterator (bool)
 
virtual bool ready ()
 
virtual StatusCode generateDataHeader ()
 
virtual long positionInBlock ()
 
virtual void validateEvent ()
 
MsgStream & msg () const
 
MsgStream & msg (const MSG::Level lvl) const
 
bool msgLvl (const MSG::Level lvl) const
 

Static Public Member Functions

static const InterfaceID & interfaceID ()
 Retrieve interface ID.
 

Private Attributes

ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
 
ServiceHandle< StoreGateSvc > m_evtStore {this, "EventStore", "StoreGateSvc"}
 
ToolHandle< GenericMonitoringTool > m_monTool {this, "MonTool", "" , "Monitoring tool"}
 
Gaudi::Property< int > m_checkCTPFragmentModuleID
 
SG::SlotSpecificObj< EventCache > m_eventsCache
 Cache of RawEvent pointer per event slot.
 
uint16_t m_maxLB {0}
 Maximum lumi block number seen so far, used for monitoring.
 

Detailed Description

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector.

The layout and implementation are based on ByteStreamEventStorageInputSvc.

Definition at line 23 of file TrigByteStreamInputSvc.h.

Constructor & Destructor Documentation

◆ TrigByteStreamInputSvc()

TrigByteStreamInputSvc::TrigByteStreamInputSvc ( const std::string &  name,
ISvcLocator *  svcLoc 
)

Standard constructor.

Definition at line 26 of file TrigByteStreamInputSvc.cxx.

: ByteStreamInputSvc(name, svcLoc) {}

◆ ~TrigByteStreamInputSvc()

TrigByteStreamInputSvc::~TrigByteStreamInputSvc ( )
virtual

Standard destructor.

Definition at line 32 of file TrigByteStreamInputSvc.cxx.

{}

Member Function Documentation

◆ closeBlockIterator()

void ByteStreamInputSvc::closeBlockIterator ( bool  )
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 61 of file ByteStreamInputSvc.h.

{}

◆ currentEvent()

const RawEvent * TrigByteStreamInputSvc::currentEvent ( ) const
override virtual

virtual method for accessing the current event

Implements ByteStreamInputSvc.

Definition at line 207 of file TrigByteStreamInputSvc.cxx.

{
  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
  return nullptr;
}

◆ currentEventStatus()

unsigned int ByteStreamInputSvc::currentEventStatus ( ) const
inline virtual inherited

virtual method for accessing the current event status

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 55 of file ByteStreamInputSvc.h.

{
  return(0);
}

◆ finalize()

StatusCode TrigByteStreamInputSvc::finalize ( )
override virtual

Definition at line 67 of file TrigByteStreamInputSvc.cxx.

{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
  if (m_robDataProviderSvc.release().isFailure()) {
    ATH_MSG_WARNING("Cannot release rob data provider");
  }
  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS;
}

◆ generateDataHeader()

StatusCode ByteStreamInputSvc::generateDataHeader ( )
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 63 of file ByteStreamInputSvc.h.

{return StatusCode::SUCCESS;}

◆ getBlockIterator()

std::pair< long, std::string > ByteStreamInputSvc::getBlockIterator ( const std::string &  )
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 60 of file ByteStreamInputSvc.h.

{return std::make_pair(-1,"GUID");}

◆ initialize()

StatusCode TrigByteStreamInputSvc::initialize ( )
override virtual

Definition at line 53 of file TrigByteStreamInputSvc.cxx.

{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);

  ATH_CHECK(m_robDataProviderSvc.retrieve());
  ATH_CHECK(m_evtStore.retrieve());
  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());

  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS;
}
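
The early-return pattern used in initialize() can be illustrated with a minimal standalone sketch. Everything below is a hypothetical stand-in: `Status`, `Handle`, and `CHECK_OR_RETURN` only imitate `StatusCode`, the service and tool handles, and `ATH_CHECK`, and the real macro is understood to also report the failure through the message service.

```cpp
// Hypothetical stand-in for StatusCode.
enum class Status { Success, Failure };

// Hypothetical stand-in for ATH_CHECK: leave the enclosing function
// as soon as the checked expression does not report success.
#define CHECK_OR_RETURN(expr)                   \
  do {                                          \
    const Status sc_ = (expr);                  \
    if (sc_ != Status::Success) return sc_;     \
  } while (false)

// Hypothetical stand-in for a ServiceHandle / ToolHandle.
struct Handle {
  bool available;   // whether retrieve() would succeed
  bool configured;  // an unconfigured handle counts as empty

  Status retrieve() const { return available ? Status::Success : Status::Failure; }
  bool empty() const { return !configured; }
};

// Mirrors the shape of initialize(): required handles must be retrieved,
// the optional one is only retrieved when it has been configured.
Status initialize(const Handle& required, const Handle& optional) {
  CHECK_OR_RETURN(required.retrieve());
  if (!optional.empty()) CHECK_OR_RETURN(optional.retrieve());
  return Status::Success;
}
```

Wrapping the macro body in `do { ... } while (false)` keeps it safe to use as the single statement of an `if`, which is how the monitoring tool retrieval is written in the listing.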

◆ interfaceID()

const InterfaceID & ByteStreamInputSvc::interfaceID ( )
inline static inherited

Retrieve interface ID.

Declaration of the interface ID (interface id, major version, minor version)

Definition at line 49 of file ByteStreamInputSvc.h.

{
  static const InterfaceID IID_ByteStreamInputSvc("ByteStreamInputSvc", 1, 0);
  return(IID_ByteStreamInputSvc);
}

◆ msg() [1/2]

MsgStream& AthCommonMsg< Service >::msg ( ) const
inline inherited

Definition at line 24 of file AthCommonMsg.h.

{
  return this->msgStream();
}

◆ msg() [2/2]

MsgStream& AthCommonMsg< Service >::msg ( const MSG::Level  lvl) const
inline inherited

Definition at line 27 of file AthCommonMsg.h.

{
  return this->msgStream(lvl);
}

◆ msgLvl()

bool AthCommonMsg< Service >::msgLvl ( const MSG::Level  lvl) const
inline inherited

Definition at line 30 of file AthCommonMsg.h.

{
  return this->msgLevel(lvl);
}

◆ nextEvent()

const RawEvent * TrigByteStreamInputSvc::nextEvent ( )
override virtual

virtual method for advancing to the next event

Implements ByteStreamInputSvc.

Definition at line 79 of file TrigByteStreamInputSvc.cxx.

{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);

  // Get the EventContext via event store because the interface doesn't allow passing it explicitly as an argument
  // and we don't want to use ThreadLocalContext. Don't use ReadHandle here because it calls ThreadLocalContext if
  // not given a context (which we want to retrieve).
  const EventContext* eventContext = nullptr;
  if (m_evtStore->retrieve(eventContext).isFailure()) {
    ATH_MSG_ERROR("Failed to retrieve EventContext from the event store, new event cannot be read");
    return nullptr;
  }

  ATH_MSG_DEBUG("Reading new event for event context " << *eventContext);

  // Find the cache corresponding to the current slot
  EventCache* cache = m_eventsCache.get(*eventContext);

  // Free the memory allocated to the previous event processed in the current slot
  cache->releaseEvent();

  using DCStatus = hltinterface::DataCollector::Status;
  DCStatus status = DCStatus::NO_EVENT;
  auto monLBN = Monitored::Scalar<uint16_t>("getNext_LBN", m_maxLB);
  auto monNoEvent = Monitored::Scalar<bool>("getNext_noEvent", false);
  try {
    auto t_getNext = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_getNext");
    status = hltinterface::DataCollector::instance()->getNext(cache->rawData);
    auto mon = Monitored::Group(m_monTool, t_getNext);
  }
  catch (const std::exception& ex) {
    ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception: " << ex.what()
                  << ". Throwing hltonl::Exception::EventSourceCorrupted" );
    throw hltonl::Exception::EventSourceCorrupted();
  }
  catch (...) {
    ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception. "
                  << "Throwing hltonl::Exception::EventSourceCorrupted" );
    throw hltonl::Exception::EventSourceCorrupted();
  }

  if (status == DCStatus::STOP) {
    ATH_MSG_DEBUG("DataCollector::getNext returned STOP - no more events available");
    throw hltonl::Exception::NoMoreEvents();
  }
  else if (status == DCStatus::NO_EVENT) {
    ATH_MSG_DEBUG("DataCollector::getNext returned NO_EVENT - no events available temporarily");
    monNoEvent = true;
    auto mon = Monitored::Group(m_monTool, monLBN, monNoEvent);
    throw hltonl::Exception::NoEventsTemporarily();
  }
  else if (status != DCStatus::OK) {
    ATH_MSG_ERROR("Unhandled return Status " << static_cast<int>(status) << " from DataCollector::getNext");
    return nullptr;
  }
  ATH_MSG_VERBOSE("DataCollector::getNext returned Status::OK");

  // Create a cached FullEventFragment object from the cached raw data
  cache->fullEventFragment.reset(new RawEvent(cache->rawData.get()));

  // Update LB number for monitoring
  if (m_maxLB < cache->fullEventFragment->lumi_block()) {
    m_maxLB = cache->fullEventFragment->lumi_block();
    monLBN = m_maxLB;
  }

  // Monitor the input
  auto numROBs = Monitored::Scalar<int>("L1Result_NumROBs",
                                        cache->fullEventFragment->nchildren());
  auto fragSize = Monitored::Scalar<float>("L1Result_FullEvFragSize",
                                           cache->fullEventFragment->fragment_size_word()*wordsToKiloBytes);
  std::vector<eformat::read::ROBFragment> robVec;
  cache->fullEventFragment->robs(robVec);
  std::vector<std::string> subdetNameVec;
  for (const eformat::read::ROBFragment& rob : robVec) {
    eformat::helper::SourceIdentifier sid(rob.rob_source_id());
    subdetNameVec.push_back(sid.human_detector());
  }
  auto subdets = Monitored::Collection<std::vector<std::string>>("L1Result_SubDets", subdetNameVec);
  auto mon = Monitored::Group(m_monTool, numROBs, fragSize, subdets, monLBN, monNoEvent);

  // Give the FullEventFragment pointer to ROBDataProviderSvc
  m_robDataProviderSvc->setNextEvent(*eventContext, cache->fullEventFragment.get());
  ATH_MSG_VERBOSE("end of " << __FUNCTION__);

  // Check the CTP fragment (request from readout if not part of the cache), ATR-25217
  if (m_checkCTPFragmentModuleID.value() >= 0) {
    const eformat::helper::SourceIdentifier sid{eformat::SubDetector::TDAQ_CTP,
                                                static_cast<uint16_t>(m_checkCTPFragmentModuleID.value())};
    std::vector<const OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment*> vrobf;
    m_robDataProviderSvc->getROBData(*eventContext, {sid.code()}, vrobf, name());
    if (vrobf.empty()) {
      ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is missing. "
                   << "Throwing hltonl::Exception::MissingCTPFragment");
      throw hltonl::Exception::MissingCTPFragment();
    }
    uint32_t robStatus = vrobf.at(0)->nstatus()>0 ? *(vrobf.at(0)->status()) : 0;
    if (robStatus!=0) {
      std::string hexStatus(8, char{0});
      std::to_chars(hexStatus.data(), hexStatus.data()+hexStatus.size(), robStatus, 16);
      ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " has non-zero status word: 0x"
                   << hexStatus << ". Throwing hltonl::Exception::BadCTPFragment");
      throw hltonl::Exception::BadCTPFragment("Non-zero ROB status 0x"+hexStatus);
    }
    try {
      vrobf.at(0)->check();
    }
    catch (const std::exception& ex) {
      ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is corrupted: "
                   << ex.what() << ". Throwing hltonl::Exception::BadCTPFragment");
      throw hltonl::Exception::BadCTPFragment(ex.what());
    }
  }

  // Return the FullEventFragment pointer (do not transfer ownership)
  return cache->fullEventFragment.get();
}
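
How a caller might react to the different outcomes of nextEvent() can be sketched with standalone stand-in types. This is a minimal illustration, assuming that STOP and NO_EVENT are reported to the caller as exceptions (in the spirit of hltonl::Exception::NoMoreEvents and hltonl::Exception::NoEventsTemporarily) and that any other non-OK status yields a null result. `Status`, the two exception structs, `dispatchStatus`, `Action`, and `classify` are hypothetical names, not part of the ATLAS code.

```cpp
#include <stdexcept>

// Hypothetical stand-in for hltinterface::DataCollector::Status.
enum class Status { OK, NO_EVENT, STOP };

// Hypothetical stand-ins for the hltonl exception types.
struct NoMoreEvents : std::runtime_error {
  NoMoreEvents() : std::runtime_error("no more events") {}
};
struct NoEventsTemporarily : std::runtime_error {
  NoEventsTemporarily() : std::runtime_error("no events temporarily") {}
};

// Returns true when an event is available, false for an unhandled status,
// and throws for the two "no event" conditions.
bool dispatchStatus(Status status) {
  switch (status) {
    case Status::OK:       return true;
    case Status::STOP:     throw NoMoreEvents();
    case Status::NO_EVENT: throw NoEventsTemporarily();
  }
  return false;  // value outside the known enumerators
}

// What a hypothetical event loop would do next.
enum class Action { Process, Retry, Stop, Fail };

Action classify(Status status) {
  try {
    return dispatchStatus(status) ? Action::Process : Action::Fail;
  } catch (const NoEventsTemporarily&) {
    return Action::Retry;  // the source may deliver events again later
  } catch (const NoMoreEvents&) {
    return Action::Stop;   // the input is exhausted
  }
}
```

Separate exception types let the caller tell a temporary pause from the end of input without inspecting a status code, while the null return stays reserved for genuine errors.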

◆ positionInBlock()

long ByteStreamInputSvc::positionInBlock ( )
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 64 of file ByteStreamInputSvc.h.

{return -1;}

◆ previousEvent()

const RawEvent * TrigByteStreamInputSvc::previousEvent ( )
override virtual

Implements ByteStreamInputSvc.

Definition at line 199 of file TrigByteStreamInputSvc.cxx.

{
  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
  return nullptr;
}

◆ queryInterface()

StatusCode TrigByteStreamInputSvc::queryInterface ( const InterfaceID &  riid,
void **  ppvInterface 
)
override virtual

Definition at line 37 of file TrigByteStreamInputSvc.cxx.

{
  ATH_MSG_VERBOSE("start of " << __FUNCTION__);

  if(ByteStreamInputSvc::interfaceID().versionMatch(riid))
    *ppvInterface = static_cast<ByteStreamInputSvc*>(this);
  else
    return AthService::queryInterface(riid, ppvInterface);

  addRef();
  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
  return StatusCode::SUCCESS;
}
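
The interface lookup above can be sketched in isolation. The types below are hypothetical stand-ins (`InterfaceId`, `Base`, `InputSvc`), and the matching rule in `versionMatch` (same id, same major version, minor version at least the requested one) is an assumption about how Gaudi's `InterfaceID::versionMatch` behaves, not something this page states. Where the real method delegates to `AthService::queryInterface`, the sketch simply reports failure.

```cpp
// Hypothetical stand-in for Gaudi's InterfaceID.
struct InterfaceId {
  unsigned long id;
  unsigned long majorVer;
  unsigned long minorVer;

  // Assumed rule: ids and major versions must agree, and the provided
  // minor version must be at least the requested one.
  bool versionMatch(const InterfaceId& requested) const {
    return id == requested.id && majorVer == requested.majorVer &&
           minorVer >= requested.minorVer;
  }
};

// Hypothetical reference-counted base.
struct Base {
  virtual ~Base() = default;
  int refCount = 0;
  void addRef() { ++refCount; }
};

struct InputSvc : Base {
  // Function-local static: constructed once, on first use.
  static const InterfaceId& interfaceID() {
    static const InterfaceId iid{1001, 1, 0};
    return iid;
  }

  bool queryInterface(const InterfaceId& riid, void** ppvInterface) {
    if (!interfaceID().versionMatch(riid)) {
      *ppvInterface = nullptr;  // the real code delegates to the base class here
      return false;
    }
    *ppvInterface = static_cast<InputSvc*>(this);
    addRef();  // the caller now holds a counted reference
    return true;
  }
};
```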

◆ ready()

bool ByteStreamInputSvc::ready ( )
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 62 of file ByteStreamInputSvc.h.

{return false;}

◆ setEvent()

virtual void ByteStreamInputSvc::setEvent ( void *  ,
unsigned int   
)
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 36 of file ByteStreamInputSvc.h.

{}

◆ validateEvent()

void ByteStreamInputSvc::validateEvent ( )
inline virtual inherited

Reimplemented in ByteStreamEventStorageInputSvc.

Definition at line 65 of file ByteStreamInputSvc.h.

{}

Member Data Documentation

◆ m_checkCTPFragmentModuleID

Gaudi::Property<int> TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
private
Initial value:
{this, "CheckCTPFragmentModuleID", -1,
"After reading a new event, assert we can retrieve the CTP fragment with Module ID given by this property, "
"and that has no errors. A value <0 disables the check."}

Definition at line 49 of file TrigByteStreamInputSvc.h.
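
When the check finds a non-zero status word, nextEvent() formats it as hexadecimal with `std::to_chars` into a fixed 8-character buffer. The sketch below shows the same conversion with the buffer trimmed to the characters actually written; without trimming, values shorter than 8 hex digits would leave trailing NUL characters in the string. `toHex` is a hypothetical helper name, not part of the class.

```cpp
#include <charconv>
#include <cstddef>
#include <cstdint>
#include <string>
#include <system_error>

// Format a 32-bit status word as lowercase hex without leading zeros.
std::string toHex(std::uint32_t word) {
  std::string buf(8, '\0');  // 8 hex digits are enough for 32 bits
  const auto res = std::to_chars(buf.data(), buf.data() + buf.size(), word, 16);
  if (res.ec != std::errc{}) return {};  // cannot happen with an 8-char buffer
  // Keep only the characters that to_chars actually wrote.
  buf.resize(static_cast<std::size_t>(res.ptr - buf.data()));
  return buf;
}
```

A message can then be built as `"Non-zero ROB status 0x" + toHex(robStatus)`, where `robStatus` is the first status word of the fragment.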

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> TrigByteStreamInputSvc::m_eventsCache
private

Cache of RawEvent pointer per event slot.

Definition at line 61 of file TrigByteStreamInputSvc.h.
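
The per-slot caching idea can be sketched without the ATLAS types. This is a minimal illustration, assuming one cache entry per concurrent event slot that owns both the raw data buffer and the object built on top of it. `EventView`, `SlotCache`, `SlotCaches`, and `loadEvent` are hypothetical names standing in for `RawEvent`, `EventCache`, `SG::SlotSpecificObj`, and the relevant part of nextEvent().

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Non-owning view over raw words; stands in for RawEvent.
struct EventView {
  const std::uint32_t* words;
};

// One cache entry per slot: owns the raw buffer and the view built on it.
struct SlotCache {
  std::unique_ptr<std::uint32_t[]> rawData;
  std::unique_ptr<EventView> event;

  // Drop the view first, then the buffer it points into.
  void releaseEvent() {
    event.reset();
    rawData.reset();
  }
};

// Fixed number of slots, indexed by slot number.
class SlotCaches {
public:
  explicit SlotCaches(std::size_t nSlots) : m_slots(nSlots) {}
  SlotCache& get(std::size_t slot) { return m_slots.at(slot); }

private:
  std::vector<SlotCache> m_slots;
};

// Replace the event held in one slot and return a non-owning pointer to it.
const EventView* loadEvent(SlotCaches& caches, std::size_t slot, std::uint32_t firstWord) {
  SlotCache& cache = caches.get(slot);
  cache.releaseEvent();  // free the previous event of this slot only
  cache.rawData = std::make_unique<std::uint32_t[]>(1);
  cache.rawData[0] = firstWord;
  cache.event = std::make_unique<EventView>(EventView{cache.rawData.get()});
  return cache.event.get();  // ownership stays with the cache
}
```

Because each slot owns its own buffer, loading a new event into one slot leaves pointers handed out for other slots valid.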

◆ m_evtStore

ServiceHandle<StoreGateSvc> TrigByteStreamInputSvc::m_evtStore {this, "EventStore", "StoreGateSvc"}
private

Definition at line 45 of file TrigByteStreamInputSvc.h.

◆ m_maxLB

uint16_t TrigByteStreamInputSvc::m_maxLB {0}
private

Maximum lumi block number seen so far, used for monitoring.

Definition at line 62 of file TrigByteStreamInputSvc.h.

◆ m_monTool

ToolHandle<GenericMonitoringTool> TrigByteStreamInputSvc::m_monTool {this, "MonTool", "" , "Monitoring tool"}
private

Definition at line 46 of file TrigByteStreamInputSvc.h.

◆ m_robDataProviderSvc

ServiceHandle<IROBDataProviderSvc> TrigByteStreamInputSvc::m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
private

Definition at line 44 of file TrigByteStreamInputSvc.h.


The documentation for this class was generated from the following files:
TrigByteStreamInputSvc.h
TrigByteStreamInputSvc.cxx