ATLAS Offline Software
Loading...
Searching...
No Matches
TrigByteStreamInputSvc Class Reference

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector. More...

#include <TrigByteStreamInputSvc.h>

Inheritance diagram for TrigByteStreamInputSvc:
Collaboration diagram for TrigByteStreamInputSvc:

Classes

struct  EventCache

Public Types

enum class  NextEventStatus { OK , NO_EVENT , STOP , ERROR }

Public Member Functions

 TrigByteStreamInputSvc (const std::string &name, ISvcLocator *svcLoc)
 Standard constructor.
virtual ~TrigByteStreamInputSvc ()
 Standard destructor.
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual const RawEvent * nextEvent () override
virtual const RawEvent * previousEvent () override
virtual const RawEvent * currentEvent () const override

Private Attributes

ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
ServiceHandle< EFInterfaceSvc > m_efInterfaceSvc {this, "EFInterfaceSvc", "", "Online service managing EF interface with Dataflow"}
ServiceHandle< StoreGateSvc > m_evtStore {this, "EventStore", "StoreGateSvc"}
ToolHandle< GenericMonitoringTool > m_monTool {this, "MonTool", "" , "Monitoring tool"}
Gaudi::Property< int > m_checkCTPFragmentModuleID
SG::SlotSpecificObj< EventCache > m_eventsCache
 Cache of RawEvent pointer per event slot.
uint16_t m_maxLB {0}
 Maximum lumi block number seen so far, used for monitoring.
bool m_hasEFInterface {false}

Detailed Description

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector.

The layout and implementation are based on ByteStreamEventStorageInputSvc.

Definition at line 25 of file TrigByteStreamInputSvc.h.

Member Enumeration Documentation

◆ NextEventStatus

Enumerator
OK 
NO_EVENT 
STOP 
ERROR 

Definition at line 33 of file TrigByteStreamInputSvc.h.

33{ OK, NO_EVENT, STOP, ERROR };

Constructor & Destructor Documentation

◆ TrigByteStreamInputSvc()

TrigByteStreamInputSvc::TrigByteStreamInputSvc ( const std::string & name,
ISvcLocator * svcLoc )

Standard constructor.

Definition at line 35 of file TrigByteStreamInputSvc.cxx.

36: base_class(name, svcLoc) {}

◆ ~TrigByteStreamInputSvc()

TrigByteStreamInputSvc::~TrigByteStreamInputSvc ( )
virtual

Standard destructor.

Definition at line 41 of file TrigByteStreamInputSvc.cxx.

41{}

Member Function Documentation

◆ currentEvent()

const RawEvent * TrigByteStreamInputSvc::currentEvent ( ) const
overridevirtual

Definition at line 218 of file TrigByteStreamInputSvc.cxx.

218 {
219 ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
220 return nullptr;
221}
#define ATH_MSG_FATAL(x)

◆ finalize()

StatusCode TrigByteStreamInputSvc::finalize ( )
overridevirtual

Definition at line 70 of file TrigByteStreamInputSvc.cxx.

70 {
71 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
72 if (m_robDataProviderSvc.release().isFailure()) {
73 ATH_MSG_WARNING("Cannot release rob data provider");
74 }
75 if (m_hasEFInterface) {
76 if (m_efInterfaceSvc.release().isFailure()) {
77 ATH_MSG_WARNING("Failed to release service " << m_efInterfaceSvc.typeAndName());
78 }
79 }
80 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
81 return StatusCode::SUCCESS;
82}
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
ServiceHandle< EFInterfaceSvc > m_efInterfaceSvc
ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc

◆ initialize()

StatusCode TrigByteStreamInputSvc::initialize ( )
overridevirtual

Definition at line 46 of file TrigByteStreamInputSvc.cxx.

46 {
47 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
48
50 //Check if EFInterfaceSvc is available and set flag for selecting which interface to use
51 if (!m_efInterfaceSvc.empty()) {
52 ATH_MSG_INFO("Using EFInterfaceSvc");
53 ATH_CHECK(m_efInterfaceSvc.retrieve());
54 m_hasEFInterface = true;
55 }
56 else {
57 ATH_MSG_INFO("Using legacy dataflow interface");
58 m_hasEFInterface = false;
59 }
60 ATH_CHECK(m_evtStore.retrieve());
61 if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
62
63 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
64 return StatusCode::SUCCESS;
65}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_INFO(x)
ToolHandle< GenericMonitoringTool > m_monTool
ServiceHandle< StoreGateSvc > m_evtStore

◆ nextEvent()

const RawEvent * TrigByteStreamInputSvc::nextEvent ( )
overridevirtual

Definition at line 87 of file TrigByteStreamInputSvc.cxx.

87 {
88 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
89
90 // Get the EventContext via event store because the interface doesn't allow passing it explicitly as an argument
91 // and we don't want to use ThreadLocalContext. Don't use ReadHandle here because it calls ThreadLocalContext if
92 // not given a context (which we want to retrieve).
93 const EventContext* eventContext = nullptr;
94 if (m_evtStore->retrieve(eventContext).isFailure()) {
95 ATH_MSG_ERROR("Failed to retrieve EventContext from the event store, new event cannot be read");
96 return nullptr;
97 }
98
99 ATH_MSG_DEBUG("Reading new event for event context " << *eventContext);
100
101 // Find the cache corresponding to the current slot
102 EventCache* cache = m_eventsCache.get(*eventContext);
103
104 // Free the memory allocated to the previous event processed in the current slot
105 cache->releaseEvent();
106
107 auto monLBN = Monitored::Scalar<uint16_t>("getNext_LBN", m_maxLB);
108 auto monNoEvent = Monitored::Scalar<bool>("getNext_noEvent", false);
110 try {
111 auto t_getNext = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_getNext");
112 if (m_hasEFInterface) {
113 status = fetch<EFInterfaceSvc::Status>([&]{ return m_efInterfaceSvc->getNext(cache->rawData); });
114 }
115 else {
116 status = fetch<hltinterface::DataCollector::Status>([&]{ return hltinterface::DataCollector::instance()->getNext(cache->rawData); });
117 }
118 auto mon = Monitored::Group(m_monTool, t_getNext);
119 }
120 catch (const std::exception& ex) {
121 ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception: " << ex.what()
122 << ". Throwing hltonl::Exception::EventSourceCorrupted" );
123 throw hltonl::Exception::EventSourceCorrupted();
124 }
125 catch (...) {
126 ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception. "
127 << "Throwing hltonl::Exception::EventSourceCorrupted" );
128 throw hltonl::Exception::EventSourceCorrupted();
129 }
130 if (status == NextEventStatus::STOP) {
131 ATH_MSG_DEBUG("DataCollector::getNext returned STOP - no more events available");
132 throw hltonl::Exception::NoMoreEvents();
133 }
134 else if (status == NextEventStatus::NO_EVENT) {
135 ATH_MSG_DEBUG("DataCollector::getNext returned NO_EVENT - no events available temporarily");
136 monNoEvent = true;
137 auto mon = Monitored::Group(m_monTool, monLBN, monNoEvent);
138 throw hltonl::Exception::NoEventsTemporarily();
139 }
140 else if (status != NextEventStatus::OK) {
141 ATH_MSG_ERROR("Unhandled return Status " << static_cast<int>(status) << " from DataCollector::getNext");
142 return nullptr;
143 }
144 ATH_MSG_VERBOSE("DataCollector::getNext returned Status::OK");
145
146 // Create a cached FullEventFragment object from the cached raw data
147 cache->fullEventFragment.reset(new RawEvent(cache->rawData.get()));
148
149 // Update LB number for monitoring
150 if (m_maxLB < cache->fullEventFragment->lumi_block()) {
151 m_maxLB = cache->fullEventFragment->lumi_block();
152 monLBN = m_maxLB;
153 }
154
155 // Monitor the input
156 auto numROBs = Monitored::Scalar<int>("L1Result_NumROBs",
157 cache->fullEventFragment->nchildren());
158 auto fragSize = Monitored::Scalar<float>("L1Result_FullEvFragSize",
159 cache->fullEventFragment->fragment_size_word()*wordsToKiloBytes);
160 std::vector<eformat::read::ROBFragment> robVec;
161 cache->fullEventFragment->robs(robVec);
162 std::vector<std::string> subdetNameVec;
163 for (const eformat::read::ROBFragment& rob : robVec) {
164 eformat::helper::SourceIdentifier sid(rob.rob_source_id());
165 subdetNameVec.push_back(sid.human_detector());
166 }
167 auto subdets = Monitored::Collection<std::vector<std::string>>("L1Result_SubDets", subdetNameVec);
168 auto mon = Monitored::Group(m_monTool, numROBs, fragSize, subdets, monLBN, monNoEvent);
169
170 // Give the FullEventFragment pointer to ROBDataProviderSvc
171 m_robDataProviderSvc->setNextEvent(*eventContext, cache->fullEventFragment.get());
172 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
173
174 // Check the CTP fragment (request from readout if not part of the cache), ATR-25217
175 if (m_checkCTPFragmentModuleID.value() >= 0) {
176 const eformat::helper::SourceIdentifier sid{eformat::SubDetector::TDAQ_CTP,
177 static_cast<uint16_t>(m_checkCTPFragmentModuleID.value())};
178 std::vector<const OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment*> vrobf;
179 m_robDataProviderSvc->getROBData(*eventContext, {sid.code()}, vrobf, name());
180 if (vrobf.empty()) {
181 ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is missing. "
182 << "Throwing hltonl::Exception::MissingCTPFragment");
183 throw hltonl::Exception::MissingCTPFragment();
184 }
185 uint32_t robStatus = vrobf.at(0)->nstatus()>0 ? *(vrobf.at(0)->status()) : 0;
186 if (robStatus!=0) {
187 std::string hexStatus(8, char{0});
188 std::to_chars(hexStatus.data(), hexStatus.data()+hexStatus.size(), robStatus, 16);
189 ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " has non-zero status word: 0x"
190 << hexStatus << ". Throwing hltonl::Exception::BadCTPFragment");
191 throw hltonl::Exception::BadCTPFragment("Non-zero ROB status 0x"+hexStatus);
192 }
193 try {
194 vrobf.at(0)->check();
195 }
196 catch (const std::exception& ex) {
197 ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is corrupted: "
198 << ex.what() << ". Throwing hltonl::Exception::BadCTPFragment");
199 throw hltonl::Exception::BadCTPFragment(ex.what());
200 }
201 }
202
203 // Return the FullEventFragment pointer (do not transfer ownership)
204 return cache->fullEventFragment.get();
205}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_DEBUG(x)
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
uint16_t m_maxLB
Maximum lumi block number seen so far, used for monitoring.
SG::SlotSpecificObj< EventCache > m_eventsCache
Cache of RawEvent pointer per event slot.
Gaudi::Property< int > m_checkCTPFragmentModuleID
ValuesCollection< T > Collection(std::string name, const T &collection)
Declare a monitored (double-convertible) collection.
status
Definition merge.py:16
setWord1 uint16_t
setEventNumber uint32_t

