ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorAthenaPoolSharedIO Class Reference

This class is the EventSelector for event data. More...

#include <EventSelectorAthenaPoolSharedIO.h>

Inheritance diagram for EventSelectorAthenaPoolSharedIO:
Collaboration diagram for EventSelectorAthenaPoolSharedIO:

Public Member Functions

 EventSelectorAthenaPoolSharedIO (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~EventSelectorAthenaPoolSharedIO ()=default
 Destructor.
virtual StatusCode initialize () override
 Required of all Gaudi Services.
virtual StatusCode start () override
virtual StatusCode stop () override
virtual StatusCode finalize () override
virtual StatusCode next (IEvtSelector::Context &ctxt) const override
virtual StatusCode next (IEvtSelector::Context &ctxt, int jump) const override
virtual StatusCode makeServer (int num) override
 Make this a server.
virtual StatusCode makeClient (int num) override
 Make this a client.
virtual StatusCode share (int evtnum) override
 Request to share a given event number.
virtual StatusCode readEvent (int maxevt) override
 Read the next maxevt events.
virtual StatusCode io_reinit () override
 Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2)).

Protected Member Functions

virtual StatusCode recordAttributeList () const override
 Record AttributeList in StoreGate.

Private Attributes

ToolHandle< IAthenaIPCTool > m_eventStreamingTool {this, "SharedMemoryTool", "", ""}
Gaudi::Property< int > m_makeStreamingToolClient {this, "MakeStreamingToolClient", 0}
 Make this instance a Streaming Client during first iteration automatically.

Detailed Description

This class is the EventSelector for event data.

Definition at line 21 of file EventSelectorAthenaPoolSharedIO.h.

Constructor & Destructor Documentation

◆ EventSelectorAthenaPoolSharedIO()

EventSelectorAthenaPoolSharedIO::EventSelectorAthenaPoolSharedIO ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 37 of file EventSelectorAthenaPoolSharedIO.cxx.

37 :
38 base_class(name, pSvcLocator) {
39}

◆ ~EventSelectorAthenaPoolSharedIO()

virtual EventSelectorAthenaPoolSharedIO::~EventSelectorAthenaPoolSharedIO ( )
virtual default

Destructor.

Member Function Documentation

◆ finalize()

StatusCode EventSelectorAthenaPoolSharedIO::finalize ( )
override virtual

Definition at line 76 of file EventSelectorAthenaPoolSharedIO.cxx.

76 {
77 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
78 delete m_endIter; m_endIter = nullptr;
79 return ::AthService::finalize();
80 }
82}
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
virtual StatusCode finalize() override

◆ initialize()

StatusCode EventSelectorAthenaPoolSharedIO::initialize ( )
override virtual

Required of all Gaudi Services.

Definition at line 41 of file EventSelectorAthenaPoolSharedIO.cxx.

41 {
42 if (!EventSelectorAthenaPool::initialize().isSuccess()) {
43 return StatusCode::FAILURE;
44 }
45 // Get SharedMemoryTool (if configured)
46 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
47 ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
48 return StatusCode::FAILURE;
49 } else if (m_makeStreamingToolClient.value() == -1) {
50 std::string dummyStr;
51 ATH_CHECK(m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr));
52 }
53 // Don't listen to the Event Processing incidents
54 if (!m_eventStreamingTool.empty()) {
55 m_incidentSvc->removeListener(this, IncidentType::BeginProcessing);
56 m_incidentSvc->removeListener(this, IncidentType::EndProcessing);
57 }
58 return StatusCode::SUCCESS;
59}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_FATAL(x)
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
virtual StatusCode initialize() override
Required of all Gaudi Services.

◆ io_reinit()

StatusCode EventSelectorAthenaPoolSharedIO::io_reinit ( )
override virtual

Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2)).

Definition at line 85 of file EventSelectorAthenaPoolSharedIO.cxx.

85 {
86 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
87 m_guid = Guid::null();
88 m_evtCount = 0;
89 }
91}
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2)).
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14

◆ makeClient()

StatusCode EventSelectorAthenaPoolSharedIO::makeClient ( int num)
override virtual

Make this a client.

Definition at line 179 of file EventSelectorAthenaPoolSharedIO.cxx.

179 {
180 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
181 if (ds == nullptr) {
182 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
183 return StatusCode::FAILURE;
184 }
185 if (ds->makeClient(num + 1).isFailure()) {
186 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
187 return StatusCode::FAILURE;
188 }
189 if (m_eventStreamingTool.empty()) {
190 return StatusCode::SUCCESS;
191 }
192 ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
193 std::string dummyStr;
194 return(m_eventStreamingTool->makeClient(0, dummyStr));
195}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_DEBUG(x)

