ATLAS Offline Software
Loading...
Searching...
No Matches
EFInterfaceSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "EFInterfaceSvc.h"
7#include "GaudiKernel/IIncidentSvc.h"
9#include "eformat/eformat.h"
10#include <functional>
11#include <boost/property_tree/json_parser.hpp>
12
13EFInterfaceSvc::EFInterfaceSvc(const std::string& name, ISvcLocator* svc)
14 : base_class(name, svc)
15{
16}
17
18StatusCode EFInterfaceSvc::initialize ATLAS_NOT_THREAD_SAFE ()
19{
20 ATH_MSG_DEBUG("EFInterfaceSvc initialized");
21 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", name());
22 ATH_CHECK( incSvc.retrieve() );
23 incSvc->addListener(this, AthenaInterprocess::UpdateAfterFork::type());
24 return StatusCode::SUCCESS;
25}
26
27/**************************************************************************************/
28
29void EFInterfaceSvc::handle(const Incident& incident)
30{
31 if (incident.type() == AthenaInterprocess::UpdateAfterFork::type())
32 {
33 ATH_MSG_DEBUG("Going to initialize the EFInterface");
34 std::string full_libname = "lib" + m_interface_library_name + ".so";
35 m_efdfinterface_library = boost::dll::shared_library(full_libname, boost::dll::load_mode::type::search_system_folders | boost::dll::load_mode::type::rtld_global);
36 using cEventHandler = std::unique_ptr<daq::df_ef_interface::EventHandler> (const boost::property_tree::ptree&);
37 std::function<cEventHandler> cs;
38 try {
39 cs = m_efdfinterface_library.get<cEventHandler>("createEventHandler");
40 } catch (std::exception & ex) {
41 ATH_MSG_ERROR("Cant load function createEventHandler. Are you trying to use incorrect library? - " << ex.what());
42 throw;
43 }
44 //Add Tree
45 boost::property_tree::ptree configTree = prepInterfacePTree();
46 std::ostringstream oss;
47 boost::property_tree::write_json(oss, configTree, true);
48 ATH_MSG_INFO("EFInterface configuration:\n" << oss.str());
49 try {
50 m_eventHandler = cs(configTree);
51 } catch (std::exception & ex) {
52 ATH_MSG_ERROR("Cant create EventHandler from DataSource library. Are you trying to use incorrect configuration? - " << ex.what());
53 throw;
54 }
55 // Going to open the EventHandler connection
56 try {
57 ATH_MSG_DEBUG("Opening EventHandler");
58 m_eventHandler->open();
59 }
60 catch (daq::df_ef_interface::CommunicationError & ex) {
61 ATH_MSG_ERROR("CommunicationError while opening EventHandler: " << ex.what());
62 throw;
63 }
64 catch (std::exception & ex) {
65 ATH_MSG_ERROR("Exception while opening EventHandler: " << ex.what());
66 throw;
67 }
68 catch (...) {
69 ATH_MSG_ERROR("Unknown exception while opening EventHandler");
70 throw;
71 }
72 }
73}
74
76{
77 ATH_MSG_DEBUG("EFInterfaceSvc stopped");
78 // Close the EventHandler connection
79 if (!m_eventHandler) {
80 ATH_MSG_DEBUG("EventHandler is not initialized, probably I'm in the mother process.");
81 }
82 else
83 {
84 try {
85 m_eventHandler->close();
86 }
87 catch (daq::df_ef_interface::CommunicationError & ex) {
88 ATH_MSG_ERROR("CommunicationError while closing EventHandler: " << ex.what());
89 throw;
90 }
91 catch (std::exception & ex) {
92 ATH_MSG_ERROR("Exception while closing EventHandler: " << ex.what());
93 throw;
94 }
95 catch (...) {
96 ATH_MSG_ERROR("Unknown exception while closing EventHandler");
97 throw;
98 }
99 }
100 ATH_MSG_INFO("EFInterfaceSvc Stopping. Event processing summary:");
101 ATH_MSG_INFO("Processed Events: " << m_processedEvents);
102 ATH_MSG_INFO("Accepted Events: " << m_acceptedEvents);
103 ATH_MSG_INFO("Rejected Events: " << m_rejectedEvents);
104 return StatusCode::SUCCESS;
105}
106
108{
109 ATH_MSG_DEBUG("EFInterfaceSvc finalized");
110 m_eventHandler.reset();
112 return StatusCode::SUCCESS;
113}
114
115void EFInterfaceSvc::eventDone(std::unique_ptr<uint32_t[]> rawEventPtr)
116{
117 eformat::read::FullEventFragment ev(rawEventPtr.get());
119 bool accepted = ((ev.nstream_tag() > 0)?true:false);
120 ATH_MSG_DEBUG("Number of stream tags: " << ev.nstream_tag());
121 if(accepted){
122 ATH_MSG_DEBUG("Event " << ev.lvl1_id() << " accepted");
123 auto result = m_eventHandler->accept(ev.lvl1_id(), std::move(rawEventPtr));
125 //TODO: decide if we have to check for the future result
126 }else{
127 ATH_MSG_DEBUG("Event " << ev.lvl1_id() << " rejected");
128 auto result = m_eventHandler->reject(ev.lvl1_id());
130 }
131}
132
133EFInterfaceSvc::Status EFInterfaceSvc::getNext(std::unique_ptr<uint32_t[]>& rawEventPtr)
134{
135 try{
136 //check if we have a getNext call already pending
137 std::future<std::unique_ptr<uint32_t []>> future;
138 {
139 std::lock_guard<std::mutex> lock(m_queueMutex);
140 if (m_getNextFuture.empty()){
141 ATH_MSG_DEBUG("No pending getNext call, creating a new one");
142 future = m_eventHandler->getNext();
143 } else {
144 ATH_MSG_DEBUG("Pending getNext call found, using it");
145 future = std::move(m_getNextFuture.front());
146 m_getNextFuture.pop();
147 }
148 }
149 // Wait for the future to be ready
150 auto status = future.wait_for(std::chrono::milliseconds(m_getNextTimeout));
151 if (status == std::future_status::ready) {
152 auto result = future.get();
153 rawEventPtr = std::move(result);
154 return Status::OK;
155 } else {
156 // Future is not ready, return NO_EVENT
157 ATH_MSG_DEBUG("getNext timed out, returning NO_EVENT");
158 rawEventPtr = nullptr;
159 {
160 //Add the future to the queue for later access
161 std::lock_guard<std::mutex> lock(m_queueMutex);
162 m_getNextFuture.push(std::move(future));
163 }
164 return Status::NO_EVENT;
165 }
166 }
167 catch (daq::df_ef_interface::NoMoreEvents &ex){
168 ATH_MSG_DEBUG("NoMoreEvents, returning");
169 return Status::STOP;
170 }
171 catch (daq::df_ef_interface::CommunicationError &ex){
172 ATH_MSG_DEBUG("CommunicationError received from EFInterface, returning NO_EVENT");
173 return Status::NO_EVENT;
174 }
175 catch (std::exception &ex){
176 ATH_MSG_ERROR("EFInterface: caught exception: \""<<ex.what()<<"\" throwing!");
177 throw;
178 }
179 catch(...) {
180 ATH_MSG_ERROR("EFInterface: caught very unknown exception");
181 throw;
182 }
183}
184
185boost::property_tree::ptree EFInterfaceSvc::prepInterfacePTree()
186{
187 boost::property_tree::ptree configTree;
188 // Top-level values
189 configTree.put("name", "DFEFInterfaceSvc");
190 configTree.put("stride", m_stride.value()); // NB: if this is set to 0 it will cause a seg fault!
191 configTree.put("fileOffset", m_fileOffset.value());
192 configTree.put("numEvents", m_numEvents.value());
193 configTree.put("skipEvents", m_skipEvents.value());
194 configTree.put("loopOverFiles", m_loopOverFiles.value() ? "true" : "false"); // as string
195 configTree.put("outputFileName", m_outputFileName.value());
196 // File list
197 boost::property_tree::ptree fileList;
198 for (const std::string& fname : m_files.value()) {
199 boost::property_tree::ptree fileNode;
200 fileNode.put("", fname);
201 fileList.push_back(std::make_pair("file", fileNode));
202 }
203 configTree.add_child("fileList", fileList);
204 // Run parameters
205 boost::property_tree::ptree runParams;
206 runParams.put("run_number", m_runNumber.value());
207 runParams.put("trigger_type", m_triggerType.value());
208 runParams.put("beam_type", m_beamType.value());
209 runParams.put("beam_energy", m_beamEnergy.value());
210 runParams.put("det_mask", m_detMask.value());
211 runParams.put("T0_project_tag", m_T0_project_tag.value());
212 runParams.put("stream", m_stream.value());
213 runParams.put("lumiblock", m_lumiblock.value());
214 configTree.add_child("RunParams", runParams);
215
216 return configTree;
217}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
StatusCode EFInterfaceSvc::initialize ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
static const std::string & type()
Incident type.
Definition Incidents.h:49
Gaudi::Property< std::string > m_outputFileName
@ NO_EVENT
no event available
@ STOP
stop transition (no more events)
Gaudi::Property< int > m_getNextTimeout
std::queue< std::future< std::unique_ptr< uint32_t[]> > > m_getNextFuture
EFInterfaceSvc(const std::string &name, ISvcLocator *svc)
virtual StatusCode stop() override
virtual StatusCode finalize() override
Gaudi::Property< int > m_beamType
uint32_t m_acceptedEvents
Gaudi::Property< std::string > m_stream
Gaudi::Property< std::string > m_detMask
Gaudi::Property< int > m_triggerType
Gaudi::Property< int > m_numEvents
void eventDone(std::unique_ptr< uint32_t[]> rawEventPtr)
virtual Status getNext(std::unique_ptr< uint32_t[]> &rawEventPtr)
Gaudi::Property< std::string > m_interface_library_name
Gaudi::Property< bool > m_loopOverFiles
Gaudi::Property< std::vector< std::string > > m_files
Gaudi::Property< int > m_fileOffset
std::unique_ptr< daq::df_ef_interface::EventHandler > m_eventHandler
Gaudi::Property< int > m_runNumber
Gaudi::Property< std::string > m_T0_project_tag
Gaudi::Property< int > m_beamEnergy
boost::dll::shared_library m_efdfinterface_library
Library with the df_ef_interface implementation.
Gaudi::Property< int > m_lumiblock
virtual void handle(const Incident &incident) override
uint32_t m_processedEvents
Gaudi::Property< int > m_stride
std::mutex m_queueMutex
Mutex for future queue.
uint32_t m_rejectedEvents
Gaudi::Property< int > m_skipEvents
boost::property_tree::ptree prepInterfacePTree()
int ev
Definition globals.cxx:25