ATLAS Offline Software
Loading...
Searching...
No Matches
EFInterfaceEmulator.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include <boost/property_tree/ptree.hpp>
7#include "eformat_utils.h"
8#include <cstdlib>
9
10#ifdef __clang__
11#pragma clang diagnostic push
12#pragma clang diagnostic ignored "-Wreturn-type-c-linkage"
13#endif
14
15extern "C" std::unique_ptr<daq::df_ef_interface::EventHandler> createEventHandler(const boost::property_tree::ptree &conf){
16 std::unique_ptr<daq::df_ef_interface::EventHandler> s(new DFEF::EFInterfaceEmulator(conf));
17 return s;
18}
19
20#ifdef __clang__
21#pragma clang diagnostic pop
22#endif
23
24DFEF::EFInterfaceEmulator::EFInterfaceEmulator(const boost::property_tree::ptree &cargs)
25{
26 ERS_DEBUG(2, "Initializing EFInterfaceEmulator");
27 try {
28 m_name = cargs.get<std::string>("name");
29 }
30 catch (const boost::property_tree::ptree_error &e) {
31 throw std::runtime_error("Failed to get 'name' from configuration: " + std::string(e.what()));
32 }
33 {
34 std::lock_guard<std::mutex> lock(m_RWMutex);
35 m_file_rw = std::make_unique<FileReaderWriter>(cargs);
36 }
37 m_comp = eformat::ZLIB;
38 m_compLevel = 2;
39}
40
42 //Starting input and output threads
43 ERS_DEBUG(2, "Starting input and output threads");
44 m_inputThread = std::make_unique<HLT::LoopThread>([this]{return inputThreadCallback();},-1);
45 m_outputThread = std::make_unique<HLT::LoopThread>([this]{return outputThreadCallback();}, -1);
46 m_inputThread->start();
47 m_outputThread->start();
48 ERS_DEBUG(2, "Input and output threads started");
49}
50
52 ERS_DEBUG(2, "Closing EFInterfaceEmulator");
53 m_inputThread->stop();
54 m_outputThread->stop();
55 ERS_DEBUG(2, "Threads stopped, waiting for them to finish");
56 m_inputThread->wait();
57 m_outputThread->wait();
58 ERS_DEBUG(2, "EFInterfaceEmulator closed and threads stopped");
59 {
60 std::lock_guard<std::mutex> lock(m_RWMutex);
61 m_file_rw.reset();
62 }
63}
64
65std::future<std::unique_ptr<uint32_t[]>> DFEF::EFInterfaceEmulator::getNext() {
66 ERS_DEBUG(2, "EFInterfaceEmulator getNext called");
67 std::promise<std::unique_ptr<uint32_t[]>> p;
68 auto fut = p.get_future();
69 {
70 std::lock_guard<std::mutex> lock(m_iQMutex);
71 m_inputQueue.push(std::move(p));
72 }
73 ERS_DEBUG(2, "Notifying input thread");
74 m_inputThread->cond().notify_one();
75 return fut;
76}
77
78std::future<void> DFEF::EFInterfaceEmulator::accept(uint64_t l0id, std::unique_ptr<uint32_t[]> hltr) {
79 // TODO: here I have to keep track of the events that I read. When event done is received, I have to assemble the full event and write it to the output.
80 std::promise<void> p;
81 // Create a pair, push the pair in the inputThreadQueue, and notify the input thread
82 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair(l0id, std::move(hltr));
83 {
84 std::lock_guard<std::mutex> lock(m_oQMutex);
85 m_outputQueue.push(std::move(eventPair));
86 }
87 m_outputThread->cond().notify_one();
88 p.set_value();
89 return p.get_future();
90}
91
92std::future<void> DFEF::EFInterfaceEmulator::reject(uint64_t l0id){
93 ERS_DEBUG(2, "Rejecting event with L1ID: " << l0id);
94 ERS_DEBUG(2, "m_events size before erase: " << m_events.size());
95 {
96 std::lock_guard<std::mutex> lock(m_mMutex);
97 m_events.erase(l0id);
98 }
99 std::promise<void> p;
100 p.set_value();
101 return p.get_future();
102}
103
105
107 ERS_DEBUG(2, "Input thread started");
108 while (true) {
109 std::promise<std::unique_ptr<uint32_t[]>> p;
110 {
111 std::lock_guard<std::mutex> lock(m_iQMutex);
112 ERS_DEBUG(2, "m_inputQueue size: " << m_inputQueue.size());
113 if (m_inputQueue.empty()) {
114 ERS_DEBUG(2, "Input queue is empty, breaking...");
115 break;
116 }
117 p = std::move(m_inputQueue.front());
118 m_inputQueue.pop();
119 }
120 // Get the event from file, create a copy of it to store in the map, and return the original pointer to the promise
121 ERS_DEBUG(2, "Reading next event from file");
122 std::unique_ptr<uint32_t[]> event = nullptr;
123 {
124 std::lock_guard<std::mutex> lock(m_RWMutex);
125 try {
126 //FIXME: Sleep for random duration, only for debugging
127 //auto sleepDuration = (std::rand() % 100) * 10;
128 //ERS_DEBUG(2, "Sleeping for " << sleepDuration << " milliseconds");
129 //std::this_thread::sleep_for(std::chrono::milliseconds(sleepDuration));
130 //coverity[SLEEP]
131 event = m_file_rw->getNextEvent();
132 } catch (const DFEF::NoMoreEventsInFile &ex) {
133 ERS_DEBUG(2, "No more events in file");
134 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::NoMoreEvents("No more events in file")));
135 continue;
136 } catch (DFEF::ProblemReadingFromFile &ex) {
137 ERS_DEBUG(2, "Problem reading from file: " << ex.what());
138 p.set_exception(std::make_exception_ptr(daq::df_ef_interface::CommunicationError("Problem reading from file")));
139 continue;
140 }
141 }
142 ERS_DEBUG(2, "Event read");
143 eformat::read::FullEventFragment fullEvent(event.get());
144 uint32_t nwords = event[1];
145 ERS_DEBUG(2, "Copying full event with L0ID: " << fullEvent.lvl1_id() << ", size: " << nwords * sizeof(uint32_t) << " bytes");
146 std::unique_ptr<uint32_t[]> eventCopy(new uint32_t[nwords]);
147 std::memcpy(eventCopy.get(), event.get(), nwords * sizeof(uint32_t));
148 {
149 std::lock_guard<std::mutex> lock(m_mMutex);
150 auto& bucket = m_events[fullEvent.lvl1_id()];
151 bucket.push_back(std::move(eventCopy));
152 }
153 p.set_value(std::move(event));
154 }
155}
156
158 ERS_DEBUG(2, "Output thread started");
159 while (true) {
160 std::pair<uint64_t, std::unique_ptr<uint32_t[]>> eventPair;
161 {
162 std::lock_guard<std::mutex> lock(m_oQMutex);
163 ERS_DEBUG(2, "m_outputQueue size: " << m_outputQueue.size());
164 if (m_outputQueue.empty())
165 {
166 ERS_DEBUG(2, "Output queue is empty, breaking...");
167 break;
168 }
169 eventPair = std::move(m_outputQueue.front());
170 m_outputQueue.pop();
171 }
172 std::unique_ptr<uint32_t[]> fullEvent = nullptr;
173 {
174 std::lock_guard<std::mutex> lock(m_mMutex);
175 auto it = m_events.find(eventPair.first);
176 if (it != m_events.end() && !it->second.empty()) {
177 fullEvent = std::move(it->second.back());
178 it->second.pop_back();
179 if (it->second.empty()) {
180 m_events.erase(it);
181 }
182 } else {
183 throw std::runtime_error("Missing the FullEvent copy for event accepted with L0ID: " + std::to_string(eventPair.first));
184 }
185 ERS_DEBUG(2, "m_events size after erase: " << m_events.size());
186 }
187 //FIXME Remove this, just for debugging
188 std::unique_ptr<uint32_t[]> hltResult = std::move(eventPair.second);
189 uint32_t nwords = hltResult[1];
190 eformat::read::FullEventFragment fullEventFragment(fullEvent.get());
191 ERS_DEBUG(2, "Processing HLTResult with L0ID: " << fullEventFragment.lvl1_id() << ", size: " << nwords * 4 << " bytes");
192 // Merge HLTResult
193 auto finalEvent = DFEF::merge_hltresult_with_input_event(hltResult.get(), fullEvent.get(), m_comp, m_compLevel);
194 auto finalSize = finalEvent.size();
195 auto sizeInBytes = finalSize*4;
196 {
197 ERS_DEBUG(2, "Writing fullEventFragment with L0ID: " << eventPair.first << " of size: " << sizeInBytes << " bytes");
198 std::lock_guard<std::mutex> lock(m_RWMutex);
199 m_file_rw->writeEvent(sizeInBytes, finalEvent.data());
200 }
201 ERS_DEBUG(2, "Event with L0ID: " << eventPair.first << " written to file");
202 }
203 ERS_DEBUG(2, "Output thread finished processing.");
204}
std::unique_ptr< daq::df_ef_interface::EventHandler > createEventHandler(const boost::property_tree::ptree &conf)
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
std::unordered_map< uint64_t, std::vector< std::unique_ptr< uint32_t[]> > > m_events
Map of events read from the input (stores duplicates per L1ID).
EFInterfaceEmulator(const boost::property_tree::ptree &args)
std::unique_ptr< HLT::LoopThread > m_outputThread
Output handling thread (triggers post-processing of finished events).
std::mutex m_iQMutex
queue mutex
unsigned int m_compLevel
Compression type of built event.
virtual std::future< void > accept(uint64_t l0id, std::unique_ptr< uint32_t[]> hltr) override
Marks the event as accepted by the High-Level Trigger.
void inputThreadCallback()
Compression level of built event.
std::queue< std::promise< std::unique_ptr< uint32_t[]> > > m_inputQueue
queue of promises for getNext() calls
void outputThreadCallback()
The method executed by the output handling thread.
std::unique_ptr< HLT::LoopThread > m_inputThread
Input handling thread (triggers reading new events).
virtual void open() override
Opens the Session.
std::mutex m_oQMutex
queue mutex
std::mutex m_mMutex
Map mutex.
std::unique_ptr< FileReaderWriter > m_file_rw
std::mutex m_RWMutex
ReadWrite mutex for the file reader/writer.
eformat::Compression m_comp
std::queue< std::pair< uint64_t, std::unique_ptr< uint32_t[]> > > m_outputQueue
queue of promises for accept() calls
virtual void close() override
Closes the Session.
virtual std::future< void > reject(uint64_t l0id) override
Marks the event as rejected by the High-Level Trigger.
virtual std::future< std::unique_ptr< uint32_t[]> > getNext() override
Returns a pointer to the next Event object to be processed.
std::vector< uint32_t > merge_hltresult_with_input_event(const uint32_t *hltresult, const uint32_t *input, eformat::Compression comp, unsigned int comp_level)
Merge two events: Take header and robs from hltresult event, add all res of the ROBS from input event...