ATLAS Offline Software
Loading...
Searching...
No Matches
EFInterfaceEmulator.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include <boost/property_tree/ptree.hpp>
7#include "eformat_utils.h"
8#include <cstdlib>
9
10extern "C" std::unique_ptr<daq::df_ef_interface::EventHandler> createEventHandler(const boost::property_tree::ptree &conf){
11 std::unique_ptr<daq::df_ef_interface::EventHandler> s(new DFEF::EFInterfaceEmulator(conf));
12 return s;
13}
14
15DFEF::EFInterfaceEmulator::EFInterfaceEmulator(const boost::property_tree::ptree &cargs)
16{
17 ERS_DEBUG(2, "Initializing EFInterfaceEmulator");
18 try {
19 m_name = cargs.get<std::string>("name");
20 }
21 catch (const boost::property_tree::ptree_error &e) {
22 throw std::runtime_error("Failed to get 'name' from configuration: " + std::string(e.what()));
23 }
24 {
25 std::lock_guard<std::mutex> lock(m_RWMutex);
26 m_file_rw = std::make_unique<FileReaderWriter>(cargs);
27 }
28 // TODO: read ptree to get the compression type and level
29 m_comp = eformat::UNCOMPRESSED;
30 m_compLevel = 0;
31}
32
34 //Starting input and output threads
35 ERS_DEBUG(2, "Starting input and output threads");
36 m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();},-1);
37 m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, -1);
38 m_inputThread->start();
39 m_outputThread->start();
40 ERS_DEBUG(2, "Input and output threads started");
41}
42
44 ERS_DEBUG(2, "Closing EFInterfaceEmulator");
45 m_inputThread->stop();
46 m_outputThread->stop();
47 ERS_DEBUG(2, "Threads stopped, waiting for them to finish");
48 m_inputThread->wait();
49 m_outputThread->wait();
50 ERS_DEBUG(2, "EFInterfaceEmulator closed and threads stopped");
51 {
52 std::lock_guard<std::mutex> lock(m_RWMutex);
53 m_file_rw.reset();
54 }
55}
56
57std::future<std::unique_ptr<uint32_t[]>> DFEF::EFInterfaceEmulator::getNext() {
58 ERS_DEBUG(2, "EFInterfaceEmulator getNext called");
59 std::promise<std::unique_ptr<uint32_t[]>> p;
60 auto fut = p.get_future();
61 {
62 std::lock_guard<std::mutex> lock(m_iQMutex);
63 m_inputQueue.push(std::move(p));
64 }
65 ERS_DEBUG(2, "Notifying input thread");
66 m_inputThread->cond().notify_one();
67 return fut;
68}
69
70std::future<void> DFEF::EFInterfaceEmulator::accept(uint64_t l0id, std::unique_ptr<uint32_t[]> hltr) {
71 // TODO: here I have to keep track of the events that I read. When event done is received, I have to assemble the full event and write it to the output.
72 std::promise<void> p;
73 // Create a pair, push the pair in the inputThreadQueue, and notify the input thread
74 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair(l0id, std::move(hltr));
75 {
76 std::lock_guard<std::mutex> lock(m_oQMutex);
77 m_outputQueue.push(std::move(eventPair));
78 }
79 m_outputThread->cond().notify_one();
80 p.set_value();
81 return p.get_future();
82}
83
84std::future<void> DFEF::EFInterfaceEmulator::reject(uint64_t l0id){
85 ERS_DEBUG(2, "Rejecting event with L1ID: " << l0id);
86 ERS_DEBUG(2, "m_events size before erase: " << m_events.size());
87 {
88 std::lock_guard<std::mutex> lock(m_mMutex);
89 m_events.erase(l0id);
90 }
91 std::promise<void> p;
92 p.set_value();
93 return p.get_future();
94}
95
97
99 ERS_DEBUG(2, "Input thread started");
100 while (true) {
101 std::promise<std::unique_ptr<uint32_t[]>> p;
102 {
103 std::lock_guard<std::mutex> lock(m_iQMutex);
104 ERS_DEBUG(2, "m_inputQueue size: " << m_inputQueue.size());
105 if (m_inputQueue.empty()) {
106 ERS_DEBUG(2, "Input queue is empty, breaking...");
107 break;
108 }
109 p = std::move(m_inputQueue.front());
110 m_inputQueue.pop();
111 }
112 // Get the event from file, create a copy of it to store in the map, and return the original pointer to the promise
113 ERS_DEBUG(2, "Reading next event from file");
114 std::unique_ptr<uint32_t[]> event = nullptr;
115 {
116 std::lock_guard<std::mutex> lock(m_RWMutex);
117 try {
118 //FIXME: Sleep for random duration, only for debugging
119 //auto sleepDuration = (std::rand() % 100) * 10;
120 //ERS_DEBUG(2, "Sleeping for " << sleepDuration << " milliseconds");
121 //std::this_thread::sleep_for(std::chrono::milliseconds(sleepDuration));
122 //coverity[SLEEP]
123 event = m_file_rw->getNextEvent();
124 } catch (const DFEF::NoMoreEventsInFile &ex) {
125 ERS_DEBUG(2, "No more events in file");
126 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::NoMoreEvents("No more events in file")));
127 continue;
128 } catch (DFEF::ProblemReadingFromFile &ex) {
129 ERS_DEBUG(2, "Problem reading from file: " << ex.what());
130 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::CommunicationError("Problem reading from file")));
131 continue;
132 }
133 }
134 ERS_DEBUG(2, "Event read");
135 eformat::read::FullEventFragment fullEvent(event.get());
136 uint32_t nwords = event[1];
137 ERS_DEBUG(2, "Copying full event with L0ID: " << fullEvent.lvl1_id() << ", size: " << nwords * sizeof(uint32_t) << " bytes");
138 std::unique_ptr<uint32_t[]> eventCopy(new uint32_t[nwords]);
139 std::memcpy(eventCopy.get(), event.get(), nwords * sizeof(uint32_t));
140 {
141 std::lock_guard<std::mutex> lock(m_mMutex);
142 auto& bucket = m_events[fullEvent.lvl1_id()];
143 bucket.push_back(std::move(eventCopy));
144 }
145 p.set_value(std::move(event));
146 }
147}
148
150 ERS_DEBUG(2, "Output thread started");
151 while (true) {
152 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair;
153 {
154 std::lock_guard<std::mutex> lock(m_oQMutex);
155 ERS_DEBUG(2, "m_outputQueue size: " << m_outputQueue.size());
156 if (m_outputQueue.empty())
157 {
158 ERS_DEBUG(2, "Output queue is empty, breaking...");
159 break;
160 }
161 eventPair = std::move(m_outputQueue.front());
162 m_outputQueue.pop();
163 }
164 std::unique_ptr<uint32_t[]> fullEvent = nullptr;
165 {
166 std::lock_guard<std::mutex> lock(m_mMutex);
167 auto it = m_events.find(eventPair.first);
168 if (it != m_events.end() && !it->second.empty()) {
169 fullEvent = std::move(it->second.back());
170 it->second.pop_back();
171 if (it->second.empty()) {
172 m_events.erase(it);
173 }
174 } else {
175 throw std::runtime_error("Missing the FullEvent copy for event accepted with L0ID: " + std::to_string(eventPair.first));
176 }
177 }
178 //FIXME Remove this, just for debugging
179 std::unique_ptr<uint32_t[]> hltResult = std::move(eventPair.second);
180 uint32_t nwords = hltResult[1];
181 eformat::read::FullEventFragment fullEventFragment(fullEvent.get());
182 ERS_DEBUG(2, "Processing HLTResult with L0ID: " << fullEventFragment.lvl1_id() << ", size: " << nwords * 4 << " bytes");
183 // Merge HLTResult
184 auto finalEvent = DFEF::merge_hltresult_with_input_event(hltResult.get(), fullEvent.get(), m_comp, m_compLevel);
185 auto finalSize = finalEvent.size();
186 auto sizeInBytes = finalSize*4;
187 {
188 ERS_DEBUG(2, "Writing fullEventFragment with L0ID: " << eventPair.first << " of size: " << sizeInBytes << " bytes");
189 std::lock_guard<std::mutex> lock(m_RWMutex);
190 m_file_rw->writeEvent(sizeInBytes, finalEvent.data());
191 }
192 ERS_DEBUG(2, "Event with L0ID: " << eventPair.first << " written to file");
193 ERS_DEBUG(2, "m_events size after erase: " << m_events.size());
194 }
195 ERS_DEBUG(2, "Output thread finished processing.");
196}
std::unique_ptr< daq::df_ef_interface::EventHandler > createEventHandler(const boost::property_tree::ptree &conf)
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
Map of events read from the input (stores duplicates per L1ID)
EFInterfaceEmulator(const boost::property_tree::ptree &args)
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events)
std::mutex m_iQMutex
queue mutex
unsigned int m_compLevel
Compression level of built event.
virtual std::future< void > accept(uint64_t l0id, std::unique_ptr< uint32_t[]> hltr) override
Marks the event as accepted by the High-Level Trigger.
void inputThreadCallback()
The method executed by the input handling thread.
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
queue of promises for getNext() calls
void outputThreadCallback()
The method executed by the output handling thread.
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events)
virtual void open() override
Opens the Session.
std::mutex m_oQMutex
queue mutex
std::mutex m_mMutex
Map mutex.
std::unique_ptr< FileReaderWriter > m_file_rw
std::mutex m_RWMutex
ReadWrite mutex for the file reader/writer.
eformat::Compression m_comp
Compression type of built event.
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
queue of (event ID, HLT result) pairs pushed by accept() calls
virtual void close() override
Closes the Session.
virtual std::future< void > reject(uint64_t l0id) override
Marks the event as rejected by the High-Level Trigger.
virtual std::future< std::unique_ptr< uint32_t[]> > getNext() override
Returns a pointer to the next Event object to be processed.
std::vector< uint32_t > merge_hltresult_with_input_event(const uint32_t *hltresult, const uint32_t *input, eformat::Compression comp, unsigned int comp_level)
Merge two events: take the header and ROBs from the hltresult event, add all the rest of the ROBs from the input event...