◆ makeServer()

StatusCode EventSelectorAthenaPoolSharedIO::makeServer ( int num)
override virtual

Make this a server.

Definition at line 154 of file EventSelectorAthenaPoolSharedIO.cxx.

154 {
155 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
156 if (ds == nullptr) {
157 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
158 return StatusCode::FAILURE;
159 }
160 if (num < 0) {
161 if (ds->makeServer(num - 1).isFailure()) {
162 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
163 }
164 return StatusCode::SUCCESS;
165 }
166 if (ds->makeServer(num + 1).isFailure()) {
167 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
168 return StatusCode::FAILURE;
169 }
170 if (m_eventStreamingTool.empty()) {
171 return StatusCode::SUCCESS;
172 }
173 m_processMetadata = false;
174 ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
175 return(m_eventStreamingTool->makeServer(1, ""));
176}

◆ next() [1/2]

StatusCode EventSelectorAthenaPoolSharedIO::next ( IEvtSelector::Context & ctxt) const
override virtual
Parameters
ctxt [IN/OUT] current event context is iterated to the next event.

Definition at line 94 of file EventSelectorAthenaPoolSharedIO.cxx.

94 {
95 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
96 if (m_makeStreamingToolClient.value() == -1) {
97 StatusCode sc = m_eventStreamingTool->lockEvent(m_evtCount);
98 while (sc.isRecoverable()) {
99 usleep(1000);
100 sc = m_eventStreamingTool->lockEvent(m_evtCount);
101 }
102 }
103 // Increase event count
104 ++m_evtCount;
105 void* tokenStrPtr = nullptr;
106 unsigned int status = 0;
107 StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStrPtr, status);
108 std::unique_ptr<const char[]> tokenStr{static_cast<const char*>(tokenStrPtr)};
109 if (sc.isRecoverable()) {
110 // Return end iterator
111 ctxt = *m_endIter;
112 // This is not a real failure but a Gaudi way of handling "end of job"
113 return StatusCode::FAILURE;
114 }
115 if (sc.isFailure()) {
116 ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
117 return StatusCode::FAILURE;
118 }
119 if (!eventStore()->clearStore().isSuccess()) {
120 ATH_MSG_WARNING("Cannot clear Store");
121 }
122 std::unique_ptr<AthenaAttributeList> athAttrList = std::make_unique<AthenaAttributeList>();
123 athAttrList->extend("eventRef", "string");
124 (*athAttrList)["eventRef"].data<std::string>() = tokenStr.get();
125 SG::WriteHandle<AthenaAttributeList> wh(m_attrListKey, eventStore()->name());
126 if (!wh.record(std::move(athAttrList)).isSuccess()) {
127 ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
128 return StatusCode::FAILURE;
129 }
130 Token token;
131 token.fromString(tokenStr.get());
132 Guid guid = token.dbID();
133 if (guid != m_guid && m_processMetadata.value()) {
134 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(),
135 "FID:" + guid.toString(), guid.toString(),
136 /*endFileName=*/{});
137 m_guid = guid;
138 }
139 return StatusCode::SUCCESS;
140 }
142}
#define ATH_MSG_WARNING(x)
static Double_t sc
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.
static const std::string & storeName(const StoreID::type &s)
Definition StoreID.cxx:77
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16
str wh
Definition parseDir.py:45

◆ next() [2/2]

StatusCode EventSelectorAthenaPoolSharedIO::next ( IEvtSelector::Context & ctxt,
int jump ) const
override virtual
Parameters
ctxt [IN/OUT] current event context is iterated to the next event.
jump [IN] number of events to jump; a positive value calls next(ctxt) that many times, any other value returns FAILURE.

Definition at line 144 of file EventSelectorAthenaPoolSharedIO.cxx.

144 {
145 if (jump > 0) {
146 for (int i = 0; i < jump; i++) {
147 ATH_CHECK(next(ctxt));
148 }
149 return StatusCode::SUCCESS;
150 }
151 return StatusCode::FAILURE;
152}
virtual StatusCode next(IEvtSelector::Context &ctxt) const override

◆ readEvent()

StatusCode EventSelectorAthenaPoolSharedIO::readEvent ( int maxevt)
override virtual

Read the next maxevt events.

Parameters
maxevt [IN] The number of events to read (-1 means no limit).

Definition at line 231 of file EventSelectorAthenaPoolSharedIO.cxx.