◆ previousEvent()

const RawEvent * TrigByteStreamInputSvc::previousEvent ( )
overridevirtual

Definition at line 210 of file TrigByteStreamInputSvc.cxx.

210 {
211 ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
212 return nullptr;
213}

Member Data Documentation

◆ m_checkCTPFragmentModuleID

Gaudi::Property<int> TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
private
Initial value:
{this, "CheckCTPFragmentModuleID", -1,
"After reading a new event, assert we can retrieve the CTP fragment with Module ID given by this property, "
"and that has no errors. A value <0 disables the check."}

Definition at line 52 of file TrigByteStreamInputSvc.h.

52 {this, "CheckCTPFragmentModuleID", -1,
53 "After reading a new event, assert we can retrieve the CTP fragment with Module ID given by this property, "
54 "and that has no errors. A value <0 disables the check."};

◆ m_efInterfaceSvc

ServiceHandle<EFInterfaceSvc> TrigByteStreamInputSvc::m_efInterfaceSvc {this, "EFInterfaceSvc", "", "Online service managing EF interface with Dataflow"}
private

Definition at line 47 of file TrigByteStreamInputSvc.h.

47{this, "EFInterfaceSvc", "", "Online service managing EF interface with Dataflow"};

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> TrigByteStreamInputSvc::m_eventsCache
private

Cache of RawEvent pointer per event slot.

Definition at line 64 of file TrigByteStreamInputSvc.h.

◆ m_evtStore

ServiceHandle<StoreGateSvc> TrigByteStreamInputSvc::m_evtStore {this, "EventStore", "StoreGateSvc"}
private

Definition at line 48 of file TrigByteStreamInputSvc.h.

48{this, "EventStore", "StoreGateSvc"};

◆ m_hasEFInterface

bool TrigByteStreamInputSvc::m_hasEFInterface {false}
private

Definition at line 66 of file TrigByteStreamInputSvc.h.

66{false};

◆ m_maxLB

uint16_t TrigByteStreamInputSvc::m_maxLB {0}
private

Maximum lumi block number seen so far, used for monitoring.

Definition at line 65 of file TrigByteStreamInputSvc.h.

65{0};

◆ m_monTool

ToolHandle<GenericMonitoringTool> TrigByteStreamInputSvc::m_monTool {this, "MonTool", "" , "Monitoring tool"}
private

Definition at line 49 of file TrigByteStreamInputSvc.h.

49{this, "MonTool", "" , "Monitoring tool"};

◆ m_robDataProviderSvc

ServiceHandle<IROBDataProviderSvc> TrigByteStreamInputSvc::m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
private

Definition at line 46 of file TrigByteStreamInputSvc.h.

46{this, "ROBDataProvider", "ROBDataProviderSvc"};

The documentation for this class was generated from the following files: