ATLAS Offline Software
DFEF::EFInterfaceEmulator Class Reference

#include <EFInterfaceEmulator.h>


Public Member Functions

 EFInterfaceEmulator (const boost::property_tree::ptree &args)
virtual void open () override
 Opens the Session.
virtual void close () override
 Closes the Session.
virtual std::future< std::unique_ptr< uint32_t[]> > getNext () override
 Returns a pointer to the next Event object to be processed.
virtual std::future< void > accept (uint64_t l0id, std::unique_ptr< uint32_t[]> hltr) override
 Marks the event as accepted by the High-Level Trigger.
virtual std::future< void > reject (uint64_t l0id) override
 Marks the event as rejected by the High-Level Trigger.
virtual ~EFInterfaceEmulator ()

Private Member Functions

void inputThreadCallback ()
 The method executed by the input handling thread.
void outputThreadCallback ()
 The method executed by the output handling thread.

Private Attributes

std::string m_name
std::unique_ptr< FileReaderWriter > m_file_rw
eformat::Compression m_comp
 Compression type of built event.
unsigned int m_compLevel
 Compression level of built event.
std::unique_ptr< HLT::LoopThread > m_inputThread
 Input handling thread (triggers reading new events)
std::unique_ptr< HLT::LoopThread > m_outputThread
 Output handling thread (triggers post-processing of finished events)
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
 queue of promises for getNext() calls
std::mutex m_iQMutex
 Mutex protecting m_inputQueue.
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
 Queue of (L0ID, HLT result) pairs pushed by accept() calls
std::mutex m_oQMutex
 Mutex protecting m_outputQueue.
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
 Map of events read from the input (stores duplicates per L1ID)
std::mutex m_mMutex
 Map mutex.
std::mutex m_RWMutex
 ReadWrite mutex for the file reader/writer.

Detailed Description

Definition at line 39 of file EFInterfaceEmulator.h.

Constructor & Destructor Documentation

◆ EFInterfaceEmulator()

EFInterfaceEmulator::EFInterfaceEmulator ( const boost::property_tree::ptree & args)

Definition at line 24 of file EFInterfaceEmulator.cxx.

EFInterfaceEmulator::EFInterfaceEmulator(const boost::property_tree::ptree &cargs)
{
  // The definition refers to the configuration tree as cargs; the prototype above shows it as args.
  ERS_DEBUG(2, "Initializing EFInterfaceEmulator");
  try {
    m_name = cargs.get<std::string>("name");
  }
  catch (const boost::property_tree::ptree_error &e) {
    throw std::runtime_error("Failed to get 'name' from configuration: " + std::string(e.what()));
  }
  {
    std::lock_guard<std::mutex> lock(m_RWMutex);
    m_file_rw = std::make_unique<FileReaderWriter>(cargs);
  }
  // Built events are compressed with zlib at level 2
  m_comp = eformat::ZLIB;
  m_compLevel = 2;
}

◆ ~EFInterfaceEmulator()

EFInterfaceEmulator::~EFInterfaceEmulator ( )
virtual

Definition at line 104 of file EFInterfaceEmulator.cxx.

EFInterfaceEmulator::~EFInterfaceEmulator() {}

Member Function Documentation

◆ accept()

std::future< void > EFInterfaceEmulator::accept ( uint64_t l0id,
std::unique_ptr< uint32_t[]> hltr )
override virtual

Marks the event as accepted by the High-Level Trigger.

Definition at line 78 of file EFInterfaceEmulator.cxx.

std::future<void> EFInterfaceEmulator::accept(uint64_t l0id, std::unique_ptr<uint32_t[]> hltr) {
  // TODO: here I have to keep track of the events that I read. When event done is received, I have to assemble the full event and write it to the output.
  std::promise<void> p;
  // Create a pair, push it onto the output queue, and notify the output thread
  std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair(l0id, std::move(hltr));
  {
    std::lock_guard<std::mutex> lock(m_oQMutex);
    m_outputQueue.push(std::move(eventPair));
  }
  m_outputThread->cond().notify_one();
  // The future is satisfied once the pair is queued, not once the event is written
  p.set_value();
  return p.get_future();
}
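
Both accept() and reject() return a future whose promise has already been fulfilled, so the caller never blocks on it. The following is a minimal, self-contained sketch of that idiom using only the standard library; readyFuture and isReady are hypothetical helper names introduced for illustration and are not part of the ATLAS code.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper: returns a future that is already satisfied,
// in the same way accept() and reject() build their return value.
std::future<void> readyFuture() {
  std::promise<void> p;
  p.set_value();          // the shared state becomes ready here
  return p.get_future();  // the shared state outlives the local promise
}

// Hypothetical helper: true if the future can be read without blocking.
bool isReady(const std::future<void>& f) {
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

A caller that waits on such a future only learns that the request was registered. In the listing above, that means the pair was queued for the output thread, not that the merged event has been written.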

◆ close()

void EFInterfaceEmulator::close ( )
override virtual

Closes the Session.

Definition at line 51 of file EFInterfaceEmulator.cxx.

void EFInterfaceEmulator::close() {
  ERS_DEBUG(2, "Closing EFInterfaceEmulator");
  m_inputThread->stop();
  m_outputThread->stop();
  ERS_DEBUG(2, "Threads stopped, waiting for them to finish");
  m_inputThread->wait();
  m_outputThread->wait();
  ERS_DEBUG(2, "EFInterfaceEmulator closed and threads stopped");
  {
    std::lock_guard<std::mutex> lock(m_RWMutex);
    m_file_rw.reset();
  }
}
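
close() asks both threads to stop and only then waits for each of them, so neither thread delays the other's shutdown request. The internals of HLT::LoopThread are not shown on this page. The sketch below is therefore only an illustration of the general stop-then-wait pattern, built on std::thread and std::condition_variable. NotifyWorker and all of its members are hypothetical, and its behaviour is an assumption, not a description of HLT::LoopThread.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical notify-driven worker, for illustration only.
class NotifyWorker {
public:
  explicit NotifyWorker(std::function<void()> cb) : m_cb(std::move(cb)) {}
  ~NotifyWorker() { stop(); wait(); }

  void start() { m_thread = std::thread([this] { run(); }); }

  // Request one more pass of the callback.
  void notify() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_pending = true;
    }
    m_cond.notify_one();
  }

  // Ask the loop to exit once no pass is pending.
  void stop() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_running = false;
    }
    m_cond.notify_one();
  }

  // Block until the worker thread has exited.
  void wait() {
    if (m_thread.joinable()) m_thread.join();
  }

private:
  void run() {
    std::unique_lock<std::mutex> lock(m_mutex);
    while (true) {
      m_cond.wait(lock, [this] { return m_pending || !m_running; });
      if (!m_pending) break;  // woken by stop() with nothing left to do
      m_pending = false;
      lock.unlock();
      m_cb();                 // run the callback without holding the lock
      lock.lock();
    }
  }

  std::function<void()> m_cb;
  std::mutex m_mutex;
  std::condition_variable m_cond;
  bool m_pending = false;
  bool m_running = true;
  std::thread m_thread;
};
```

In this sketch a notification issued before stop() is still served before the thread exits, and wait() returns only after the thread has been joined.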

◆ getNext()

std::future< std::unique_ptr< uint32_t[]> > EFInterfaceEmulator::getNext ( )
override virtual

Returns a pointer to the next Event object to be processed.

Definition at line 65 of file EFInterfaceEmulator.cxx.

std::future<std::unique_ptr<uint32_t[]>> EFInterfaceEmulator::getNext() {
  ERS_DEBUG(2, "EFInterfaceEmulator getNext called");
  std::promise<std::unique_ptr<uint32_t[]>> p;
  auto fut = p.get_future();
  {
    std::lock_guard<std::mutex> lock(m_iQMutex);
    m_inputQueue.push(std::move(p));
  }
  ERS_DEBUG(2, "Notifying input thread");
  m_inputThread->cond().notify_one();
  return fut;
}
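
getNext() hands the caller a future and parks the matching promise in a queue for the input thread to fulfil later. A minimal sketch of that hand-off, using only the standard library, could look as follows. PromiseQueue, request and fulfil are hypothetical names chosen for illustration. The worker side is called directly here instead of from a separate thread.

```cpp
#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for the emulator's input side; not the ATLAS class.
class PromiseQueue {
public:
  using Event = std::unique_ptr<uint32_t[]>;

  // Caller side: park a promise and hand back its future.
  std::future<Event> request() {
    std::promise<Event> p;
    auto fut = p.get_future();
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push(std::move(p));
    }
    return fut;
  }

  // Worker side: fulfil the oldest pending promise, if any.
  // Returns false when no request is waiting.
  bool fulfil(Event ev) {
    std::promise<Event> p;
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      if (m_queue.empty()) return false;
      p = std::move(m_queue.front());
      m_queue.pop();
    }
    p.set_value(std::move(ev));  // set outside the lock, as in the listing
    return true;
  }

private:
  std::queue<std::promise<Event>> m_queue;
  std::mutex m_mutex;
};
```

Requests are served in the order they were made, because std::queue is first-in, first-out.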

◆ inputThreadCallback()

void EFInterfaceEmulator::inputThreadCallback ( )
private

The method executed by the input handling thread.

Definition at line 106 of file EFInterfaceEmulator.cxx.

void EFInterfaceEmulator::inputThreadCallback() {
  ERS_DEBUG(2, "Input thread started");
  while (true) {
    std::promise<std::unique_ptr<uint32_t[]>> p;
    {
      std::lock_guard<std::mutex> lock(m_iQMutex);
      ERS_DEBUG(2, "m_inputQueue size: " << m_inputQueue.size());
      if (m_inputQueue.empty()) {
        ERS_DEBUG(2, "Input queue is empty, breaking...");
        break;
      }
      p = std::move(m_inputQueue.front());
      m_inputQueue.pop();
    }
    // Get the event from file, create a copy of it to store in the map, and hand the original pointer to the promise
    ERS_DEBUG(2, "Reading next event from file");
    std::unique_ptr<uint32_t[]> event = nullptr;
    {
      std::lock_guard<std::mutex> lock(m_RWMutex);
      try {
        //FIXME: Sleep for random duration, only for debugging
        //auto sleepDuration = (std::rand() % 100) * 10;
        //ERS_DEBUG(2, "Sleeping for " << sleepDuration << " milliseconds");
        //std::this_thread::sleep_for(std::chrono::milliseconds(sleepDuration));
        //coverity[SLEEP]
        event = m_file_rw->getNextEvent();
      } catch (const DFEF::NoMoreEventsInFile &ex) {
        ERS_DEBUG(2, "No more events in file");
        p.set_exception(std::make_exception_ptr(daq::df_ef_interface::NoMoreEvents("No more events in file")));
        continue;
      } catch (const DFEF::ProblemReadingFromFile &ex) {
        ERS_DEBUG(2, "Problem reading from file: " << ex.what());
        p.set_exception(std::make_exception_ptr(daq::df_ef_interface::CommunicationError("Problem reading from file")));
        continue;
      }
    }
    ERS_DEBUG(2, "Event read");
    eformat::read::FullEventFragment fullEvent(event.get());
    // The second word of the buffer is used as the fragment size in 32-bit words
    uint32_t nwords = event[1];
    ERS_DEBUG(2, "Copying full event with L0ID: " << fullEvent.lvl1_id() << ", size: " << nwords * sizeof(uint32_t) << " bytes");
    std::unique_ptr<uint32_t[]> eventCopy(new uint32_t[nwords]);
    std::memcpy(eventCopy.get(), event.get(), nwords * sizeof(uint32_t));
    {
      std::lock_guard<std::mutex> lock(m_mMutex);
      auto& bucket = m_events[fullEvent.lvl1_id()];
      bucket.push_back(std::move(eventCopy));
    }
    p.set_value(std::move(event));
  }
}
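
The input thread keeps a private copy of every event it hands out, filed under the event ID so that several events with the same ID can coexist. The following sketch isolates that copy-and-store step with plain standard-library types. Buffer, EventMap, copyEvent and storeCopy are hypothetical names. The only layout assumption, taken from the listing above, is that the second word of the buffer holds its length in 32-bit words.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <unordered_map>
#include <vector>

using Buffer = std::unique_ptr<uint32_t[]>;
using EventMap = std::unordered_map<uint64_t, std::vector<Buffer>>;

// Deep-copies a buffer whose second word holds its total length in 32-bit words.
Buffer copyEvent(const uint32_t* src) {
  assert(src != nullptr);
  const uint32_t nwords = src[1];
  Buffer copy(new uint32_t[nwords]);
  std::memcpy(copy.get(), src, nwords * sizeof(uint32_t));
  return copy;
}

// Files a copy under its ID; repeated IDs accumulate in the same bucket.
void storeCopy(EventMap& events, uint64_t id, const uint32_t* src) {
  events[id].push_back(copyEvent(src));
}
```

Using a vector per key is what lets the map hold duplicates: a second event with the same ID is appended to the existing bucket instead of replacing the first.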

◆ open()

void EFInterfaceEmulator::open ( )
override virtual

Opens the Session.

Definition at line 41 of file EFInterfaceEmulator.cxx.

void EFInterfaceEmulator::open() {
  //Starting input and output threads
  ERS_DEBUG(2, "Starting input and output threads");
  m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();}, -1);
  m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, -1);
  m_inputThread->start();
  m_outputThread->start();
  ERS_DEBUG(2, "Input and output threads started");
}

◆ outputThreadCallback()

void EFInterfaceEmulator::outputThreadCallback ( )
private

The method executed by the output handling thread.

Definition at line 157 of file EFInterfaceEmulator.cxx.

void EFInterfaceEmulator::outputThreadCallback() {
  ERS_DEBUG(2, "Output thread started");
  while (true) {
    std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair;
    {
      std::lock_guard<std::mutex> lock(m_oQMutex);
      ERS_DEBUG(2, "m_outputQueue size: " << m_outputQueue.size());
      if (m_outputQueue.empty())
      {
        ERS_DEBUG(2, "Output queue is empty, breaking...");
        break;
      }
      eventPair = std::move(m_outputQueue.front());
      m_outputQueue.pop();
    }
    std::unique_ptr<uint32_t[]> fullEvent = nullptr;
    {
      std::lock_guard<std::mutex> lock(m_mMutex);
      auto it = m_events.find(eventPair.first);
      if (it != m_events.end() && !it->second.empty()) {
        fullEvent = std::move(it->second.back());
        it->second.pop_back();
        if (it->second.empty()) {
          m_events.erase(it);
        }
      } else {
        throw std::runtime_error("Missing the FullEvent copy for event accepted with L0ID: " + std::to_string(eventPair.first));
      }
      ERS_DEBUG(2, "m_events size after erase: " << m_events.size());
    }
    //FIXME Remove this, just for debugging
    std::unique_ptr<uint32_t[]> hltResult = std::move(eventPair.second);
    uint32_t nwords = hltResult[1];
    eformat::read::FullEventFragment fullEventFragment(fullEvent.get());
    ERS_DEBUG(2, "Processing HLTResult with L0ID: " << fullEventFragment.lvl1_id() << ", size: " << nwords * 4 << " bytes");
    // Merge HLTResult
    auto finalEvent = DFEF::merge_hltresult_with_input_event(hltResult.get(), fullEvent.get(), m_comp, m_compLevel);
    auto finalSize = finalEvent.size();
    auto sizeInBytes = finalSize*4;
    {
      ERS_DEBUG(2, "Writing fullEventFragment with L0ID: " << eventPair.first << " of size: " << sizeInBytes << " bytes");
      std::lock_guard<std::mutex> lock(m_RWMutex);
      m_file_rw->writeEvent(sizeInBytes, finalEvent.data());
    }
    ERS_DEBUG(2, "Event with L0ID: " << eventPair.first << " written to file");
  }
  ERS_DEBUG(2, "Output thread finished processing.");
}
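
For each accepted event, the output thread removes exactly one stored copy from the bucket for that ID and deletes the bucket once it is empty. The sketch below shows that lookup in isolation with standard-library types. Buffer, EventMap and takeOne are hypothetical names, and unlike the listing above the sketch returns nullptr instead of throwing when no copy is found.

```cpp
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

using Buffer = std::unique_ptr<uint32_t[]>;
using EventMap = std::unordered_map<uint64_t, std::vector<Buffer>>;

// Removes and returns the most recently stored copy for `id`.
// Returns nullptr when nothing is stored under that ID.
Buffer takeOne(EventMap& events, uint64_t id) {
  auto it = events.find(id);
  if (it == events.end() || it->second.empty()) return nullptr;
  Buffer out = std::move(it->second.back());
  it->second.pop_back();
  if (it->second.empty()) events.erase(it);  // drop exhausted buckets
  return out;
}
```

Taking from the back of the vector means duplicates are consumed in last-in, first-out order, which matters only if copies stored under the same ID differ.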
std::vector< uint32_t > merge_hltresult_with_input_event(const uint32_t *hltresult, const uint32_t *input, eformat::Compression comp, unsigned int comp_level)
Merge two events: take the header and ROBs from the hltresult event, add all the rest of the ROBs from the input event...

◆ reject()

std::future< void > EFInterfaceEmulator::reject ( uint64_t l0id)
override virtual

Marks the event as rejected by the High-Level Trigger.

Definition at line 92 of file EFInterfaceEmulator.cxx.

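std::future<void> EFInterfaceEmulator::reject(uint64_t l0id) {
  ERS_DEBUG(2, "Rejecting event with L0ID: " << l0id);
  {
    std::lock_guard<std::mutex> lock(m_mMutex);
    // Read the size under the lock: m_events is shared with the input and output threads
    ERS_DEBUG(2, "m_events size before erase: " << m_events.size());
    // Erasing by key drops every stored copy for this L0ID
    m_events.erase(l0id);
  }
  std::promise<void> p;
  p.set_value();
  return p.get_future();
}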

Member Data Documentation

◆ m_comp

eformat::Compression DFEF::EFInterfaceEmulator::m_comp
private

Compression type of built event.

Definition at line 72 of file EFInterfaceEmulator.h.

◆ m_compLevel

unsigned int DFEF::EFInterfaceEmulator::m_compLevel
private

Compression level of built event.

Definition at line 73 of file EFInterfaceEmulator.h.

◆ m_events

std::unordered_map<uint64_t, std::vector<std::unique_ptr<uint32_t[]> > > DFEF::EFInterfaceEmulator::m_events
private

Map of events read from the input (stores duplicates per L1ID)

Definition at line 96 of file EFInterfaceEmulator.h.

◆ m_file_rw

std::unique_ptr<FileReaderWriter> DFEF::EFInterfaceEmulator::m_file_rw
private

Definition at line 71 of file EFInterfaceEmulator.h.

◆ m_inputQueue

std::queue<std::promise<std::unique_ptr<uint32_t[]> > > DFEF::EFInterfaceEmulator::m_inputQueue
private

queue of promises for getNext() calls

Definition at line 86 of file EFInterfaceEmulator.h.

◆ m_inputThread

std::unique_ptr<HLT::LoopThread> DFEF::EFInterfaceEmulator::m_inputThread
private

Input handling thread (triggers reading new events)

Definition at line 81 of file EFInterfaceEmulator.h.

◆ m_iQMutex

std::mutex DFEF::EFInterfaceEmulator::m_iQMutex
private

Mutex protecting m_inputQueue.

Definition at line 88 of file EFInterfaceEmulator.h.

◆ m_mMutex

std::mutex DFEF::EFInterfaceEmulator::m_mMutex
private

Map mutex.

Definition at line 98 of file EFInterfaceEmulator.h.

◆ m_name

std::string DFEF::EFInterfaceEmulator::m_name
private

Definition at line 70 of file EFInterfaceEmulator.h.

◆ m_oQMutex

std::mutex DFEF::EFInterfaceEmulator::m_oQMutex
private

Mutex protecting m_outputQueue.

Definition at line 93 of file EFInterfaceEmulator.h.

◆ m_outputQueue

std::queue<std::pair<uint64_t, std::unique_ptr<uint32_t[]> > > DFEF::EFInterfaceEmulator::m_outputQueue
private

Queue of (L0ID, HLT result) pairs pushed by accept() calls

Definition at line 91 of file EFInterfaceEmulator.h.

◆ m_outputThread

std::unique_ptr<HLT::LoopThread> DFEF::EFInterfaceEmulator::m_outputThread
private

Output handling thread (triggers post-processing of finished events)

Definition at line 83 of file EFInterfaceEmulator.h.

◆ m_RWMutex

std::mutex DFEF::EFInterfaceEmulator::m_RWMutex
private

ReadWrite mutex for the file reader/writer.

Definition at line 101 of file EFInterfaceEmulator.h.


The documentation for this class was generated from the files EFInterfaceEmulator.h and EFInterfaceEmulator.cxx.