ATLAS Offline Software
Loading...
Searching...
No Matches
TrigByteStreamInputSvc Class Reference

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector. More...

#include <TrigByteStreamInputSvc.h>

Inheritance diagram for TrigByteStreamInputSvc:
Collaboration diagram for TrigByteStreamInputSvc:

Classes

struct  EventCache

Public Types

enum class  NextEventStatus { OK , NO_EVENT , STOP , ERROR }

Public Member Functions

 TrigByteStreamInputSvc (const std::string &name, ISvcLocator *svcLoc)
 Standard constructor.
virtual ~TrigByteStreamInputSvc ()
 Standard destructor.
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual const RawEvent * nextEvent () override
virtual const RawEvent * previousEvent () override
virtual const RawEvent * currentEvent () const override

Private Attributes

ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
ServiceHandle< EFInterfaceSvc > m_efInterfaceSvc {this, "EFInterfaceSvc", "", "Online service managing EF interface with Dataflow"}
ServiceHandle< StoreGateSvc > m_evtStore {this, "EventStore", "StoreGateSvc"}
ToolHandle< GenericMonitoringTool > m_monTool {this, "MonTool", "" , "Monitoring tool"}
Gaudi::Property< int > m_checkCTPFragmentModuleID
SG::SlotSpecificObj< EventCache > m_eventsCache
 Cache of RawEvent pointer per event slot.
uint16_t m_maxLB {0}
 Maximum lumi block number seen so far, used for monitoring.
bool m_hasEFInterface {false}

Detailed Description

A ByteStreamInputSvc implementation for online use, reading events from hltinterface::DataCollector.

The layout and implementation are based on ByteStreamEventStorageInputSvc.

Definition at line 25 of file TrigByteStreamInputSvc.h.

Member Enumeration Documentation

◆ NextEventStatus

Enumerator
OK 
NO_EVENT 
STOP 
ERROR 

Definition at line 33 of file TrigByteStreamInputSvc.h.

33{ OK, NO_EVENT, STOP, ERROR };

Constructor & Destructor Documentation

◆ TrigByteStreamInputSvc()

TrigByteStreamInputSvc::TrigByteStreamInputSvc ( const std::string & name,
ISvcLocator * svcLoc )

Standard constructor.

Definition at line 35 of file TrigByteStreamInputSvc.cxx.

36: base_class(name, svcLoc) {}

◆ ~TrigByteStreamInputSvc()

TrigByteStreamInputSvc::~TrigByteStreamInputSvc ( )
virtual

Standard destructor.

Definition at line 41 of file TrigByteStreamInputSvc.cxx.

41{}

Member Function Documentation

◆ currentEvent()

const RawEvent * TrigByteStreamInputSvc::currentEvent ( ) const
overridevirtual

Definition at line 219 of file TrigByteStreamInputSvc.cxx.

219 {
220 ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
221 return nullptr;
222}
#define ATH_MSG_FATAL(x)

◆ finalize()

StatusCode TrigByteStreamInputSvc::finalize ( )
overridevirtual

Definition at line 70 of file TrigByteStreamInputSvc.cxx.

70 {
71 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
72 if (m_robDataProviderSvc.release().isFailure()) {
73 ATH_MSG_WARNING("Cannot release rob data provider");
74 }
76 if (m_efInterfaceSvc.release().isFailure()) {
77 ATH_MSG_WARNING("Failed to release service " << m_efInterfaceSvc.typeAndName());
78 }
79 }
80 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
81 return StatusCode::SUCCESS;
82}
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
ServiceHandle< EFInterfaceSvc > m_efInterfaceSvc
ServiceHandle< IROBDataProviderSvc > m_robDataProviderSvc

◆ initialize()

StatusCode TrigByteStreamInputSvc::initialize ( )
overridevirtual

Definition at line 46 of file TrigByteStreamInputSvc.cxx.

46 {
47 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
48
50 //Check if EFInterfaceSvc is available and set flag for selecting which interface to use
51 if (!m_efInterfaceSvc.empty()) {
52 ATH_MSG_INFO("Using EFInterfaceSvc");
53 ATH_CHECK(m_efInterfaceSvc.retrieve());
54 m_hasEFInterface = true;
55 }
56 else {
57 ATH_MSG_INFO("Using legacy dataflow interface");
58 m_hasEFInterface = false;
59 }
60 ATH_CHECK(m_evtStore.retrieve());
61 if (!m_monTool.empty()) ATH_CHECK(m_monTool.retrieve());
62
63 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
64 return StatusCode::SUCCESS;
65}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_INFO(x)
ToolHandle< GenericMonitoringTool > m_monTool
ServiceHandle< StoreGateSvc > m_evtStore

◆ nextEvent()

const RawEvent * TrigByteStreamInputSvc::nextEvent ( )
overridevirtual

Definition at line 87 of file TrigByteStreamInputSvc.cxx.

