ATLAS Offline Software
Loading...
Searching...
No Matches
DoubleEventSelectorAthenaPool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
13
15
16// Pool
20
21
22DoubleEventSelectorAthenaPool::DoubleEventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator)
23 : EventSelectorAthenaPool(name, pSvcLocator)
24{
25}
26
27
29
30
32{
// NOTE(review): the signature of this function and the statements on listing lines 33, 35 and 37
// are absent from this extract. Presumably this is DoubleEventSelectorAthenaPool::initialize()
// ("Initialize function.") — confirm against the repository before editing.
34
// True when the service held by m_secondarySelector is NOT an EventSelectorAthenaPool.
// The statement executed in that case is one of the lines missing from this extract.
36 if (dynamic_cast<EventSelectorAthenaPool *>(&*(m_secondarySelector)) == nullptr) {
38 }
39
// The visible lines always report success; the missing lines may return earlier — TODO confirm.
40 return StatusCode::SUCCESS;
41}
42
43
44StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const
45{
46 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next");
47
48 std::lock_guard<CallMutex> lockGuard(m_callLock);
49 if (!eventStore()->clearStore().isSuccess()) {
50 ATH_MSG_WARNING("Cannot clear Store");
51 }
52 for (const auto& tool : m_helperTools) {
53 if (!tool->preNext().isSuccess()) {
54 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
55 }
56 }
57
58 for (;;) {
59 // Move in the primary file (with skipping)
60 if (nextWithSkip(ctxt).isFailure()) {
61 return StatusCode::FAILURE;
62 }
63
64 // Check if we're at the end of secondary file
65 if (m_secondarySelector->nextWithSkip(ctxt).isFailure()) {
66 return StatusCode::FAILURE;
67 }
68
69 // Record the attribute list
70 if (!recordAttributeList().isSuccess()) {
71 ATH_MSG_ERROR("Failed to record AttributeList.");
72 return StatusCode::FAILURE;
73 }
74
75 StatusCode status = StatusCode::SUCCESS;
76 for (const auto& tool : m_helperTools) {
77 StatusCode toolStatus = tool->postNext();
78 if (toolStatus.isRecoverable()) {
79 ATH_MSG_INFO("Request skipping event from: " << tool->name());
80 if (status.isSuccess()) {
81 status = StatusCode::RECOVERABLE;
82 }
83 } else if (toolStatus.isFailure()) {
84 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
85 status = StatusCode::FAILURE;
86 }
87 }
88 if (status.isRecoverable()) {
89 ATH_MSG_INFO("skipping event " << m_evtCount);
90 } else if (status.isFailure()) {
91 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
92 } else {
93 if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
94 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
95 }
96 break;
97 }
98 }
99
100 return StatusCode::SUCCESS;
101}
102
103
104StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const
105{
106 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next(jump)");
107
108 if (jump > 0) {
109 for (int i = 0; i < jump; i++) {
110 if (!next(ctxt).isSuccess()) {
111 return StatusCode::FAILURE;
112 }
113 }
114 return StatusCode::SUCCESS;
115 }
116
117 return StatusCode::FAILURE;
118}
119
120
// Seek to a given event number.
//
// ctxt   - selector context, forwarded to the secondary selector's seek()
// evtNum - event number to seek to, forwarded unchanged
//
// NOTE(review): listing line 125 is absent from this extract; presumably it seeks the
// primary input before the secondary one — confirm against the repository.
121StatusCode DoubleEventSelectorAthenaPool::seek(Context& ctxt, int evtNum) const
122{
123 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::seek");
124
// Position the secondary input on the same event number; ATH_CHECK evaluates the
// expression and checks it for errors.
126 ATH_CHECK(m_secondarySelector->seek(ctxt, evtNum));
127
128 return StatusCode::SUCCESS;
129}
130
131
133{
// NOTE(review): the signature line (listing line 132) is absent from this extract; presumably
// this is DoubleEventSelectorAthenaPool::size(Context& ctxt) const, "Return the size of the
// collection" — confirm against the repository.
134 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::size");
135
// sz1: size reported by the EventSelectorAthenaPool base (primary input);
// sz2: size reported by the secondary selector.
136 int sz1 = EventSelectorAthenaPool::size(ctxt);
137 int sz2 = m_secondarySelector->size(ctxt);
138
// A shorter secondary input is only warned about here; it does not change the result.
139 if (sz2 < sz1) {
140 ATH_MSG_WARNING("Fewer secondary input events than primary input events. Expect trouble!");
141 }
142
// The primary size is what gets returned, whatever sz2 is.
143 return sz1;
144}
145
146
148{
// NOTE(review): the signature line (listing line 147) and listing lines 159 and 183 are absent
// from this extract. Presumably this is DoubleEventSelectorAthenaPool::recordAttributeList() const,
// "Record AttributeList in StoreGate" — confirm against the repository.
149 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::recordAttributeList");
150
151 // Get access to AttributeList
152 ATH_MSG_DEBUG("Get AttributeList from the collection");
153 // MN: accessing only attribute list, ignoring token list
154 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
155 ATH_MSG_DEBUG("AttributeList size " << attrList.size());
// Starts out empty; exactly one of the two branches below allocates it.
156 std::unique_ptr<AthenaAttributeList> athAttrList{};
157
158 // Decide what to do based on the type of secondary file
// NOTE(review): the `if (...) {` opening this branch (listing line 159) is missing from the
// extract; judging by the comments below it selects the ByteStream-secondary case — TODO confirm.
160 // Create empty attribute list
161 athAttrList = std::make_unique<AthenaAttributeList>();
162 // Always add ByteStream as primary input
// The secondary selector fills the list first, with an empty suffix and copySource = false.
163 ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "", false));
164
165 // Then fill the new attribute list from the primary file
// In this branch the primary attributes carry the "_" + m_secondaryAttrListSuffix suffix
// and are added with copySource = true.
166 ATH_MSG_DEBUG("Append primary attribute list properties to the secondary one with a suffix: " << m_secondaryAttrListSuffix.value());
167 ATH_CHECK(fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
168 } else {
169 // Create a new attribute list from the primary input one
170 athAttrList = std::make_unique<AthenaAttributeList>(attrList);
171 // Fill the new attribute list from the primary file
172 ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
173
174 // Fill the new attribute list from the secondary file
// Here the roles are swapped: the secondary attributes get the suffix and copySource = true.
175 ATH_MSG_DEBUG("Append secondary attribute list properties to the primary one with a suffix: " << m_secondaryAttrListSuffix.value());
176 ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
177 }
178
179 // Add info about secondary input
// Both branches end with a boolean "hasSecondaryInput" attribute set to true.
180 athAttrList->extend("hasSecondaryInput", "bool");
181 (*athAttrList)["hasSecondaryInput"].data<bool>() = true;
182
// NOTE(review): `wh` is declared on the missing listing line 183; presumably a write handle
// keyed by m_attrListKey — confirm. Ownership of the list is moved into record().
184 ATH_CHECK(wh.record(std::move(athAttrList)));
185
186 return StatusCode::SUCCESS;
187}
188
189
191{
// NOTE(review): the signature line (listing line 190) and listing line 230 are absent from this
// extract. Presumably this is DoubleEventSelectorAthenaPool::handle(const Incident& inc), the
// incident handler for BeginProcessing and EndProcessing — confirm against the repository.
192 ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::handle");
193
// Without an extended event context there is no source ID to read: warn and give up.
194 if (not Atlas::hasExtendedEventContext(inc.context()) ) {
195 ATH_MSG_WARNING("No extended event context available.");
196 return;
197 }
198
// Primary source ID: on BeginProcessing it is read from the context's proxy and stored in the
// slot-specific m_sourceID1; for any other incident type the stored value is read back.
199 SG::SourceID fid1;
200 if (inc.type() == IncidentType::BeginProcessing) {
// This check repeats the guard at the top of the function, so it is always true here.
201 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
202 fid1 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
203 }
204 *m_sourceID1.get(inc.context()) = fid1;
205 }
206 else {
207 fid1 = *m_sourceID1.get(inc.context());
208 }
209
210 if( fid1.empty() ) {
211 ATH_MSG_WARNING("could not read event source ID from incident event context with key EventSelector");
212 return;
213 }
214
215 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid1 );
216 if( inc.type() == IncidentType::BeginProcessing ) {
217 // increment the events-per-file counter for FID
218 m_activeEventsPerSource[fid1]++;
219 } else if( inc.type() == IncidentType::EndProcessing ) {
// One event fewer is active for this source: decrement its counter, call
// disconnectIfFinished() for it, and reset the ID stored for this slot.
220 m_activeEventsPerSource[fid1]--;
221 disconnectIfFinished( fid1 );
222 *m_sourceID1.get(inc.context()) = "";
223 }
// Dump every per-source counter when DEBUG output is enabled.
224 if( msgLvl(MSG::DEBUG) ) {
225 for( auto& source: m_activeEventsPerSource )
226 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
227 }
228
229 // Nothing to do if secondary event selector is ByteStream
// NOTE(review): the `if (...) {` guarding this early return (listing line 230) is missing
// from the extract.
231 return;
232 }
233
234 // Secondary guid
// Same bookkeeping as for fid1, but the source ID is looked up with the key
// "SecondaryEventSelector" and stored in m_sourceID2.
235 SG::SourceID fid2;
236 if (inc.type() == IncidentType::BeginProcessing) {
237 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
238 fid2 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID("SecondaryEventSelector");
239 }
240 *m_sourceID2.get(inc.context()) = fid2;
241 }
242 else {
243 fid2 = *m_sourceID2.get(inc.context());
244 }
245
246 if( fid2.empty() ) {
247 ATH_MSG_WARNING("could not read event source ID from incident event context with key SecondaryEventSelector");
248 return;
249 }
250
251 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid2 );
252 if( inc.type() == IncidentType::BeginProcessing ) {
253 // increment the events-per-file counter for FID
254 m_activeEventsPerSource[fid2]++;
255 } else if( inc.type() == IncidentType::EndProcessing ) {
// The counter lives in the same m_activeEventsPerSource map as the primary IDs, but the
// disconnect request goes to the secondary selector.
256 m_activeEventsPerSource[fid2]--;
257 m_secondarySelector->disconnectIfFinished( fid2 );
258 *m_sourceID2.get(inc.context()) = "";
259 }
260 if( msgLvl(MSG::DEBUG) ) {
261 for( auto& source: m_activeEventsPerSource )
262 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
263 }
264}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DoubleEventSelectorAthenaPool class.
An AttributeList represents a logical row of attributes in a metadata table.
SG::SlotSpecificObj< SG::SourceID > m_sourceID2
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
Gaudi::Property< std::string > m_secondaryAttrListSuffix
virtual StatusCode seek(Context &ctxt, int evtNum) const override
Seek to a given event number.
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
SG::SlotSpecificObj< SG::SourceID > m_sourceID1
virtual StatusCode initialize() override
Initialize function.
virtual int size(Context &ctxt) const override
Return the size of the collection.
ServiceHandle< ISecondaryEventSelector > m_secondarySelector
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
virtual StatusCode initialize() override
Required of all Gaudi Services.
friend class DoubleEventSelectorAthenaPool
make the DoubleEventSelectorAthenaPool a friend so it can access the internal EventSelectorAthenaPool...
StoreGateSvc * eventStore() const
Return pointer to active event SG.
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
std::string m_attrListKey
AttributeList SG key.
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
virtual int size(Context &ctxt) const override
Return the size of the collection.
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
MsgStream & msg
Definition testRead.cxx:32