ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorAthenaPoolSharedIO.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
12
15
16// Framework
17#include "GaudiKernel/FileIncident.h"
18#include "GaudiKernel/IIncidentSvc.h"
19#include "GaudiKernel/StatusCode.h"
21
22// Pool
25
26namespace {
28 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
29 long eventNumber, const void* source,
30 size_t nbytes, unsigned int status) {
31 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
32 return sc;
33 }
34}
35
36//________________________________________________________________________________
37EventSelectorAthenaPoolSharedIO::EventSelectorAthenaPoolSharedIO(const std::string& name, ISvcLocator* pSvcLocator) :
38 base_class(name, pSvcLocator) {
39}
40//________________________________________________________________________________
// Body of initialize() (presumably -- the signature line is missing from this listing).
// Steps, in order:
//   1. run EventSelectorAthenaPool::initialize() and fail if it fails;
//   2. retrieve the event streaming tool when one is configured;
//   3. when m_makeStreamingToolClient is -1, turn the tool into a client right away;
//   4. when a streaming tool is configured, stop listening to the
//      BeginProcessing / EndProcessing incidents.
42 if (!EventSelectorAthenaPool::initialize().isSuccess()) {
43 return StatusCode::FAILURE;
44 }
45 // Get SharedMemoryTool (if configured)
46 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
47 ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
48 return StatusCode::FAILURE;
// NOTE(review): the branch below is also taken when m_eventStreamingTool is empty
// (the first condition is then false), yet the handle is dereferenced in it.
// Confirm that a tool is always configured whenever m_makeStreamingToolClient is -1.
49 } else if (m_makeStreamingToolClient.value() == -1) {
50 std::string dummyStr;
51 ATH_CHECK(m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr));
52 }
53 // Don't listen to the Event Processing incidents
54 if (!m_eventStreamingTool.empty()) {
55 m_incidentSvc->removeListener(this, IncidentType::BeginProcessing);
56 m_incidentSvc->removeListener(this, IncidentType::EndProcessing);
57 }
58 return StatusCode::SUCCESS;
59}
60//________________________________________________________________________________
// Body of a state-transition override -- presumably start(), judging by its position
// in the file; the signature line is missing from this listing.
// When a streaming tool is configured and acts as a client, success is returned
// immediately without any further work.
62 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
63 return StatusCode::SUCCESS;
64 }
// NOTE(review): listing line 65 (the statement for the non-client path) is missing
// here; presumably it delegates to the base class -- confirm against the real file.
66}
67//________________________________________________________________________________
// Body of a state-transition override -- presumably stop(), judging by its position
// in the file; the signature line is missing from this listing.
// When a streaming tool is configured and acts as a client, success is returned
// immediately without any further work.
69 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
70 return StatusCode::SUCCESS;
71 }
// NOTE(review): listing line 72 (the statement for the non-client path) is missing
// here; presumably it delegates to the base class -- confirm against the real file.
73}
74
75//________________________________________________________________________________
// Body of finalize() (presumably -- the signature line is missing from this listing).
// Client path: release the end iterator, clear the pointer so it cannot be reused,
// and finish by calling AthService::finalize() directly.
77 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
78 delete m_endIter; m_endIter = nullptr;
79 return ::AthService::finalize();
80 }
// NOTE(review): listing line 81 (the statement for the non-client path) is missing
// here; presumably it calls the EventSelectorAthenaPool finalize -- confirm.
82}
83
84//__________________________________________________________________________
// Body of an I/O re-initialisation callback -- presumably io_reinit(); the signature
// line is missing from this listing.
// Client path: forget the GUID of the last seen input file and restart the event
// counter at zero. Unlike the neighbouring functions there is no early return, so
// execution continues with the statement that follows the if-block.
86 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
87 m_guid = Guid::null();
88 m_evtCount = 0;
89 }
// NOTE(review): listing line 90 (the final statement, executed for clients and
// non-clients alike) is missing here -- confirm against the real file.
91}
92
93//________________________________________________________________________________
/// Advance to the next event.
///
/// Client path (a streaming tool is configured and reports isClient()):
///   - optionally lock event number m_evtCount first (when m_makeStreamingToolClient is -1),
///   - fetch the locked event's token string from the streaming tool,
///   - clear the event store and record an AthenaAttributeList holding the token
///     under the "eventRef" attribute,
///   - when the token's database GUID differs from the last one seen and metadata
///     processing is enabled, perform an input-file incident transition.
///
/// @param ctxt  iterator context; set to the end iterator when the tool reports
///              a recoverable status (end of input)
/// @return SUCCESS when an event was obtained; FAILURE on end of input or on error
94StatusCode EventSelectorAthenaPoolSharedIO::next(IEvtSelector::Context& ctxt) const {
95 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
96 if (m_makeStreamingToolClient.value() == -1) {
// Poll once per millisecond for as long as the tool answers "recoverable".
// NOTE(review): the status is not inspected after the loop, so a non-recoverable
// failure from lockEvent() falls through silently -- confirm this is intended.
97 StatusCode sc = m_eventStreamingTool->lockEvent(m_evtCount);
98 while (sc.isRecoverable()) {
99 usleep(1000);
100 sc = m_eventStreamingTool->lockEvent(m_evtCount);
101 }
102 }
103 // Increase event count
104 ++m_evtCount;
105 void* tokenStrPtr = nullptr;
106 unsigned int status = 0;
107 StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStrPtr, status);
// Take ownership of the returned buffer so that it is released on every exit path.
// NOTE(review): assumes the tool allocated the buffer with new[] -- confirm.
108 std::unique_ptr<const char[]> tokenStr{static_cast<const char*>(tokenStrPtr)};
109 if (sc.isRecoverable()) {
110 // Return end iterator
111 ctxt = *m_endIter;
112 // This is not a real failure but a Gaudi way of handling "end of job"
113 return StatusCode::FAILURE;
114 }
115 if (sc.isFailure()) {
116 ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
117 return StatusCode::FAILURE;
118 }
// A store that cannot be cleared is only reported; processing continues.
119 if (!eventStore()->clearStore().isSuccess()) {
120 ATH_MSG_WARNING("Cannot clear Store");
121 }
// Publish the token string as the "eventRef" attribute of a fresh attribute list.
// NOTE(review): assumes tokenStr is non-null whenever getLockedEvent() succeeded -- confirm.
122 std::unique_ptr<AthenaAttributeList> athAttrList = std::make_unique<AthenaAttributeList>();
123 athAttrList->extend("eventRef", "string");
124 (*athAttrList)["eventRef"].data<std::string>() = tokenStr.get();
125 SG::WriteHandle<AthenaAttributeList> wh(m_attrListKey, eventStore()->name());
126 if (!wh.record(std::move(athAttrList)).isSuccess()) {
127 ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
128 return StatusCode::FAILURE;
129 }
// Detect a change of input file by comparing the token's database GUID with the
// one remembered from the previous event.
130 Token token;
131 token.fromString(tokenStr.get());
132 Guid guid = token.dbID();
133 if (guid != m_guid && m_processMetadata.value()) {
134 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(),
135 "FID:" + guid.toString(), guid.toString(),
136 /*endFileName=*/{});
137 m_guid = guid;
138 }
139 return StatusCode::SUCCESS;
140 }
// NOTE(review): listing line 141 (the statement for the non-client path) is missing
// here; presumably it delegates to the base-class next() -- confirm.
142}
143//________________________________________________________________________________
144StatusCode EventSelectorAthenaPoolSharedIO::next(IEvtSelector::Context& ctxt, int jump) const {
145 if (jump > 0) {
146 for (int i = 0; i < jump; i++) {
147 ATH_CHECK(next(ctxt));
148 }
149 return StatusCode::SUCCESS;
150 }
151 return StatusCode::FAILURE;
152}
153//________________________________________________________________________________
// Body of makeServer(int num) (the signature line is missing from this listing).
// Switches the AthenaPoolCnvSvc, reached through its IDataShare interface, into
// server mode and, for the input direction, also the event streaming tool.
//   num <  0 : output server, the conversion service is called with num - 1
//   num >= 0 : input server,  the conversion service is called with num + 1
155 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
156 if (ds == nullptr) {
157 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
158 return StatusCode::FAILURE;
159 }
160 if (num < 0) {
// NOTE(review): a failure on the output path is only logged and SUCCESS is still
// returned, unlike the input path below -- confirm this is deliberate.
161 if (ds->makeServer(num - 1).isFailure()) {
162 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
163 }
164 return StatusCode::SUCCESS;
165 }
166 if (ds->makeServer(num + 1).isFailure()) {
167 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
168 return StatusCode::FAILURE;
169 }
// Without a streaming tool there is nothing more to switch.
170 if (m_eventStreamingTool.empty()) {
171 return StatusCode::SUCCESS;
172 }
// Metadata processing is switched off once this instance serves events.
173 m_processMetadata = false;
174 ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
// The tool is always called with the fixed arguments (1, ""); num is not forwarded.
175 return(m_eventStreamingTool->makeServer(1, ""));
176}
177
178//________________________________________________________________________________
// Body of makeClient(int num) (the signature line is missing from this listing).
// Switches the AthenaPoolCnvSvc, reached through its IDataShare interface, into
// client mode (called with num + 1) and then, when configured, the streaming tool.
180 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
181 if (ds == nullptr) {
182 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
183 return StatusCode::FAILURE;
184 }
185 if (ds->makeClient(num + 1).isFailure()) {
186 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
187 return StatusCode::FAILURE;
188 }
// Without a streaming tool there is nothing more to switch.
189 if (m_eventStreamingTool.empty()) {
190 return StatusCode::SUCCESS;
191 }
192 ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
// The tool is always called with 0 and a throw-away string; num is not forwarded.
193 std::string dummyStr;
194 return(m_eventStreamingTool->makeClient(0, dummyStr));
195}
196
197//________________________________________________________________________________
// Body of share(int evtnum) (the signature line is missing from this listing).
// Asks the streaming tool to lock event number evtnum. Only meaningful for a
// client; in every other configuration FAILURE is returned.
199 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
200 if (ds == nullptr) {
201 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
202 return StatusCode::FAILURE;
203 }
204 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
// First attempt: poll once per millisecond while the tool answers "recoverable".
205 StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
206 while (sc.isRecoverable()) {
207 usleep(1000);
208 sc = m_eventStreamingTool->lockEvent(evtnum);
209 }
210// Send stop client and wait for restart
211 if (sc.isFailure()) {
212 if (ds->makeClient(0).isFailure()) {
213 return StatusCode::FAILURE;
214 }
// Second attempt: keep polling until lockEvent() succeeds.
// NOTE(review): this loop has no upper bound or timeout -- confirm that the
// tool is guaranteed to succeed eventually.
215 sc = m_eventStreamingTool->lockEvent(evtnum);
216 while (sc.isRecoverable() || sc.isFailure()) {
217 usleep(1000);
218 sc = m_eventStreamingTool->lockEvent(evtnum);
219 }
220//FIXME
// Re-enable the conversion-service client after the restart.
221 if (ds->makeClient(1).isFailure()) {
222 return StatusCode::FAILURE;
223 }
224 }
225 return(sc);
226 }
227 return StatusCode::FAILURE;
228}
229
230//________________________________________________________________________________
// Body of readEvent(int maxevt) (the signature line is missing from this listing).
// Reads up to maxevt events (without limit when maxevt is -1) by calling next(),
// then publishes an end marker through the streaming tool and keeps calling
// readData() on the conversion service until that call fails.
232 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
233 if (ds == nullptr) {
234 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
235 return StatusCode::FAILURE;
236 }
237 if (m_eventStreamingTool.empty()) {
238 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
239 return StatusCode::FAILURE;
240 }
241 ATH_MSG_VERBOSE("Called read Event " << maxevt);
242 std::unique_ptr<IEvtSelector::Context> ctxt = std::make_unique<EventContextAthenaPool>(this);
243 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
244 if (!next(*ctxt).isSuccess()) {
// An event count of -1 after a failed next() is treated as regular end of input.
// NOTE(review): presumably the value is set by the underlying next() at end of
// input; that code is not visible here -- confirm.
245 if (m_evtCount == -1) {
246 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
247 break;
248 }
249 ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
250 return StatusCode::FAILURE;
251 } else {
252 ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
253 }
254 }
// The iteration context is no longer needed once the event loop has ended.
255 ctxt.reset();
256 // End of file, wait for last event to be taken
// Publish the end marker (all-zero arguments); while the tool answers "recoverable",
// call readData() until it stops succeeding, then retry one millisecond later.
257 StatusCode sc;
258 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
259 while (ds->readData().isSuccess()) {
260 ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
261 }
262 usleep(1000);
263 }
264 if (!sc.isSuccess()) {
265 ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
266 return StatusCode::FAILURE;
267 } else {
// Keep calling readData() until it reports a non-recoverable failure, which the
// debug message below interprets as "clients are stopped".
268 sc = ds->readData();
269 while (sc.isSuccess() || sc.isRecoverable()) {
270 sc = ds->readData();
271 }
272 ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
273 }
274 return StatusCode::SUCCESS;
275}
276//__________________________________________________________________________
// Body of recordAttributeList() (presumably -- the signature line is missing from
// this listing).
// Server path: instead of recording locally, the current event's token string is
// published through the streaming tool. A configured tool that is not a server
// returns SUCCESS without doing anything.
278 if (!m_eventStreamingTool.empty()) {
279 if (m_eventStreamingTool->isServer()) {
280 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
281 if (ds == nullptr) {
282 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
283 return StatusCode::FAILURE;
284 }
// The payload is the event reference token as a C string; the size passed to the
// tool includes the terminating NUL (length() + 1). The event number passed is
// m_evtCount - 1.
285 std::string token = m_headerIterator->eventRef().toString();
286 StatusCode sc;
// Retry for as long as the tool answers "recoverable", calling readData() in
// between until it stops succeeding. There is no sleep between attempts.
287 while ( (sc = putEvent_ST(*m_eventStreamingTool,
288 m_evtCount - 1, token.c_str(),
289 token.length() + 1, 0)).isRecoverable() ) {
290 while (ds->readData().isSuccess()) {
291 ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
292 }
293 // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
294 }
295 if (!sc.isSuccess()) {
296 ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
297 return StatusCode::FAILURE;
298 }
299 } else {
300 return StatusCode::SUCCESS;
301 }
302 } else {
// NOTE(review): listing line 303 (the statement for the "no streaming tool" case)
// is missing here; presumably it delegates to the base class -- confirm.
304 }
305 return StatusCode::SUCCESS;
306}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the EventContextAthenaPool class.
This file contains the class definition for the EventSelectorAthenaPoolSharedIO class.
static Double_t sc
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
virtual StatusCode makeServer(int num) override
Make this a server.
virtual StatusCode makeClient(int num) override
Make this a client.
EventSelectorAthenaPoolSharedIO(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode initialize() override
Required of all Gaudi Services.
virtual StatusCode share(int evtnum) override
Request to share a given event number.
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
virtual StatusCode start() override
virtual StatusCode initialize() override
Required of all Gaudi Services.
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode stop() override
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
virtual StatusCode finalize() override
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
This class provides a encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
Definition Guid.h:25
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
Abstract interface for sharing data.
Definition IDataShare.h:24
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.
static const std::string & storeName(const StoreID::type &s)
Definition StoreID.cxx:77
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
::StatusCode StatusCode
StatusCode definition for legacy code.