87 {
88 ATH_MSG_VERBOSE("start of " << __FUNCTION__);
89
90 // Get the EventContext via event store because the interface doesn't allow passing it explicitly as an argument
91 // and we don't want to use ThreadLocalContext. Don't use ReadHandle here because it calls ThreadLocalContext if
92 // not given a context (which we want to retrieve).
93 const EventContext* eventContext = nullptr;
94 if (m_evtStore->retrieve(eventContext).isFailure()) {
95 ATH_MSG_ERROR("Failed to retrieve EventContext from the event store, new event cannot be read");
96 return nullptr;
97 }
98
99 ATH_MSG_DEBUG("Reading new event for event context " << *eventContext);
100
101 // Find the cache corresponding to the current slot
102 EventCache* cache = m_eventsCache.get(*eventContext);
103
104 // Free the memory allocated to the previous event processed in the current slot
105 cache->releaseEvent();
106
107 auto monLBN = Monitored::Scalar<uint16_t>("getNext_LBN", m_maxLB);
108 auto monNoEvent = Monitored::Scalar<bool>("getNext_noEvent", false);
110 try {
111 auto t_getNext = Monitored::Timer<std::chrono::duration<float, std::milli>>("TIME_getNext");
112 if (m_hasEFInterface) {
113 status = fetch<EFInterfaceSvc::Status>([&]{ return m_efInterfaceSvc->getNext(cache->rawData); });
114 }
115 else {
116 status = fetch<hltinterface::DataCollector::Status>([&]{ return hltinterface::DataCollector::instance()->getNext(cache->rawData); });
117 }
118 auto mon = Monitored::Group(m_monTool, t_getNext);
119 }
120 catch (const std::exception& ex) {
121 ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception: " << ex.what()
122 << ". Throwing hltonl::Exception::EventSourceCorrupted" );
123 throw hltonl::Exception::EventSourceCorrupted();
124 }
125 catch (...) {
126 ATH_MSG_ERROR("Failed to read new event, caught an unexpected exception. "
127 << "Throwing hltonl::Exception::EventSourceCorrupted" );
128 throw hltonl::Exception::EventSourceCorrupted();
129 }
130 if (status == NextEventStatus::STOP) {
131 ATH_MSG_DEBUG("DataCollector::getNext returned STOP - no more events available");
132 throw hltonl::Exception::NoMoreEvents();
133 }
134 else if (status == NextEventStatus::NO_EVENT) {
135 ATH_MSG_DEBUG("DataCollector::getNext returned NO_EVENT - no events available temporarily");
136 monNoEvent = true;
137 auto mon = Monitored::Group(m_monTool, monLBN, monNoEvent);
138 throw hltonl::Exception::NoEventsTemporarily();
139 }
140 else if (status != NextEventStatus::OK) {
141 ATH_MSG_ERROR("Unhandled return Status " << static_cast<int>(status) << " from DataCollector::getNext");
142 return nullptr;
143 }
144 ATH_MSG_VERBOSE("DataCollector::getNext returned Status::OK");
145
146 // Create a cached FullEventFragment object from the cached raw data
147 cache->fullEventFragment.reset(new RawEvent(cache->rawData.get()));
148
149 // Update LB number for monitoring
150 if (m_maxLB < cache->fullEventFragment->lumi_block()) {
151 m_maxLB = cache->fullEventFragment->lumi_block();
152 monLBN = m_maxLB;
153 }
154
155 // Monitor the input
156 auto numROBs = Monitored::Scalar<int>("Input_NumROBs", 0);
157 auto fragSize = Monitored::Scalar<float>("Input_FullEvFragSize",
158 cache->fullEventFragment->fragment_size_word()*wordsToKiloBytes);
159
160 std::vector<std::string> subdetNameVec;
161 auto iter = cache->fullEventFragment->child_iter();
163 ++numROBs;
165 eformat::helper::SourceIdentifier sid(rob.rob_source_id());
166 subdetNameVec.push_back(sid.human_detector());
167 }
168 auto subdets = Monitored::Collection<std::vector<std::string>>("Input_SubDets", subdetNameVec);
169 auto mon = Monitored::Group(m_monTool, numROBs, fragSize, subdets, monLBN, monNoEvent);
170
171 // Give the FullEventFragment pointer to ROBDataProviderSvc
172 m_robDataProviderSvc->setNextEvent(*eventContext, cache->fullEventFragment.get());
173 ATH_MSG_VERBOSE("end of " << __FUNCTION__);
174
175 // Check the CTP fragment (request from readout if not part of the cache), ATR-25217
176 if (m_checkCTPFragmentModuleID.value() >= 0) {
177 const eformat::helper::SourceIdentifier sid{eformat::SubDetector::TDAQ_CTP,
178 static_cast<uint16_t>(m_checkCTPFragmentModuleID.value())};
179 std::vector<const OFFLINE_FRAGMENTS_NAMESPACE::ROBFragment*> vrobf;
180 m_robDataProviderSvc->getROBData(*eventContext, {sid.code()}, vrobf, name());
181 if (vrobf.empty()) {
182 ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is missing. "
183 << "Throwing hltonl::Exception::MissingCTPFragment");
184 throw hltonl::Exception::MissingCTPFragment();
185 }
186 uint32_t robStatus = vrobf.at(0)->nstatus()>0 ? *(vrobf.at(0)->status()) : 0;
187 if (robStatus!=0) {
188 std::string hexStatus(8, char{0});
189 std::to_chars(hexStatus.data(), hexStatus.data()+hexStatus.size(), robStatus, 16);
190 ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " has non-zero status word: 0x"
191 << hexStatus << ". Throwing hltonl::Exception::BadCTPFragment");
192 throw hltonl::Exception::BadCTPFragment("Non-zero ROB status 0x"+hexStatus);
193 }
194 try {
195 vrobf.at(0)->check();
196 }
197 catch (const std::exception& ex) {
198 ATH_MSG_INFO("The CTP ROB fragment 0x" << std::hex << sid.code() << std::dec << " is corrupted: "
199 << ex.what() << ". Throwing hltonl::Exception::BadCTPFragment");
200 throw hltonl::Exception::BadCTPFragment(ex.what());
201 }
202 }
203
204 // Return the FullEventFragment pointer (do not transfer ownership)
205 return cache->fullEventFragment.get();
206}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_DEBUG(x)
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition RawEvent.h:37
uint16_t m_maxLB
Maximum lumi block number seen so far, used for monitoring.
SG::SlotSpecificObj< EventCache > m_eventsCache
Cache of RawEvent pointer per event slot.
Gaudi::Property< int > m_checkCTPFragmentModuleID
ValuesCollection< T > Collection(std::string name, const T &collection)
Declare a monitored (double-convertible) collection.
const DataType * PointerType
Definition RawEvent.h:25
eformat::ROBFragment< PointerType > ROBFragment
Definition RawEvent.h:27
const IIntersectionCache * cache() const
Retrieve the associated cache block, if it exists.
status
Definition merge.py:16
setWord1 uint16_t
setEventNumber uint32_t

◆ previousEvent()

const RawEvent * TrigByteStreamInputSvc::previousEvent ( )
overridevirtual

Definition at line 211 of file TrigByteStreamInputSvc.cxx.

211 {
212 ATH_MSG_FATAL("The method " << __FUNCTION__ << " is not implemented for online running");
213 return nullptr;
214}

Member Data Documentation

◆ m_checkCTPFragmentModuleID

Gaudi::Property<int> TrigByteStreamInputSvc::m_checkCTPFragmentModuleID
private
Initial value:
{this, "CheckCTPFragmentModuleID", -1,
"After reading a new event, assert we can retrieve the CTP fragment with Module ID given by this property, "
"and that has no errors. A value <0 disables the check."}

Definition at line 52 of file TrigByteStreamInputSvc.h.

52 {this, "CheckCTPFragmentModuleID", -1,
53 "After reading a new event, assert we can retrieve the CTP fragment with Module ID given by this property, "
54 "and that has no errors. A value <0 disables the check."};

◆ m_efInterfaceSvc

ServiceHandle<EFInterfaceSvc> TrigByteStreamInputSvc::m_efInterfaceSvc {this, "EFInterfaceSvc", "", "Online service managing EF interface with Dataflow"}
private

Definition at line 47 of file TrigByteStreamInputSvc.h.

47{this, "EFInterfaceSvc", "", "Online service managing EF interface with Dataflow"};

◆ m_eventsCache

SG::SlotSpecificObj<EventCache> TrigByteStreamInputSvc::m_eventsCache
private

Cache of RawEvent pointer per event slot.

Definition at line 64 of file TrigByteStreamInputSvc.h.

◆ m_evtStore

ServiceHandle<StoreGateSvc> TrigByteStreamInputSvc::m_evtStore {this, "EventStore", "StoreGateSvc"}
private

Definition at line 48 of file TrigByteStreamInputSvc.h.

48{this, "EventStore", "StoreGateSvc"};

◆ m_hasEFInterface

bool TrigByteStreamInputSvc::m_hasEFInterface {false}
private

Definition at line 66 of file TrigByteStreamInputSvc.h.

66{false};

◆ m_maxLB

uint16_t TrigByteStreamInputSvc::m_maxLB {0}
private

Maximum lumi block number seen so far, used for monitoring.

Definition at line 65 of file TrigByteStreamInputSvc.h.

65{0};

◆ m_monTool

ToolHandle<GenericMonitoringTool> TrigByteStreamInputSvc::m_monTool {this, "MonTool", "" , "Monitoring tool"}
private

Definition at line 49 of file TrigByteStreamInputSvc.h.

49{this, "MonTool", "" , "Monitoring tool"};

◆ m_robDataProviderSvc

ServiceHandle<IROBDataProviderSvc> TrigByteStreamInputSvc::m_robDataProviderSvc {this, "ROBDataProvider", "ROBDataProviderSvc"}
private

Definition at line 46 of file TrigByteStreamInputSvc.h.

46{this, "ROBDataProvider", "ROBDataProviderSvc"};

The documentation for this class was generated from the following files: