5#include <boost/property_tree/ptree.hpp>
11#pragma clang diagnostic push
12#pragma clang diagnostic ignored "-Wreturn-type-c-linkage"
15extern "C" std::unique_ptr<daq::df_ef_interface::EventHandler>
createEventHandler(
const boost::property_tree::ptree &conf){
21#pragma clang diagnostic pop
26 ERS_DEBUG(2,
"Initializing EFInterfaceEmulator");
28 m_name = cargs.get<std::string>(
"name");
30 catch (
const boost::property_tree::ptree_error &e) {
31 throw std::runtime_error(
"Failed to get 'name' from configuration: " + std::string(e.what()));
35 m_file_rw = std::make_unique<FileReaderWriter>(cargs);
43 ERS_DEBUG(2,
"Starting input and output threads");
48 ERS_DEBUG(2,
"Input and output threads started");
52 ERS_DEBUG(2,
"Closing EFInterfaceEmulator");
55 ERS_DEBUG(2,
"Threads stopped, waiting for them to finish");
58 ERS_DEBUG(2,
"EFInterfaceEmulator closed and threads stopped");
66 ERS_DEBUG(2,
"EFInterfaceEmulator getNext called");
67 std::promise<std::unique_ptr<uint32_t[]>> p;
68 auto fut = p.get_future();
73 ERS_DEBUG(2,
"Notifying input thread");
82 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair(l0id, std::move(hltr));
89 return p.get_future();
93 ERS_DEBUG(2,
"Rejecting event with L1ID: " << l0id);
94 ERS_DEBUG(2,
"m_events size before erase: " <<
m_events.size());
101 return p.get_future();
107 ERS_DEBUG(2,
"Input thread started");
109 std::promise<std::unique_ptr<uint32_t[]>> p;
112 ERS_DEBUG(2,
"m_inputQueue size: " <<
m_inputQueue.size());
114 ERS_DEBUG(2,
"Input queue is empty, breaking...");
121 ERS_DEBUG(2,
"Reading next event from file");
122 std::unique_ptr<uint32_t[]>
event =
nullptr;
133 ERS_DEBUG(2,
"No more events in file");
134 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::NoMoreEvents(
"No more events in file")));
137 ERS_DEBUG(2,
"Problem reading from file: " << ex.what());
138 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::CommunicationError(
"Problem reading from file")));
142 ERS_DEBUG(2,
"Event read");
143 eformat::read::FullEventFragment fullEvent(event.get());
144 uint32_t nwords =
event[1];
145 ERS_DEBUG(2,
"Copying full event with L0ID: " << fullEvent.lvl1_id() <<
", size: " << nwords *
sizeof(uint32_t) <<
" bytes");
146 std::unique_ptr<uint32_t[]> eventCopy(
new uint32_t[nwords]);
147 std::memcpy(eventCopy.get(), event.get(), nwords *
sizeof(uint32_t));
150 auto& bucket =
m_events[fullEvent.lvl1_id()];
151 bucket.push_back(std::move(eventCopy));
153 p.set_value(std::move(event));
158 ERS_DEBUG(2,
"Output thread started");
160 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair;
163 ERS_DEBUG(2,
"m_outputQueue size: " <<
m_outputQueue.size());
166 ERS_DEBUG(2,
"Output queue is empty, breaking...");
172 std::unique_ptr<uint32_t[]> fullEvent =
nullptr;
175 auto it =
m_events.find(eventPair.first);
176 if (it !=
m_events.end() && !it->second.empty()) {
177 fullEvent = std::move(it->second.back());
178 it->second.pop_back();
179 if (it->second.empty()) {
183 throw std::runtime_error(
"Missing the FullEvent copy for event accepted with L0ID: " + std::to_string(eventPair.first));
185 ERS_DEBUG(2,
"m_events size after erase: " <<
m_events.size());
188 std::unique_ptr<uint32_t[]> hltResult = std::move(eventPair.second);
189 uint32_t nwords = hltResult[1];
190 eformat::read::FullEventFragment fullEventFragment(fullEvent.get());
191 ERS_DEBUG(2,
"Processing HLTResult with L0ID: " << fullEventFragment.lvl1_id() <<
", size: " << nwords * 4 <<
" bytes");
194 auto finalSize = finalEvent.size();
195 auto sizeInBytes = finalSize*4;
197 ERS_DEBUG(2,
"Writing fullEventFragment with L0ID: " << eventPair.first <<
" of size: " << sizeInBytes <<
" bytes");
199 m_file_rw->writeEvent(sizeInBytes, finalEvent.data());
201 ERS_DEBUG(2,
"Event with L0ID: " << eventPair.first <<
" written to file");
203 ERS_DEBUG(2,
"Output thread finished processing.");
std::unique_ptr< daq::df_ef_interface::EventHandler > createEventHandler(const boost::property_tree::ptree &conf)
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
Map of events read from the input (stores duplicates per L1ID).
EFInterfaceEmulator(const boost::property_tree::ptree &args)
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events).
std::mutex m_iQMutex
Input queue mutex.
unsigned int m_compLevel
Compression level of built event.
virtual std::future< void > accept(uint64_t l0id, std::unique_ptr< uint32_t[]> hltr) override
Marks the event as accepted by the High-Level Trigger.
void inputThreadCallback()
The method executed by the input handling thread.
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
queue of promises for getNext() calls
void outputThreadCallback()
The method executed by the output handling thread.
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events).
virtual void open() override
Opens the Session.
std::mutex m_oQMutex
Output queue mutex.
std::mutex m_mMutex
Map mutex.
std::unique_ptr< FileReaderWriter > m_file_rw
std::mutex m_RWMutex
ReadWrite mutex for the file reader/writer.
eformat::Compression m_comp
Compression type of built event.
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
Queue of (L0ID, HLT result) pairs submitted by accept() calls.
virtual void close() override
Closes the Session.
virtual ~EFInterfaceEmulator()
virtual std::future< void > reject(uint64_t l0id) override
Marks the event as rejected by the High-Level Trigger.
virtual std::future< std::unique_ptr< uint32_t[]> > getNext() override
Returns a pointer to the next Event object to be processed.
std::vector< uint32_t > merge_hltresult_with_input_event(const uint32_t *hltresult, const uint32_t *input, eformat::Compression comp, unsigned int comp_level)
Merge two events: take the header and ROBs from the hltresult event, add the rest of the ROBs from the input event...