ATLAS Offline Software
Classes | Public Member Functions | Private Attributes | List of all members
TrigByteStreamInputSvc Class Reference

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector. More...

#include <TrigByteStreamInputSvc.h>

Inheritance diagram for TrigByteStreamInputSvc:
Collaboration diagram for TrigByteStreamInputSvc:

Classes

struct  EventCache
 

Public Member Functions

 TrigByteStreamInputSvc (const std::string &name, ISvcLocator *svcLoc)
 Standard constructor. More...
 
virtual ~TrigByteStreamInputSvc ()
 Standard destructor. More...
 
virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual const RawEventnextEvent () override
 
virtual const RawEventpreviousEvent () override
 
virtual const RawEventcurrentEvent () const override
 

Private Attributes

ServiceHandle< IROBDataProviderSvcm_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
 
ServiceHandle< StoreGateSvcm_evtStore {this, "EventStore", "StoreGateSvc"}
 
ToolHandle< GenericMonitoringToolm_monTool {this, "MonTool", "" , "Monitoring tool"}
 
Gaudi::Property< int > m_checkCTPFragmentModuleID
 
SG::SlotSpecificObj< EventCachem_eventsCache
 Cache of RawEvent pointer per event slot. More...
 
uint16_t m_maxLB {0}
 Maximum lumi block number seen so far, used for monitoring. More...
 

Detailed Description

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector.

The layout and implementation are based on ByteStreamEventStorageInputSvc

Definition at line 24 of file TrigByteStreamInputSvc.h.

Constructor & Destructor Documentation

◆ TrigByteStreamInputSvc()

TrigByteStreamInputSvc::TrigByteStreamInputSvc ( const std::string &  name,
ISvcLocator *  svcLoc 
)

Standard constructor.

Definition at line 26 of file TrigByteStreamInputSvc.cxx.

27 : base_class(name, svcLoc) {}

◆ ~TrigByteStreamInputSvc()

TrigByteStreamInputSvc::~TrigByteStreamInputSvc ( )
virtual

Standard destructor.

Definition at line 32 of file TrigByteStreamInputSvc.cxx.

32 {}

Member Function Documentation

◆ currentEvent()

const RawEvent * TrigByteStreamInputSvc::currentEvent ( ) const
overridevirtual

Definition at line 191 of file TrigByteStreamInputSvc.cxx.

191  {
192  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
193  return nullptr;
194 }

◆ finalize()

StatusCode TrigByteStreamInputSvc::finalize ( )
overridevirtual

Definition at line 51 of file TrigByteStreamInputSvc.cxx.

51  {
52  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
53  if (m_robDataProviderSvc.release().isFailure()) {
54  ATH_MSG_WARNING("Cannot release rob data provider");
55  }
56  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
57  return StatusCode::SUCCESS;
58 }

◆ initialize()

StatusCode TrigByteStreamInputSvc::initialize ( )
overridevirtual

Definition at line 37 of file TrigByteStreamInputSvc.cxx.

37  {
38  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
39 
40  ATH_CHECK(m_robDataProviderSvc.retrieve());
41  ATH_CHECK(m_evtStore.retrieve());
42  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
43 
44  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
45  return StatusCode::SUCCESS;
46 }

◆ nextEvent()

const RawEvent * TrigByteStreamInputSvc::nextEvent ( )
overridevirtual

Definition at line 63 of file TrigByteStreamInputSvc.cxx.

63  {
64  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
65 
66  // Get the EventContext via event store because the interface doesn't allow passing it explicitly as an argument
67  // and we don't want to use ThreadLocalContext. Don't use ReadHandle here because it calls ThreadLocalContext if
68  // not given a context (which we want to retrieve).
69  const EventContext* eventContext = nullptr;
70  if (m_evtStore->retrieve(eventContext).isFailure()) {
71  ATH_MSG_ERROR("Failed to retrieve EventContext from the event store, new event cannot be read");
72  return nullptr;
73  }
74 
75  ATH_MSG_DEBUG("Reading new event for event context " << *eventContext);
76 
77  // Find the cache corresponding to the current slot
78  EventCache* cache = m_eventsCache.get(*eventContext);
79 
80  // Free the memory allocated to the previous event processed in the current slot
81  cache->releaseEvent();
82 
83  using DCStatus = hltinterface::DataCollector::Status;
84  DCStatus status = DCStatus::NO_EVENT;
85  auto monLBN = Monitored::Scalar<uint16_t>("getNext_LBN", m_maxLB);
86  auto monNoEvent = Monitored::Scalar<bool>("getNext_noEvent", false);
87  try {
88  auto t_getNext = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_getNext");
89  status = hltinterface::DataCollector::instance()->getNext(cache->rawData);
90  auto mon = Monitored::Group(m_monTool, t_getNext);
91  }
92  catch (const std::exception& ex) {
93  ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception: " << ex.what()
94  << ". Throwing hltonl::Exception::EventSourceCorrupted" );
96  }
97  catch (...) {
98  ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception. "
99  << "Throwing hltonl::Exception::EventSourceCorrupted" );
101  }
102 
103  if (status == DCStatus::STOP) {
104  ATH_MSG_DEBUG("DataCollector::getNext returned STOP - no more events available");
106  }
107  else if (status == DCStatus::NO_EVENT) {
108  ATH_MSG_DEBUG("DataCollector::getNext returned NO_EVENT - no events available temporarily");
109  monNoEvent = true;
110  auto mon = Monitored::Group(m_monTool, monLBN, monNoEvent);
112  }
113  else if (status != DCStatus::OK) {
114  ATH_MSG_ERROR("Unhandled return Status " << static_cast<int>(status) << " from DataCollector::getNext");
115  return nullptr;
116  }
117  ATH_MSG_VERBOSE("DataCollector::getNext returned Status::OK");
118 
119  // Create a cached FullEventFragment object from the cached raw data
120  cache->fullEventFragment.reset(new RawEvent(cache->rawData.get()));
121 
122  // Update LB number for monitoring
123  if (m_maxLB < cache->fullEventFragment->lumi_block()) {
124  m_maxLB = cache->fullEventFragment->lumi_block();
125  monLBN = m_maxLB;
126  }
127 
128  // Monitor the input
129  auto numROBs = Monitored::Scalar<int>("L1Result_NumROBs",
130  cache->fullEventFragment->nchildren());
131  auto fragSize = Monitored::Scalar<float>("L1Result_FullEvFragSize",
132  cache->fullEventFragment->fragment_size_word()*wordsToKiloBytes);
133  std::vector<eformat::read::ROBFragment> robVec;
134  cache->fullEventFragment->robs(robVec);
135  std::vector<std::string> subdetNameVec;
136  for (const eformat::read::ROBFragment& rob : robVec) {
137  eformat::helper::SourceIdentifier sid(rob.rob_source_id());
138  subdetNameVec.push_back(sid.human_detector());
139  }
140  auto subdets = Monitored::Collection<std::vector<std::string>>("L1Result_SubDets", subdetNameVec);
141  auto mon = Monitored::Group(m_monTool, numROBs, fragSize, subdets, monLBN, monNoEvent);
142 
143  // Give the FullEventFragment pointer to ROBDataProviderSvc
144  m_robDataProviderSvc->setNextEvent(*eventContext, cache->fullEventFragment.get());
145  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
146 
147  // Check the CTP fragment (request from readout if not part of the cache), ATR-25217
148  if (m_checkCTPFragmentModuleID.value() >= 0) {
149  const eformat::helper::SourceIdentifier sid{eformat::SubDetector::TDAQ_CTP,
150  static_cast<uint16_t>(m_checkCTPFragmentModuleID.value())};
151  std::vector<const OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment*> vrobf;
152  m_robDataProviderSvc->getROBData(*eventContext, {sid.code()}, vrobf, name());
153  if (vrobf.empty()) {
154  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is missing. "
155  << "Throwing hltonl::Exception::MissingCTPFragment");
157  }
158  uint32_t robStatus = vrobf.at(0)->nstatus()>0 ? *(vrobf.at(0)->status()) : 0;
159  if (robStatus!=0) {
160  std::string hexStatus(8, char{0});
161  std::to_chars(hexStatus.data(), hexStatus.data()+hexStatus.size(), robStatus, 16);
162  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " has non-zero status word: 0x"
163  << hexStatus << ". Throwing hltonl::Exception::BadCTPFragment");
164  throw hltonl::Exception::BadCTPFragment("Non-zero ROB status 0x"+hexStatus);
165  }
166  try {
167  vrobf.at(0)->check();
168  }
169  catch (const std::exception& ex) {
170  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is corrupted: "
171  << ex.what() << ". Throwing hltonl::Exception::BadCTPFragment");
172  throw hltonl::Exception::BadCTPFragment(ex.what());
173  }
174  }
175 
176  // Return the FullEventFragment pointer (do not transfer ownership)
177  return cache->fullEventFragment.get();
178 }

◆ previousEvent()

const RawEvent * TrigByteStreamInputSvc::previousEvent ( )
overridevirtual

Definition at line 183 of file TrigByteStreamInputSvc.cxx.

183  {
184  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
185  return nullptr;
186 }

Member Data Documentation

◆ m_checkCTPFragmentModuleID

Gaudi::Property<int> TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
private
Initial value:
{this, "CheckCTPFragmentModuleID", -1,
"After reading a new event, assert we can retrieve the CTP fragment with Module ID given by this property, "
"and that has no errors. A value <0 disables the check."}

Definition at line 47 of file TrigByteStreamInputSvc.h.

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> TrigByteStreamInputSvc::m_eventsCache
private

Cache of RawEvent pointer per event slot.

Definition at line 59 of file TrigByteStreamInputSvc.h.

◆ m_evtStore

ServiceHandle<StoreGateSvc> TrigByteStreamInputSvc::m_evtStore {this, "EventStore", "StoreGateSvc"}
private

Definition at line 43 of file TrigByteStreamInputSvc.h.

◆ m_maxLB

uint16_t TrigByteStreamInputSvc::m_maxLB {0}
private

Maximum lumi block number seen so far, used for monitoring.

Definition at line 60 of file TrigByteStreamInputSvc.h.

◆ m_monTool

ToolHandle<GenericMonitoringTool> TrigByteStreamInputSvc::m_monTool {this, "MonTool", "" , "Monitoring tool"}
private

Definition at line 44 of file TrigByteStreamInputSvc.h.

◆ m_robDataProviderSvc

ServiceHandle<IROBDataProviderSvc> TrigByteStreamInputSvc::m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
private

Definition at line 42 of file TrigByteStreamInputSvc.h.


The documentation for this class was generated from the following files:
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
TrigDefs::Group
Group
Properties of a chain group.
Definition: GroupProperties.h:13
hltonl::Exception::MissingCTPFragment
Thrown if the CTP ROBFragment cannot be retrieved for a new event.
Definition: HltExceptions.h:41
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
hltonl::Exception::EventSourceCorrupted
Thrown if event source throws an exception when new event is requested.
Definition: HltExceptions.h:33
TrigByteStreamInputSvc::m_monTool
ToolHandle< GenericMonitoringTool > m_monTool
Definition: TrigByteStreamInputSvc.h:44
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
TrigByteStreamInputSvc::m_evtStore
ServiceHandle< StoreGateSvc > m_evtStore
Definition: TrigByteStreamInputSvc.h:43
TrigByteStreamInputSvc::m_robDataProviderSvc
ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc
Definition: TrigByteStreamInputSvc.h:42
TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
Gaudi::Property< int > m_checkCTPFragmentModuleID
Definition: TrigByteStreamInputSvc.h:47
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
hltonl::Exception::BadCTPFragment
Thrown if the CTP ROBFragment for a new event has non-zero status word or other errors.
Definition: HltExceptions.h:49
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
xAOD::uint16_t
setWord1 uint16_t
Definition: eFexEMRoI_v1.cxx:88
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
calibdata.exception
exception
Definition: calibdata.py:496
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigByteStreamInputSvc::m_eventsCache
SG::SlotSpecificObj< EventCache > m_eventsCache
Cache of RawEvent pointer per event slot.
Definition: TrigByteStreamInputSvc.h:59
hltonl::Exception::NoEventsTemporarily
Thrown if the event source cannot provide new events temporarily, e.g. when trigger is on hold.
Definition: HltExceptions.h:25
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment
eformat::ROBFragment< PointerType > ROBFragment
Definition: RawEvent.h:27
Athena::Status
Status
Athena specific StatusCode values.
Definition: AthStatusCode.h:22
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
plotBeamSpotMon.mon
mon
Definition: plotBeamSpotMon.py:67
merge.status
status
Definition: merge.py:17
Monitored::Scalar
Declare a monitored scalar variable.
Definition: MonitoredScalar.h:34
TrigByteStreamInputSvc::m_maxLB
uint16_t m_maxLB
Maximum lumi block number seen so far, used for monitoring.
Definition: TrigByteStreamInputSvc.h:60
hltonl::Exception::NoMoreEvents
Thrown if all events are already read from the input and another one is requested.
Definition: HltExceptions.h:17
Monitored::Timer
A monitored timer.
Definition: MonitoredTimer.h:32