231 {
232 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
233 if (ds == nullptr) {
234 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
235 return StatusCode::FAILURE;
236 }
237 if (m_eventStreamingTool.empty()) {
238 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
239 return StatusCode::FAILURE;
240 }
241 ATH_MSG_VERBOSE("Called read Event " << maxevt);
242 std::unique_ptr<IEvtSelector::Context> ctxt = std::make_unique<EventContextAthenaPool>(this);
243 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
244 if (!next(*ctxt).isSuccess()) {
245 if (m_evtCount == -1) {
246 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
247 break;
248 }
249 ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
250 return StatusCode::FAILURE;
251 } else {
252 ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
253 }
254 }
255 ctxt.reset();
256 // End of file, wait for last event to be taken
258 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
259 while (ds->readData().isSuccess()) {
260 ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
261 }
262 usleep(1000);
263 }
264 if (!sc.isSuccess()) {
265 ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
266 return StatusCode::FAILURE;
267 } else {
268 sc = ds->readData();
269 while (sc.isSuccess() || sc.isRecoverable()) {
270 sc = ds->readData();
271 }
272 ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
273 }
274 return StatusCode::SUCCESS;
275}
#define ATH_MSG_VERBOSE(x)

◆ recordAttributeList()

StatusCode EventSelectorAthenaPoolSharedIO::recordAttributeList ( ) const
override protected virtual

Record AttributeList in StoreGate.

Definition at line 277 of file EventSelectorAthenaPoolSharedIO.cxx.

277 {
278 if (!m_eventStreamingTool.empty()) {
279 if (m_eventStreamingTool->isServer()) {
280 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
281 if (ds == nullptr) {
282 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
283 return StatusCode::FAILURE;
284 }
285 std::string token = m_headerIterator->eventRef().toString();
287 while ( (sc = putEvent_ST(*m_eventStreamingTool,
288 m_evtCount - 1, token.c_str(),
289 token.length() + 1, 0)).isRecoverable() ) {
290 while (ds->readData().isSuccess()) {
291 ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
292 }
293 // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
294 }
295 if (!sc.isSuccess()) {
296 ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
297 return StatusCode::FAILURE;
298 }
299 } else {
300 return StatusCode::SUCCESS;
301 }
302 } else {
304 }
305 return StatusCode::SUCCESS;
306}
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.

◆ share()

StatusCode EventSelectorAthenaPoolSharedIO::share ( int evtnum)
override virtual

Request to share a given event number.

Parameters
evtnum [IN] The event number to share.

Definition at line 198 of file EventSelectorAthenaPoolSharedIO.cxx.

198 {
199 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
200 if (ds == nullptr) {
201 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
202 return StatusCode::FAILURE;
203 }
204 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
205 StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
206 while (sc.isRecoverable()) {
207 usleep(1000);
208 sc = m_eventStreamingTool->lockEvent(evtnum);
209 }
210// Send stop client and wait for restart
211 if (sc.isFailure()) {
212 if (ds->makeClient(0).isFailure()) {
213 return StatusCode::FAILURE;
214 }
215 sc = m_eventStreamingTool->lockEvent(evtnum);
216 while (sc.isRecoverable() || sc.isFailure()) {
217 usleep(1000);
218 sc = m_eventStreamingTool->lockEvent(evtnum);
219 }
220//FIXME
221 if (ds->makeClient(1).isFailure()) {
222 return StatusCode::FAILURE;
223 }
224 }
225 return(sc);
226 }
227 return StatusCode::FAILURE;
228}

◆ start()

StatusCode EventSelectorAthenaPoolSharedIO::start ( )
override virtual

Definition at line 61 of file EventSelectorAthenaPoolSharedIO.cxx.

61 {
62 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
63 return StatusCode::SUCCESS;
64 }
66}
virtual StatusCode start() override

◆ stop()

StatusCode EventSelectorAthenaPoolSharedIO::stop ( )
override virtual

Definition at line 68 of file EventSelectorAthenaPoolSharedIO.cxx.

68 {
69 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
70 return StatusCode::SUCCESS;
71 }
73}
virtual StatusCode stop() override

Member Data Documentation

◆ m_eventStreamingTool

ToolHandle<IAthenaIPCTool> EventSelectorAthenaPoolSharedIO::m_eventStreamingTool {this, "SharedMemoryTool", "", ""}
private

Definition at line 73 of file EventSelectorAthenaPoolSharedIO.h.

73{this, "SharedMemoryTool", "", ""};

◆ m_makeStreamingToolClient

Gaudi::Property<int> EventSelectorAthenaPoolSharedIO::m_makeStreamingToolClient {this, "MakeStreamingToolClient", 0}
private

Make this instance a Streaming Client during first iteration automatically.

Definition at line 75 of file EventSelectorAthenaPoolSharedIO.h.

75{this, "MakeStreamingToolClient", 0};

The documentation for this class was generated from the following files: