ATLAS Offline Software
Loading...
Searching...
No Matches
DFEF::EFInterfaceEmulator Class Reference

#include <EFInterfaceEmulator.h>

Inheritance diagram for DFEF::EFInterfaceEmulator:
Collaboration diagram for DFEF::EFInterfaceEmulator:

Public Member Functions

 EFInterfaceEmulator (const boost::property_tree::ptree &args)
virtual void open () override
 Opens the Session.
virtual void close () override
 Closes the Session.
virtual std::future< std::unique_ptr< uint32_t[]> > getNext () override
 Returns a pointer to the next Event object to be processed.
virtual std::future< void > accept (uint64_t l0id, std::unique_ptr< uint32_t[]> hltr) override
 Marks the event as accepted by the High-Level Trigger.
virtual std::future< void > reject (uint64_t l0id) override
 Marks the event as rejected by the High-Level Trigger.
virtual ~EFInterfaceEmulator ()

Private Member Functions

void inputThreadCallback ()
 Compression level of built event.
void outputThreadCallback ()
 The method executed by the output handling thread.

Private Attributes

std::string m_name
std::unique_ptr< FileReaderWriterm_file_rw
eformat::Compression m_comp
unsigned int m_compLevel
 Compression type of built event.
std::unique_ptr< HLT::LoopThreadm_inputThread
 Input handling thread (triggers reading new events)
std::unique_ptr< HLT::LoopThreadm_outputThread
 Output handling thread (triggers post-processing of finished events)
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
 queue of promises for getNext() calls
std::mutex m_iQMutex
 queue mutex
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
 queue of promises for accept() calls
std::mutex m_oQMutex
 queue mutex
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
 Map of events read from the input (stores duplicates per L1ID)
std::mutex m_mMutex
 Map mutex.
std::mutex m_RWMutex
 ReadWrite mutex for the file reader/writer.

Detailed Description

Definition at line 39 of file EFInterfaceEmulator.h.

Constructor & Destructor Documentation

◆ EFInterfaceEmulator()

EFInterfaceEmulator::EFInterfaceEmulator ( const boost::property_tree::ptree & args)

Definition at line 15 of file EFInterfaceEmulator.cxx.

16{
17 ERS_DEBUG(2, "Initializing EFInterfaceEmulator");
18 try {
19 m_name = cargs.get<std::string>("name");
20 }
21 catch (const boost::property_tree::ptree_error &e) {
22 throw std::runtime_error("Failed to get 'name' from configuration: " + std::string(e.what()));
23 }
24 {
25 std::lock_guard<std::mutex> lock(m_RWMutex);
26 m_file_rw = std::make_unique<FileReaderWriter>(cargs);
27 }
28 // TODO: read ptree to get the compression type and level
29 m_comp = eformat::UNCOMPRESSED;
30 m_compLevel = 0;
31}
unsigned int m_compLevel
Compression type of built event.
std::unique_ptr< FileReaderWriter > m_file_rw
std::mutex m_RWMutex
ReadWrite mutex for the file reader/writer.
eformat::Compression m_comp

◆ ~EFInterfaceEmulator()

EFInterfaceEmulator::~EFInterfaceEmulator ( )
virtual

Definition at line 96 of file EFInterfaceEmulator.cxx.

96{}

Member Function Documentation

◆ accept()

std::future< void > EFInterfaceEmulator::accept ( uint64_t l0id,
std::unique_ptr< uint32_t[]> hltr )
overridevirtual

Marks the event as accepted by the High-Level Trigger.

Definition at line 70 of file EFInterfaceEmulator.cxx.

70 {
71 // TODO: here I have to keep track of the events that I read. When event done is received, I have to assemble the full event and write it to the output.
72 std::promise<void> p;
73 // Create a pair, push the pair in the inputThreadQueue, and notify the input thread
74 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair(l0id, std::move(hltr));
75 {
76 std::lock_guard<std::mutex> lock(m_oQMutex);
77 m_outputQueue.push(std::move(eventPair));
78 }
79 m_outputThread->cond().notify_one();
80 p.set_value();
81 return p.get_future();
82}
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events)
std::mutex m_oQMutex
queue mutex
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
queue of promises for accept() calls

◆ close()

void EFInterfaceEmulator::close ( )
overridevirtual

Closes the Session.

Definition at line 43 of file EFInterfaceEmulator.cxx.

43 {
44 ERS_DEBUG(2, "Closing EFInterfaceEmulator");
45 m_inputThread->stop();
46 m_outputThread->stop();
47 ERS_DEBUG(2, "Threads stopped, waiting for them to finish");
48 m_inputThread->wait();
49 m_outputThread->wait();
50 ERS_DEBUG(2, "EFInterfaceEmulator closed and threads stopped");
51 {
52 std::lock_guard<std::mutex> lock(m_RWMutex);
53 m_file_rw.reset();
54 }
55}
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events)

◆ getNext()

std::future< std::unique_ptr< uint32_t[]> > EFInterfaceEmulator::getNext ( )
overridevirtual

Returns a pointer to the next Event object to be processed.

Definition at line 57 of file EFInterfaceEmulator.cxx.

57 {
58 ERS_DEBUG(2, "EFInterfaceEmulator getNext called");
59 std::promise<std::unique_ptr<uint32_t[]>> p;
60 auto fut = p.get_future();
61 {
62 std::lock_guard<std::mutex> lock(m_iQMutex);
63 m_inputQueue.push(std::move(p));
64 }
65 ERS_DEBUG(2, "Notifying input thread");
66 m_inputThread->cond().notify_one();
67 return fut;
68}
std::mutex m_iQMutex
queue mutex
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
queue of promises for getNext() calls

◆ inputThreadCallback()

void EFInterfaceEmulator::inputThreadCallback ( )
private

Compression level of built event.

The method executed by the input handling thread

Definition at line 98 of file EFInterfaceEmulator.cxx.

98 {
99 ERS_DEBUG(2, "Input thread started");
100 while (true) {
101 std::promise<std::unique_ptr<uint32_t[]>> p;
102 {
103 std::lock_guard<std::mutex> lock(m_iQMutex);
104 ERS_DEBUG(2, "m_inputQueue size: " << m_inputQueue.size());
105 if (m_inputQueue.empty()) {
106 ERS_DEBUG(2, "Input queue is empty, breaking...");
107 break;
108 }
109 p = std::move(m_inputQueue.front());
110 m_inputQueue.pop();
111 }
112 // Get the event from file, create a copy of it to store in the map, and return the original pointer to the promise
113 ERS_DEBUG(2, "Reading next event from file");
114 std::unique_ptr<uint32_t[]> event = nullptr;
115 {
116 std::lock_guard<std::mutex> lock(m_RWMutex);
117 try {
118 //FIXME: Sleep for random duration, only for debugging
119 //auto sleepDuration = (std::rand() % 100) * 10;
120 //ERS_DEBUG(2, "Sleeping for " << sleepDuration << " milliseconds");
121 //std::this_thread::sleep_for(std::chrono::milliseconds(sleepDuration));
122 //coverity[SLEEP]
123 event = m_file_rw->getNextEvent();
124 } catch (const DFEF::NoMoreEventsInFile &ex) {
125 ERS_DEBUG(2, "No more events in file");
126 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::NoMoreEvents("No more events in file")));
127 continue;
128 } catch (DFEF::ProblemReadingFromFile &ex) {
129 ERS_DEBUG(2, "Problem reading from file: " << ex.what());
130 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::CommunicationError("Problem reading from file")));
131 continue;
132 }
133 }
134 ERS_DEBUG(2, "Event read");
135 eformat::read::FullEventFragment fullEvent(event.get());
136 uint32_t nwords = event[1];
137 ERS_DEBUG(2, "Copying full event with L0ID: " << fullEvent.lvl1_id() << ", size: " << nwords * sizeof(uint32_t) << " bytes");
138 std::unique_ptr<uint32_t[]> eventCopy(new uint32_t[nwords]);
139 std::memcpy(eventCopy.get(), event.get(), nwords * sizeof(uint32_t));
140 {
141 std::lock_guard<std::mutex> lock(m_mMutex);
142 auto& bucket = m_events[fullEvent.lvl1_id()];
143 bucket.push_back(std::move(eventCopy));
144 }
145 p.set_value(std::move(event));
146 }
147}
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
Map of events read from the input (stores duplicates per L1ID)
std::mutex m_mMutex
Map mutex.
setEventNumber uint32_t

◆ open()

void EFInterfaceEmulator::open ( )
overridevirtual

Opens the Session.

Definition at line 33 of file EFInterfaceEmulator.cxx.

33 {
34 //Starting input and output threads
35 ERS_DEBUG(2, "Starting input and output threads");
36 m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();},-1);
37 m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, -1);
38 m_inputThread->start();
39 m_outputThread->start();
40 ERS_DEBUG(2, "Input and output threads started");
41}
void inputThreadCallback()
Compression level of built event.
void outputThreadCallback()
The method executed by the output handling thread.

◆ outputThreadCallback()

void EFInterfaceEmulator::outputThreadCallback ( )
private

The method executed by the output handling thread.

Definition at line 149 of file EFInterfaceEmulator.cxx.

149 {
150 ERS_DEBUG(2, "Output thread started");
151 while (true) {
152 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair;
153 {
154 std::lock_guard<std::mutex> lock(m_oQMutex);
155 ERS_DEBUG(2, "m_outputQueue size: " << m_outputQueue.size());
156 if (m_outputQueue.empty())
157 {
158 ERS_DEBUG(2, "Output queue is empty, breaking...");
159 break;
160 }
161 eventPair = std::move(m_outputQueue.front());
162 m_outputQueue.pop();
163 }
164 std::unique_ptr<uint32_t[]> fullEvent = nullptr;
165 {
166 std::lock_guard<std::mutex> lock(m_mMutex);
167 auto it = m_events.find(eventPair.first);
168 if (it != m_events.end() && !it->second.empty()) {
169 fullEvent = std::move(it->second.back());
170 it->second.pop_back();
171 if (it->second.empty()) {
172 m_events.erase(it);
173 }
174 } else {
175 throw std::runtime_error("Missing the FullEvent copy for event accepted with L0ID: " + std::to_string(eventPair.first));
176 }
177 }
178 //FIXME Remove this, just for debugging
179 std::unique_ptr<uint32_t[]> hltResult = std::move(eventPair.second);
180 uint32_t nwords = hltResult[1];
181 eformat::read::FullEventFragment fullEventFragment(fullEvent.get());
182 ERS_DEBUG(2, "Processing HLTResult with L0ID: " << fullEventFragment.lvl1_id() << ", size: " << nwords * 4 << " bytes");
183 // Merge HLTResult
184 auto finalEvent = DFEF::merge_hltresult_with_input_event(hltResult.get(), fullEvent.get(), m_comp, m_compLevel);
185 auto finalSize = finalEvent.size();
186 auto sizeInBytes = finalSize*4;
187 {
188 ERS_DEBUG(2, "Writing fullEventFragment with L0ID: " << eventPair.first << " of size: " << sizeInBytes << " bytes");
189 std::lock_guard<std::mutex> lock(m_RWMutex);
190 m_file_rw->writeEvent(sizeInBytes, finalEvent.data());
191 }
192 ERS_DEBUG(2, "Event with L0ID: " << eventPair.first << " written to file");
193 ERS_DEBUG(2, "m_events size after erase: " << m_events.size());
194 }
195 ERS_DEBUG(2, "Output thread finished processing.");
196}
std::vector< uint32_t > merge_hltresult_with_input_event(const uint32_t *hltresult, const uint32_t *input, eformat::Compression comp, unsigned int comp_level)
Merge two events: Take header and robs from hltresult event, add all res of the ROBS from input event...

◆ reject()

std::future< void > EFInterfaceEmulator::reject ( uint64_t l0id)
overridevirtual

Marks the event as rejected by the High-Level Trigger.

Definition at line 84 of file EFInterfaceEmulator.cxx.

84 {
85 ERS_DEBUG(2, "Rejecting event with L1ID: " << l0id);
86 ERS_DEBUG(2, "m_events size before erase: " << m_events.size());
87 {
88 std::lock_guard<std::mutex> lock(m_mMutex);
89 m_events.erase(l0id);
90 }
91 std::promise<void> p;
92 p.set_value();
93 return p.get_future();
94}

Member Data Documentation

◆ m_comp

eformat::Compression DFEF::EFInterfaceEmulator::m_comp
private

Definition at line 72 of file EFInterfaceEmulator.h.

◆ m_compLevel

unsigned int DFEF::EFInterfaceEmulator::m_compLevel
private

Compression type of built event.

Definition at line 73 of file EFInterfaceEmulator.h.

◆ m_events

std::unordered_map<uint64_t, std::vector<std::unique_ptr<uint32_t[]> > > DFEF::EFInterfaceEmulator::m_events
private

Map of events read from the input (stores duplicates per L1ID)

Definition at line 96 of file EFInterfaceEmulator.h.

◆ m_file_rw

std::unique_ptr<FileReaderWriter> DFEF::EFInterfaceEmulator::m_file_rw
private

Definition at line 71 of file EFInterfaceEmulator.h.

◆ m_inputQueue

std::queue<std::promise<std::unique_ptr<uint32_t[]> > > DFEF::EFInterfaceEmulator::m_inputQueue
private

queue of promises for getNext() calls

Definition at line 86 of file EFInterfaceEmulator.h.

◆ m_inputThread

std::unique_ptr<HLT::LoopThread> DFEF::EFInterfaceEmulator::m_inputThread
private

Input handling thread (triggers reading new events)

Definition at line 81 of file EFInterfaceEmulator.h.

◆ m_iQMutex

std::mutex DFEF::EFInterfaceEmulator::m_iQMutex
private

queue mutex

Definition at line 88 of file EFInterfaceEmulator.h.

◆ m_mMutex

std::mutex DFEF::EFInterfaceEmulator::m_mMutex
private

Map mutex.

Definition at line 98 of file EFInterfaceEmulator.h.

◆ m_name

std::string DFEF::EFInterfaceEmulator::m_name
private

Definition at line 70 of file EFInterfaceEmulator.h.

◆ m_oQMutex

std::mutex DFEF::EFInterfaceEmulator::m_oQMutex
private

queue mutex

Definition at line 93 of file EFInterfaceEmulator.h.

◆ m_outputQueue

std::queue<std::pair<uint64_t, std::unique_ptr<uint32_t[]> > > DFEF::EFInterfaceEmulator::m_outputQueue
private

queue of promises for accept() calls

Definition at line 91 of file EFInterfaceEmulator.h.

◆ m_outputThread

std::unique_ptr<HLT::LoopThread> DFEF::EFInterfaceEmulator::m_outputThread
private

Output handling thread (triggers post-processing of finished events)

Definition at line 83 of file EFInterfaceEmulator.h.

◆ m_RWMutex

std::mutex DFEF::EFInterfaceEmulator::m_RWMutex
private

ReadWrite mutex for the file reader/writer.

Definition at line 101 of file EFInterfaceEmulator.h.


The documentation for this class was generated from the following files: