ATLAS Offline Software
TrigByteStreamInputSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 // Trigger includes
8 
9 // Athena includes
11 #include "StoreGate/StoreGateSvc.h"
12 
13 // TDAQ includes
14 #include "hltinterface/DataCollector.h"
15 
16 // System includes
17 #include <charconv>
18 
namespace {
  /// Conversion factor from a count of 32-bit words to kilobytes (1 kB = 1000 bytes),
  /// used when monitoring the size of the full event fragment.
  constexpr float wordsToKiloBytes = sizeof(uint32_t) * 0.001;
} // anonymous namespace
22 
23 // =============================================================================
24 // Standard constructor
25 // =============================================================================
26 TrigByteStreamInputSvc::TrigByteStreamInputSvc(const std::string& name, ISvcLocator* svcLoc)
27 : ByteStreamInputSvc(name, svcLoc) {}
28 
29 // =============================================================================
30 // Standard destructor
31 // =============================================================================
33 
34 // =============================================================================
35 // Implementation of IInterface::queryInterface
36 // =============================================================================
37 StatusCode TrigByteStreamInputSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) {
38  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
39 
40  if(ByteStreamInputSvc::interfaceID().versionMatch(riid))
41  *ppvInterface = static_cast<ByteStreamInputSvc*>(this);
42  else
43  return AthService::queryInterface(riid, ppvInterface);
44 
45  addRef();
46  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
47  return StatusCode::SUCCESS;
48 }
49 
50 // =============================================================================
51 // Implementation of Service::initialize
52 // =============================================================================
54  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
55 
56  ATH_CHECK(m_robDataProviderSvc.retrieve());
57  ATH_CHECK(m_evtStore.retrieve());
58  if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
59 
60  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
61  return StatusCode::SUCCESS;
62 }
63 
64 // =============================================================================
65 // Implementation of Service::finalize
66 // =============================================================================
68  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
69  if (m_robDataProviderSvc.release().isFailure()) {
70  ATH_MSG_WARNING("Cannot release rob data provider");
71  }
72  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
73  return StatusCode::SUCCESS;
74 }
75 
76 // =============================================================================
77 // Implementation of ByteStreamInputSvc::nextEvent
78 // =============================================================================
80  ATH_MSG_VERBOSE("start of " << __FUNCTION__);
81 
82  // Get the EventContext via event store because the interface doesn't allow passing it explicitly as an argument
83  // and we don't want to use ThreadLocalContext. Don't use ReadHandle here because it calls ThreadLocalContext if
84  // not given a context (which we want to retrieve).
85  const EventContext* eventContext = nullptr;
86  if (m_evtStore->retrieve(eventContext).isFailure()) {
87  ATH_MSG_ERROR("Failed to retrieve EventContext from the event store, new event cannot be read");
88  return nullptr;
89  }
90 
91  ATH_MSG_DEBUG("Reading new event for event context " << *eventContext);
92 
93  // Find the cache corresponding to the current slot
94  EventCache* cache = m_eventsCache.get(*eventContext);
95 
96  // Free the memory allocated to the previous event processed in the current slot
97  cache->releaseEvent();
98 
99  using DCStatus = hltinterface::DataCollector::Status;
100  DCStatus status = DCStatus::NO_EVENT;
101  auto monLBN = Monitored::Scalar<uint16_t>("getNext_LBN", m_maxLB);
102  auto monNoEvent = Monitored::Scalar<bool>("getNext_noEvent", false);
103  try {
104  auto t_getNext = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_getNext");
106  auto mon = Monitored::Group(m_monTool, t_getNext);
107  }
108  catch (const std::exception& ex) {
109  ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception: " << ex.what()
110  << ". Throwing hltonl::Exception::EventSourceCorrupted" );
112  }
113  catch (...) {
114  ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception. "
115  << "Throwing hltonl::Exception::EventSourceCorrupted" );
117  }
118 
119  if (status == DCStatus::STOP) {
120  ATH_MSG_DEBUG("DataCollector::getNext returned STOP - no more events available");
122  }
123  else if (status == DCStatus::NO_EVENT) {
124  ATH_MSG_DEBUG("DataCollector::getNext returned NO_EVENT - no events available temporarily");
125  monNoEvent = true;
126  auto mon = Monitored::Group(m_monTool, monLBN, monNoEvent);
128  }
129  else if (status != DCStatus::OK) {
130  ATH_MSG_ERROR("Unhandled return Status " << static_cast<int>(status) << " from DataCollector::getNext");
131  return nullptr;
132  }
133  ATH_MSG_VERBOSE("DataCollector::getNext returned Status::OK");
134 
135  // Create a cached FullEventFragment object from the cached raw data
136  cache->fullEventFragment.reset(new RawEvent(cache->rawData.get()));
137 
138  // Update LB number for monitoring
139  if (m_maxLB < cache->fullEventFragment->lumi_block()) {
140  m_maxLB = cache->fullEventFragment->lumi_block();
141  monLBN = m_maxLB;
142  }
143 
144  // Monitor the input
145  auto numROBs = Monitored::Scalar<int>("L1Result_NumROBs",
146  cache->fullEventFragment->nchildren());
147  auto fragSize = Monitored::Scalar<float>("L1Result_FullEvFragSize",
148  cache->fullEventFragment->fragment_size_word()*wordsToKiloBytes);
149  std::vector<eformat::read::ROBFragment> robVec;
150  cache->fullEventFragment->robs(robVec);
151  std::vector<std::string> subdetNameVec;
152  for (const eformat::read::ROBFragment& rob : robVec) {
153  eformat::helper::SourceIdentifier sid(rob.rob_source_id());
154  subdetNameVec.push_back(sid.human_detector());
155  }
156  auto subdets = Monitored::Collection<std::vector<std::string>>("L1Result_SubDets", subdetNameVec);
157  auto mon = Monitored::Group(m_monTool, numROBs, fragSize, subdets, monLBN, monNoEvent);
158 
159  // Give the FullEventFragment pointer to ROBDataProviderSvc
160  m_robDataProviderSvc->setNextEvent(*eventContext, cache->fullEventFragment.get());
161  ATH_MSG_VERBOSE("end of " << __FUNCTION__);
162 
163  // Check the CTP fragment (request from readout if not part of the cache), ATR-25217
164  if (m_checkCTPFragmentModuleID.value() >= 0) {
165  const eformat::helper::SourceIdentifier sid{eformat::SubDetector::TDAQ_CTP,
166  static_cast<uint16_t>(m_checkCTPFragmentModuleID.value())};
167  std::vector<const OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment*> vrobf;
168  m_robDataProviderSvc->getROBData(*eventContext, {sid.code()}, vrobf, name());
169  if (vrobf.empty()) {
170  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is missing. "
171  << "Throwing hltonl::Exception::MissingCTPFragment");
173  }
174  uint32_t robStatus = vrobf.at(0)->nstatus()>0 ? *(vrobf.at(0)->status()) : 0;
175  if (robStatus!=0) {
176  std::string hexStatus(8, char{0});
177  std::to_chars(hexStatus.data(), hexStatus.data()+hexStatus.size(), robStatus, 16);
178  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " has non-zero status word: 0x"
179  << hexStatus << ". Throwing hltonl::Exception::BadCTPFragment");
180  throw hltonl::Exception::BadCTPFragment("Non-zero ROB status 0x"+hexStatus);
181  }
182  try {
183  vrobf.at(0)->check();
184  }
185  catch (const std::exception& ex) {
186  ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is corrupted: "
187  << ex.what() << ". Throwing hltonl::Exception::BadCTPFragment");
188  throw hltonl::Exception::BadCTPFragment(ex.what());
189  }
190  }
191 
192  // Return the FullEventFragment pointer (do not transfer ownership)
193  return cache->fullEventFragment.get();
194 }
195 
196 // =============================================================================
197 // Implementation of ByteStreamInputSvc::previousEvent
198 // =============================================================================
200  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
201  return nullptr;
202 }
203 
204 // =============================================================================
205 // Implementation of ByteStreamInputSvc::currentEvent
206 // =============================================================================
208  ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
209  return nullptr;
210 }
211 
212 // =============================================================================
214  this->rawData.reset();
215  this->fullEventFragment.reset();
216 }
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
TrigDefs::Group
Group
Properties of a chain group.
Definition: GroupProperties.h:13
hltonl::Exception::MissingCTPFragment
Thrown if the CTP ROBFragment cannot be retrieved for a new event.
Definition: HltExceptions.h:41
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
TrigByteStreamInputSvc::EventCache::rawData
std::unique_ptr< uint32_t[]> rawData
Underlying data structure.
Definition: TrigByteStreamInputSvc.h:58
HltExceptions.h
ByteStreamInputSvc
This class provides the base class to services to read bytestream data. The concrete class can provid...
Definition: ByteStreamInputSvc.h:23
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
TrigByteStreamInputSvc::nextEvent
virtual const RawEvent * nextEvent() override
virtual method for advance to the next event
Definition: TrigByteStreamInputSvc.cxx:79
hltonl::Exception::EventSourceCorrupted
Thrown if event source throws an exception when new event is requested.
Definition: HltExceptions.h:33
TrigByteStreamInputSvc::m_monTool
ToolHandle< GenericMonitoringTool > m_monTool
Definition: TrigByteStreamInputSvc.h:46
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
TrigByteStreamInputSvc::EventCache::fullEventFragment
std::unique_ptr< RawEvent > fullEventFragment
Current event fragment.
Definition: TrigByteStreamInputSvc.h:57
TrigByteStreamInputSvc::EventCache::releaseEvent
void releaseEvent()
Definition: TrigByteStreamInputSvc.cxx:213
TrigByteStreamInputSvc::initialize
virtual StatusCode initialize() override
Definition: TrigByteStreamInputSvc.cxx:53
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
TrigByteStreamInputSvc.h
TrigByteStreamInputSvc::m_evtStore
ServiceHandle< StoreGateSvc > m_evtStore
Definition: TrigByteStreamInputSvc.h:45
TrigByteStreamInputSvc::m_robDataProviderSvc
ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc
Definition: TrigByteStreamInputSvc.h:44
EventContextClid.h
Assign a CLID to EventContext.
TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
Gaudi::Property< int > m_checkCTPFragmentModuleID
Definition: TrigByteStreamInputSvc.h:49
TrigByteStreamInputSvc::currentEvent
virtual const RawEvent * currentEvent() const override
virtual method for accessing the current event
Definition: TrigByteStreamInputSvc.cxx:207
instance
std::map< std::string, double > instance
Definition: Run_To_Get_Tags.h:8
hltonl::Exception::BadCTPFragment
Thrown if the CTP ROBFragment for a new event has non-zero status word or other errors.
Definition: HltExceptions.h:49
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
xAOD::uint16_t
setWord1 uint16_t
Definition: eFexEMRoI_v1.cxx:88
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
TrigByteStreamInputSvc::~TrigByteStreamInputSvc
virtual ~TrigByteStreamInputSvc()
Standard destructor.
Definition: TrigByteStreamInputSvc.cxx:32
calibdata.exception
exception
Definition: calibdata.py:496
TrigByteStreamInputSvc::queryInterface
virtual StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface) override
Definition: TrigByteStreamInputSvc.cxx:37
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
TrigByteStreamInputSvc::m_eventsCache
SG::SlotSpecificObj< EventCache > m_eventsCache
Cache of RawEvent pointer per event slot.
Definition: TrigByteStreamInputSvc.h:61
TrigByteStreamInputSvc::previousEvent
virtual const RawEvent * previousEvent() override
Definition: TrigByteStreamInputSvc.cxx:199
hltonl::Exception::NoEventsTemporarily
Thrown if the event source cannot provide new events temporarily, e.g. when trigger is on hold.
Definition: HltExceptions.h:25
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
TrigByteStreamInputSvc::EventCache
Definition: TrigByteStreamInputSvc.h:54
OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment
eformat::ROBFragment< PointerType > ROBFragment
Definition: RawEvent.h:27
TrigByteStreamInputSvc::finalize
virtual StatusCode finalize() override
Definition: TrigByteStreamInputSvc.cxx:67
Athena::Status
Status
Athena specific StatusCode values.
Definition: AthStatusCode.h:22
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
plotBeamSpotMon.mon
mon
Definition: plotBeamSpotMon.py:67
TrigByteStreamInputSvc::TrigByteStreamInputSvc
TrigByteStreamInputSvc(const std::string &name, ISvcLocator *svcLoc)
Standard constructor.
Definition: TrigByteStreamInputSvc.cxx:26
merge.status
status
Definition: merge.py:17
ByteStreamInputSvc::interfaceID
static const InterfaceID & interfaceID()
Retrieve interface ID.
Definition: ByteStreamInputSvc.h:49
Monitored::Scalar
Declare a monitored scalar variable.
Definition: MonitoredScalar.h:34
StoreGateSvc.h
TrigByteStreamInputSvc::m_maxLB
uint16_t m_maxLB
Maximum lumi block number seen so far, used for monitoring.
Definition: TrigByteStreamInputSvc.h:62
hltonl::Exception::NoMoreEvents
Thrown if all events are already read from the input and another one is requested.
Definition: HltExceptions.h:17
Monitored::Timer
A monitored timer.
Definition: MonitoredTimer.h:32