5#include <boost/property_tree/ptree.hpp>
10extern "C" std::unique_ptr<daq::df_ef_interface::EventHandler>
createEventHandler(
const boost::property_tree::ptree &conf){
17 ERS_DEBUG(2,
"Initializing EFInterfaceEmulator");
19 m_name = cargs.get<std::string>(
"name");
21 catch (
const boost::property_tree::ptree_error &e) {
22 throw std::runtime_error(
"Failed to get 'name' from configuration: " + std::string(e.what()));
25 std::lock_guard<std::mutex> lock(
m_RWMutex);
26 m_file_rw = std::make_unique<FileReaderWriter>(cargs);
29 m_comp = eformat::UNCOMPRESSED;
35 ERS_DEBUG(2,
"Starting input and output threads");
40 ERS_DEBUG(2,
"Input and output threads started");
44 ERS_DEBUG(2,
"Closing EFInterfaceEmulator");
47 ERS_DEBUG(2,
"Threads stopped, waiting for them to finish");
50 ERS_DEBUG(2,
"EFInterfaceEmulator closed and threads stopped");
52 std::lock_guard<std::mutex> lock(
m_RWMutex);
58 ERS_DEBUG(2,
"EFInterfaceEmulator getNext called");
59 std::promise<std::unique_ptr<uint32_t[]>> p;
60 auto fut = p.get_future();
62 std::lock_guard<std::mutex> lock(
m_iQMutex);
65 ERS_DEBUG(2,
"Notifying input thread");
74 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair(l0id, std::move(hltr));
76 std::lock_guard<std::mutex> lock(
m_oQMutex);
81 return p.get_future();
85 ERS_DEBUG(2,
"Rejecting event with L1ID: " << l0id);
86 ERS_DEBUG(2,
"m_events size before erase: " <<
m_events.size());
88 std::lock_guard<std::mutex> lock(
m_mMutex);
93 return p.get_future();
99 ERS_DEBUG(2,
"Input thread started");
101 std::promise<std::unique_ptr<uint32_t[]>> p;
103 std::lock_guard<std::mutex> lock(
m_iQMutex);
104 ERS_DEBUG(2,
"m_inputQueue size: " <<
m_inputQueue.size());
106 ERS_DEBUG(2,
"Input queue is empty, breaking...");
113 ERS_DEBUG(2,
"Reading next event from file");
114 std::unique_ptr<uint32_t[]>
event =
nullptr;
116 std::lock_guard<std::mutex> lock(
m_RWMutex);
125 ERS_DEBUG(2,
"No more events in file");
126 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::NoMoreEvents(
"No more events in file")));
129 ERS_DEBUG(2,
"Problem reading from file: " << ex.what());
130 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::CommunicationError(
"Problem reading from file")));
134 ERS_DEBUG(2,
"Event read");
135 eformat::read::FullEventFragment fullEvent(event.get());
136 uint32_t nwords =
event[1];
137 ERS_DEBUG(2,
"Copying full event with L0ID: " << fullEvent.lvl1_id() <<
", size: " << nwords *
sizeof(uint32_t) <<
" bytes");
138 std::unique_ptr<uint32_t[]> eventCopy(
new uint32_t[nwords]);
139 std::memcpy(eventCopy.get(), event.get(), nwords *
sizeof(uint32_t));
141 std::lock_guard<std::mutex> lock(
m_mMutex);
142 auto& bucket =
m_events[fullEvent.lvl1_id()];
143 bucket.push_back(std::move(eventCopy));
145 p.set_value(std::move(event));
150 ERS_DEBUG(2,
"Output thread started");
152 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair;
154 std::lock_guard<std::mutex> lock(
m_oQMutex);
155 ERS_DEBUG(2,
"m_outputQueue size: " <<
m_outputQueue.size());
158 ERS_DEBUG(2,
"Output queue is empty, breaking...");
164 std::unique_ptr<uint32_t[]> fullEvent =
nullptr;
166 std::lock_guard<std::mutex> lock(
m_mMutex);
167 auto it =
m_events.find(eventPair.first);
168 if (it !=
m_events.end() && !it->second.empty()) {
169 fullEvent = std::move(it->second.back());
170 it->second.pop_back();
171 if (it->second.empty()) {
175 throw std::runtime_error(
"Missing the FullEvent copy for event accepted with L0ID: " + std::to_string(eventPair.first));
179 std::unique_ptr<uint32_t[]> hltResult = std::move(eventPair.second);
180 uint32_t nwords = hltResult[1];
181 eformat::read::FullEventFragment fullEventFragment(fullEvent.get());
182 ERS_DEBUG(2,
"Processing HLTResult with L0ID: " << fullEventFragment.lvl1_id() <<
", size: " << nwords * 4 <<
" bytes");
185 auto finalSize = finalEvent.size();
186 auto sizeInBytes = finalSize*4;
188 ERS_DEBUG(2,
"Writing fullEventFragment with L0ID: " << eventPair.first <<
" of size: " << sizeInBytes <<
" bytes");
189 std::lock_guard<std::mutex> lock(
m_RWMutex);
190 m_file_rw->writeEvent(sizeInBytes, finalEvent.data());
192 ERS_DEBUG(2,
"Event with L0ID: " << eventPair.first <<
" written to file");
193 ERS_DEBUG(2,
"m_events size after erase: " <<
m_events.size());
195 ERS_DEBUG(2,
"Output thread finished processing.");
std::unique_ptr< daq::df_ef_interface::EventHandler > createEventHandler(const boost::property_tree::ptree &conf)
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
Map of events read from the input (stores duplicates per L1ID)
EFInterfaceEmulator(const boost::property_tree::ptree &args)
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events)
std::mutex m_iQMutex
Mutex guarding m_inputQueue.
unsigned int m_compLevel
Compression level of built event.
virtual std::future< void > accept(uint64_t l0id, std::unique_ptr< uint32_t[]> hltr) override
Marks the event as accepted by the High-Level Trigger.
void inputThreadCallback()
The method executed by the input handling thread.
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
queue of promises for getNext() calls
void outputThreadCallback()
The method executed by the output handling thread.
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events)
virtual void open() override
Opens the Session.
std::mutex m_oQMutex
Mutex guarding m_outputQueue.
std::mutex m_mMutex
Map mutex.
std::unique_ptr< FileReaderWriter > m_file_rw
std::mutex m_RWMutex
ReadWrite mutex for the file reader/writer.
eformat::Compression m_comp
Compression type of built event.
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
Queue of (L0 ID, HLT result) pairs submitted by accept() calls.
virtual void close() override
Closes the Session.
virtual ~EFInterfaceEmulator()
virtual std::future< void > reject(uint64_t l0id) override
Marks the event as rejected by the High-Level Trigger.
virtual std::future< std::unique_ptr< uint32_t[]> > getNext() override
Returns a pointer to the next Event object to be processed.
std::vector< uint32_t > merge_hltresult_with_input_event(const uint32_t *hltresult, const uint32_t *input, eformat::Compression comp, unsigned int comp_level)
Merge two events: take the header and ROBs from the hltresult event, then add all the rest of the ROBs from the input event...