ATLAS Offline Software
TrigByteStreamInputSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 // Trigger includes
8 
9 // Athena includes
11 #include "StoreGate/StoreGateSvc.h"
12 
13 // TDAQ includes
14 #include "hltinterface/DataCollector.h"
15 
16 // System includes
17 #include <charconv>
18 
// File-local conversion factor: multiplying a size expressed in 32-bit words by this value
// gives the size in kilobytes (sizeof(uint32_t) bytes per word, 0.001 kB per byte).
19 namespace {
20  constexpr float wordsToKiloBytes = 0.001*sizeof(uint32_t);
21 }
22 
23 // =============================================================================
24 // Standard constructor
25 // =============================================================================
// Forwards the service name and the service locator to base_class; the constructor body is empty.
26 TrigByteStreamInputSvc::TrigByteStreamInputSvc(const std::string& name, ISvcLocator* svcLoc)
27 : base_class(name, svcLoc) {}
28 
29 // =============================================================================
30 // Standard destructor
31 // =============================================================================
33 
34 // =============================================================================
35 // Implementation of Service::initialize
36 // =============================================================================
// NOTE(review): the signature line (source line 37) is absent from this listing; the
// cross-reference index gives it as `virtual StatusCode initialize() override`.
// Retrieves the handles this service depends on and returns SUCCESS once every retrieve()
// has passed its ATH_CHECK (ATH_CHECK presumably propagates a failure to the caller; its
// definition is not part of this listing).
38  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
39 
40  ATH_CHECK(m_robDataProviderSvc.retrieve());
41  ATH_CHECK(m_evtStore.retrieve());
// The monitoring tool is optional: it is retrieved only when its handle is non-empty.
42  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
43 
44  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
45  return StatusCode::SUCCESS;
46 }
47 
48 // =============================================================================
49 // Implementation of Service::finalize
50 // =============================================================================
// NOTE(review): the signature line (source line 51) is absent from this listing; the
// cross-reference index gives it as `virtual StatusCode finalize() override`.
// Releases the ROB data provider handle. A failed release is only reported as a warning;
// the method returns SUCCESS in either case.
52  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
53  if (m_robDataProviderSvc.release().isFailure()) {
54  ATH_MSG_WARNING("Cannot release rob data provider");
55  }
56  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
57  return StatusCode::SUCCESS;
58 }
59 
60 // =============================================================================
61 // Implementation of ByteStreamInputSvc::nextEvent
62 // =============================================================================
64  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
65 
66  // Get the EventContext via event store because the interface doesn't allow passing it explicitly as an argument
67  // and we don't want to use ThreadLocalContext. Don't use ReadHandle here because it calls ThreadLocalContext if
68  // not given a context (which we want to retrieve).
69  const EventContext* eventContext = nullptr;
70  if (m_evtStore->retrieve(eventContext).isFailure()) {
71  ATH_MSG_ERROR("Failed to retrieve EventContext from the event store, new event cannot be read");
72  return nullptr;
73  }
74 
75  ATH_MSG_DEBUG("Reading new event for event context " << *eventContext);
76 
77  // Find the cache corresponding to the current slot
78  EventCache* cache = m_eventsCache.get(*eventContext);
79 
80  // Free the memory allocated to the previous event processed in the current slot
81  cache->releaseEvent();
82 
83  using DCStatus = hltinterface::DataCollector::Status;
84  DCStatus status = DCStatus::NO_EVENT;
85  auto monLBN = Monitored::Scalar<uint16_t>("getNext_LBN", m_maxLB);
86  auto monNoEvent = Monitored::Scalar<bool>("getNext_noEvent", false);
87  try {
88  auto t_getNext = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_getNext");
90  auto mon = Monitored::Group(m_monTool, t_getNext);
91  }
92  catch (const std::exception& ex) {
93  ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception: " << ex.what()
94  << ". Throwing hltonl::Exception::EventSourceCorrupted" );
96  }
97  catch (...) {
98  ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception. "
99  << "Throwing hltonl::Exception::EventSourceCorrupted" );
101  }
102 
103  if (status == DCStatus::STOP) {
104  ATH_MSG_DEBUG("DataCollector::getNext returned STOP - no more events available");
106  }
107  else if (status == DCStatus::NO_EVENT) {
108  ATH_MSG_DEBUG("DataCollector::getNext returned NO_EVENT - no events available temporarily");
109  monNoEvent = true;
110  auto mon = Monitored::Group(m_monTool, monLBN, monNoEvent);
112  }
113  else if (status != DCStatus::OK) {
114  ATH_MSG_ERROR("Unhandled return Status " << static_cast<int>(status) << " from DataCollector::getNext");
115  return nullptr;
116  }
117  ATH_MSG_VERBOSE("DataCollector::getNext returned Status::OK");
118 
119  // Create a cached FullEventFragment object from the cached raw data
120  cache->fullEventFragment.reset(new RawEvent(cache->rawData.get()));
121 
122  // Update LB number for monitoring
123  if (m_maxLB < cache->fullEventFragment->lumi_block()) {
124  m_maxLB = cache->fullEventFragment->lumi_block();
125  monLBN = m_maxLB;
126  }
127 
128  // Monitor the input
129  auto numROBs = Monitored::Scalar<int>("L1Result_NumROBs",
130  cache->fullEventFragment->nchildren());
131  auto fragSize = Monitored::Scalar<float>("L1Result_FullEvFragSize",
132  cache->fullEventFragment->fragment_size_word()*wordsToKiloBytes);
133  std::vector<eformat::read::ROBFragment> robVec;
134  cache->fullEventFragment->robs(robVec);
135  std::vector<std::string> subdetNameVec;
136  for (const eformat::read::ROBFragment& rob : robVec) {
137  eformat::helper::SourceIdentifier sid(rob.rob_source_id());
138  subdetNameVec.push_back(sid.human_detector());
139  }
140  auto subdets = Monitored::Collection<std::vector<std::string>>("L1Result_SubDets", subdetNameVec);
141  auto mon = Monitored::Group(m_monTool, numROBs, fragSize, subdets, monLBN, monNoEvent);
142 
143  // Give the FullEventFragment pointer to ROBDataProviderSvc
144  m_robDataProviderSvc->setNextEvent(*eventContext, cache->fullEventFragment.get());
145  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
146 
147  // Check the CTP fragment (request from readout if not part of the cache), ATR-25217
148  if (m_checkCTPFragmentModuleID.value() >= 0) {
149  const eformat::helper::SourceIdentifier sid{eformat::SubDetector::TDAQ_CTP,
150  static_cast<uint16_t>(m_checkCTPFragmentModuleID.value())};
151  std::vector<const OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment*> vrobf;
152  m_robDataProviderSvc->getROBData(*eventContext, {sid.code()}, vrobf, name());
153  if (vrobf.empty()) {
154  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is missing. "
155  << "Throwing hltonl::Exception::MissingCTPFragment");
157  }
158  uint32_t robStatus = vrobf.at(0)->nstatus()>0 ? *(vrobf.at(0)->status()) : 0;
159  if (robStatus!=0) {
160  std::string hexStatus(8, char{0});
161  std::to_chars(hexStatus.data(), hexStatus.data()+hexStatus.size(), robStatus, 16);
162  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " has non-zero status word: 0x"
163  << hexStatus << ". Throwing hltonl::Exception::BadCTPFragment");
164  throw hltonl::Exception::BadCTPFragment("Non-zero ROB status 0x"+hexStatus);
165  }
166  try {
167  vrobf.at(0)->check();
168  }
169  catch (const std::exception& ex) {
170  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is corrupted: "
171  << ex.what() << ". Throwing hltonl::Exception::BadCTPFragment");
172  throw hltonl::Exception::BadCTPFragment(ex.what());
173  }
174  }
175 
176  // Return the FullEventFragment pointer (do not transfer ownership)
177  return cache->fullEventFragment.get();
178 }
179 
180 // =============================================================================
181 // Implementation of ByteStreamInputSvc::previousEvent
182 // =============================================================================
// NOTE(review): the signature line (source line 183) is absent from this listing; the
// cross-reference index gives it as `virtual const RawEvent* previousEvent() override`.
// Not implemented for online running: logs a FATAL message and returns nullptr.
184  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
185  return nullptr;
186 }
187 
188 // =============================================================================
189 // Implementation of ByteStreamInputSvc::currentEvent
190 // =============================================================================
// NOTE(review): the signature line (source line 191) is absent from this listing; the
// cross-reference index gives it as `virtual const RawEvent* currentEvent() const override`.
// Not implemented for online running: logs a FATAL message and returns nullptr.
192  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
193  return nullptr;
194 }
195 
196 // =============================================================================
// NOTE(review): the signature line (source line 197) is absent from this listing; the
// cross-reference index gives it as `void releaseEvent()` of TrigByteStreamInputSvc::EventCache.
// Resets both members of the cache; per the index both are std::unique_ptr, so the raw data
// array and the fragment object they own are destroyed and the pointers become null.
198  this->rawData.reset();
199  this->fullEventFragment.reset();
200 }
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
TrigDefs::Group
Group
Properties of a chain group.
Definition: GroupProperties.h:13
hltonl::Exception::MissingCTPFragment
Thrown if the CTP ROBFragment cannot be retrieved for a new event.
Definition: HltExceptions.h:41
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
TrigByteStreamInputSvc::EventCache::rawData
std::unique_ptr< uint32_t[]> rawData
Underlying data structure.
Definition: TrigByteStreamInputSvc.h:56
HltExceptions.h
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
TrigByteStreamInputSvc::nextEvent
virtual const RawEvent * nextEvent() override
Definition: TrigByteStreamInputSvc.cxx:63
hltonl::Exception::EventSourceCorrupted
Thrown if event source throws an exception when new event is requested.
Definition: HltExceptions.h:33
TrigByteStreamInputSvc::m_monTool
ToolHandle< GenericMonitoringTool > m_monTool
Definition: TrigByteStreamInputSvc.h:44
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
TrigByteStreamInputSvc::EventCache::fullEventFragment
std::unique_ptr< RawEvent > fullEventFragment
Current event fragment.
Definition: TrigByteStreamInputSvc.h:55
TrigByteStreamInputSvc::EventCache::releaseEvent
void releaseEvent()
Definition: TrigByteStreamInputSvc.cxx:197
TrigByteStreamInputSvc::initialize
virtual StatusCode initialize() override
Definition: TrigByteStreamInputSvc.cxx:37
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
TrigByteStreamInputSvc.h
TrigByteStreamInputSvc::m_evtStore
ServiceHandle< StoreGateSvc > m_evtStore
Definition: TrigByteStreamInputSvc.h:43
TrigByteStreamInputSvc::m_robDataProviderSvc
ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc
Definition: TrigByteStreamInputSvc.h:42
EventContextClid.h
Assign a CLID to EventContext.
TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
Gaudi::Property< int > m_checkCTPFragmentModuleID
Definition: TrigByteStreamInputSvc.h:47
TrigByteStreamInputSvc::currentEvent
virtual const RawEvent * currentEvent() const override
Definition: TrigByteStreamInputSvc.cxx:191
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
hltonl::Exception::BadCTPFragment
Thrown if the CTP ROBFragment for a new event has non-zero status word or other errors.
Definition: HltExceptions.h:49
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
xAOD::uint16_t
setWord1 uint16_t
Definition: eFexEMRoI_v1.cxx:88
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
TrigByteStreamInputSvc::~TrigByteStreamInputSvc
virtual ~TrigByteStreamInputSvc()
Standard destructor.
Definition: TrigByteStreamInputSvc.cxx:32
calibdata.exception
exception
Definition: calibdata.py:496
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigByteStreamInputSvc::m_eventsCache
SG::SlotSpecificObj< EventCache > m_eventsCache
Cache of RawEvent pointer per event slot.
Definition: TrigByteStreamInputSvc.h:59
TrigByteStreamInputSvc::previousEvent
virtual const RawEvent * previousEvent() override
Definition: TrigByteStreamInputSvc.cxx:183
hltonl::Exception::NoEventsTemporarily
Thrown if the event source cannot provide new events temporarily, e.g. when trigger is on hold.
Definition: HltExceptions.h:25
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
TrigByteStreamInputSvc::EventCache
Definition: TrigByteStreamInputSvc.h:52
OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment
eformat::ROBFragment< PointerType > ROBFragment
Definition: RawEvent.h:27
TrigByteStreamInputSvc::finalize
virtual StatusCode finalize() override
Definition: TrigByteStreamInputSvc.cxx:51
Athena::Status
Status
Athena specific StatusCode values.
Definition: AthStatusCode.h:22
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
plotBeamSpotMon.mon
mon
Definition: plotBeamSpotMon.py:67
TrigByteStreamInputSvc::TrigByteStreamInputSvc
TrigByteStreamInputSvc(const std::string &name, ISvcLocator *svcLoc)
Standard constructor.
Definition: TrigByteStreamInputSvc.cxx:26
merge.status
status
Definition: merge.py:17
Monitored::Scalar
Declare a monitored scalar variable.
Definition: MonitoredScalar.h:34
StoreGateSvc.h
TrigByteStreamInputSvc::m_maxLB
uint16_t m_maxLB
Maximum lumi block number seen so far, used for monitoring.
Definition: TrigByteStreamInputSvc.h:60
hltonl::Exception::NoMoreEvents
Thrown if all events are already read from the input and another one is requested.
Definition: HltExceptions.h:17
Monitored::Timer
A monitored timer.
Definition: MonitoredTimer.h:32