ATLAS Offline Software
DoubleEventSelectorAthenaPool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
16 
17 // Framework
18 #include "GaudiKernel/FileIncident.h"
19 
20 // Pool
27 
28 
29 DoubleEventSelectorAthenaPool::DoubleEventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator)
30  : EventSelectorAthenaPool(name, pSvcLocator)
31 {
32 }
33 
34 
36 
37 
39 {
41 
42  ATH_CHECK(m_secondarySelector.retrieve());
43  if (dynamic_cast<EventSelectorAthenaPool *>(&*(m_secondarySelector)) == nullptr) {
44  m_secondaryByteStream = true;
45  }
46 
47  return StatusCode::SUCCESS;
48 }
49 
50 
51 StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const
52 {
53  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next");
54 
55  std::lock_guard<CallMutex> lockGuard(m_callLock);
56  if (!eventStore()->clearStore().isSuccess()) {
57  ATH_MSG_WARNING("Cannot clear Store");
58  }
59  for (const auto& tool : m_helperTools) {
60  if (!tool->preNext().isSuccess()) {
61  ATH_MSG_WARNING("Failed to preNext() " << tool->name());
62  }
63  }
64 
65  for (;;) {
66  // Move in the primary file (with skipping)
67  if (nextWithSkip(ctxt).isFailure()) {
68  return StatusCode::FAILURE;
69  }
70 
71  // Check if we're at the end of secondary file
72  if (m_secondarySelector->nextWithSkip(ctxt).isFailure()) {
73  return StatusCode::FAILURE;
74  }
75 
76  // Record the attribute list
77  if (!recordAttributeList().isSuccess()) {
78  ATH_MSG_ERROR("Failed to record AttributeList.");
79  return StatusCode::FAILURE;
80  }
81 
82  StatusCode status = StatusCode::SUCCESS;
83  for (const auto& tool : m_helperTools) {
84  StatusCode toolStatus = tool->postNext();
85  if (toolStatus.isRecoverable()) {
86  ATH_MSG_INFO("Request skipping event from: " << tool->name());
87  if (status.isSuccess()) {
88  status = StatusCode::RECOVERABLE;
89  }
90  } else if (toolStatus.isFailure()) {
91  ATH_MSG_WARNING("Failed to postNext() " << tool->name());
92  status = StatusCode::FAILURE;
93  }
94  }
95  if (status.isRecoverable()) {
96  ATH_MSG_INFO("skipping event " << m_evtCount);
97  } else if (status.isFailure()) {
98  ATH_MSG_WARNING("Failed to postNext() HelperTool.");
99  } else {
100  if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
101  ATH_MSG_WARNING("Failed to postNext() CounterTool.");
102  }
103  break;
104  }
105  }
106 
107  return StatusCode::SUCCESS;
108 }
109 
110 
111 StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const
112 {
113  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next(jump)");
114 
115  if (jump > 0) {
116  for (int i = 0; i < jump; i++) {
117  if (!next(ctxt).isSuccess()) {
118  return StatusCode::FAILURE;
119  }
120  }
121  return StatusCode::SUCCESS;
122  }
123 
124  return StatusCode::FAILURE;
125 }
126 
127 
128 StatusCode DoubleEventSelectorAthenaPool::seek(Context& ctxt, int evtNum) const
129 {
130  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::seek");
131 
133  ATH_CHECK(m_secondarySelector->seek(ctxt, evtNum));
134 
135  return StatusCode::SUCCESS;
136 }
137 
138 
139 int DoubleEventSelectorAthenaPool::size(Context& ctxt) const
140 {
141  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::size");
142 
143  int sz1 = EventSelectorAthenaPool::size(ctxt);
144  int sz2 = m_secondarySelector->size(ctxt);
145 
146  if (sz2 < sz1) {
147  ATH_MSG_WARNING("Fewer secondary input events than primary input events. Expect trouble!");
148  }
149 
150  return sz1;
151 }
152 
153 
155 {
156  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::recordAttributeList");
157 
158  // Get access to AttributeList
159  ATH_MSG_DEBUG("Get AttributeList from the collection");
160  // MN: accessing only attribute list, ignoring token list
161  const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
162  ATH_MSG_DEBUG("AttributeList size " << attrList.size());
163  std::unique_ptr<AthenaAttributeList> athAttrList{};
164 
165  // Decide what to do based on the type of secondary file
166  if (m_secondaryByteStream) {
167  // Create empty attribute list
168  athAttrList = std::make_unique<AthenaAttributeList>();
169  // Always add ByteStream as primary input
170  ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "", false));
171 
172  // Then fill the new attribute list from the primary file
173  ATH_MSG_DEBUG("Append primary attribute list properties to the secondary one with a suffix: " << m_secondaryAttrListSuffix.value());
174  ATH_CHECK(fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
175  } else {
176  // Create a new attribute list from the primary input one
177  athAttrList = std::make_unique<AthenaAttributeList>(attrList);
178  // Fill the new attribute list from the primary file
179  ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
180 
181  // Fill the new attribute list from the secondary file
182  ATH_MSG_DEBUG("Append secondary attribute list properties to the primary one with a suffix: " << m_secondaryAttrListSuffix.value());
183  ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
184  }
185 
186  // Add info about secondary input
187  athAttrList->extend("hasSecondaryInput", "bool");
188  (*athAttrList)["hasSecondaryInput"].data<bool>() = true;
189 
191  ATH_CHECK(wh.record(std::move(athAttrList)));
192 
193  return StatusCode::SUCCESS;
194 }
195 
196 
197 void DoubleEventSelectorAthenaPool::handle(const Incident& inc)
198 {
199  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::handle");
200 
201  if (not Atlas::hasExtendedEventContext(inc.context()) ) {
202  ATH_MSG_WARNING("No extended event context available.");
203  return;
204  }
205 
206  SG::SourceID fid1;
207  if (inc.type() == IncidentType::BeginProcessing) {
208  if ( Atlas::hasExtendedEventContext(inc.context()) ) {
209  fid1 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
210  }
211  *m_sourceID1.get(inc.context()) = fid1;
212  }
213  else {
214  fid1 = *m_sourceID1.get(inc.context());
215  }
216 
217  if( fid1.empty() ) {
218  ATH_MSG_WARNING("could not read event source ID from incident event context with key EventSelector");
219  return;
220  }
221 
222  ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid1 );
223  if( inc.type() == IncidentType::BeginProcessing ) {
224  // increment the events-per-file counter for FID
225  m_activeEventsPerSource[fid1]++;
226  } else if( inc.type() == IncidentType::EndProcessing ) {
227  m_activeEventsPerSource[fid1]--;
228  disconnectIfFinished( fid1 );
229  *m_sourceID1.get(inc.context()) = "";
230  }
231  if( msgLvl(MSG::DEBUG) ) {
232  for( auto& source: m_activeEventsPerSource )
233  msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
234  }
235 
236  // Nothing to do if secondary event selector is ByteStream
237  if (m_secondaryByteStream) {
238  return;
239  }
240 
241  // Secondary guid
242  SG::SourceID fid2;
243  if (inc.type() == IncidentType::BeginProcessing) {
244  if ( Atlas::hasExtendedEventContext(inc.context()) ) {
245  fid2 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID("SecondaryEventSelector");
246  }
247  *m_sourceID2.get(inc.context()) = fid2;
248  }
249  else {
250  fid2 = *m_sourceID2.get(inc.context());
251  }
252 
253  if( fid2.empty() ) {
254  ATH_MSG_WARNING("could not read event source ID from incident event context with key SecondaryEventSelector");
255  return;
256  }
257 
258  ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid2 );
259  if( inc.type() == IncidentType::BeginProcessing ) {
260  // increment the events-per-file counter for FID
261  m_activeEventsPerSource[fid2]++;
262  } else if( inc.type() == IncidentType::EndProcessing ) {
263  m_activeEventsPerSource[fid2]--;
264  m_secondarySelector->disconnectIfFinished( fid2 );
265  *m_sourceID2.get(inc.context()) = "";
266  }
267  if( msgLvl(MSG::DEBUG) ) {
268  for( auto& source: m_activeEventsPerSource )
269  msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
270  }
271 }
DoubleEventSelectorAthenaPool::m_secondarySelector
ServiceHandle< ISecondaryEventSelector > m_secondarySelector
Definition: DoubleEventSelectorAthenaPool.h:65
DoubleEventSelectorAthenaPool::m_sourceID2
SG::SlotSpecificObj< SG::SourceID > m_sourceID2
Definition: DoubleEventSelectorAthenaPool.h:73
EventSelectorAthenaPool::fillAttributeList
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Definition: EventSelectorAthenaPool.cxx:1026
fitman.sz2
sz2
Definition: fitman.py:543
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
TokenList.h
PoolCollectionConverter.h
This file contains the class definition for the PoolCollectionConverter class.
ICollectionCursor.h
Atlas::hasExtendedEventContext
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
Definition: ExtendedEventContext.cxx:23
SG::SlotSpecificObj::get
T * get(const EventContext &ctx)
Return pointer to the object for slot given by ctx.
EventSelectorAthenaPool::eventStore
StoreGateSvc * eventStore() const
Return pointer to active event SG.
Definition: EventSelectorAthenaPool.cxx:83
python.PyKernel.AttributeList
AttributeList
Definition: PyKernel.py:36
EventSelectorAthenaPool::m_callLock
CallMutex m_callLock
Definition: EventSelectorAthenaPool.h:245
DoubleEventSelectorAthenaPool::next
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
Definition: DoubleEventSelectorAthenaPool.cxx:51
EventSelectorAthenaPool
This class is the EventSelector for event data.
Definition: EventSelectorAthenaPool.h:51
DoubleEventSelectorAthenaPool::m_sourceID1
SG::SlotSpecificObj< SG::SourceID > m_sourceID1
Definition: DoubleEventSelectorAthenaPool.h:72
DoubleEventSelectorAthenaPool::m_secondaryAttrListSuffix
Gaudi::Property< std::string > m_secondaryAttrListSuffix
Definition: DoubleEventSelectorAthenaPool.h:67
DoubleEventSelectorAthenaPool::m_secondaryByteStream
bool m_secondaryByteStream
Definition: DoubleEventSelectorAthenaPool.h:70
EventSelectorAthenaPool::m_counterTool
ToolHandle< IAthenaSelectorTool > m_counterTool
Definition: EventSelectorAthenaPool.h:209
Atlas::getExtendedEventContext
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
Definition: ExtendedEventContext.cxx:32
CollectionRowBuffer.h
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventSelectorAthenaPool::m_attrListKey
Gaudi::Property< std::string > m_attrListKey
AttributeList SG key.
Definition: EventSelectorAthenaPool.h:194
lumiFormat.i
int i
Definition: lumiFormat.py:85
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
parseDir.wh
wh
Definition: parseDir.py:45
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
DoubleEventSelectorAthenaPool::recordAttributeList
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Definition: DoubleEventSelectorAthenaPool.cxx:154
EventSelectorAthenaPool::disconnectIfFinished
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Definition: EventSelectorAthenaPool.cxx:1162
EventSelectorAthenaPool::DoubleEventSelectorAthenaPool
friend class DoubleEventSelectorAthenaPool
make the DoubleEventSelectorAthenaPool a friend so it can access the internal EventSelectorAthenaPool...
Definition: EventSelectorAthenaPool.h:251
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
DoubleEventSelectorAthenaPool.h
This file contains the class definition for the DoubleEventSelectorAthenaPool class.
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:452
DoubleEventSelectorAthenaPool::seek
virtual StatusCode seek(Context &ctxt, int evtNum) const override
Seek to a given event number.
Definition: DoubleEventSelectorAthenaPool.cxx:128
EventSelectorAthenaPool::nextWithSkip
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
Definition: EventSelectorAthenaPool.cxx:638
DoubleEventSelectorAthenaPool::~DoubleEventSelectorAthenaPool
virtual ~DoubleEventSelectorAthenaPool()
Destructor.
Definition: DoubleEventSelectorAthenaPool.cxx:35
AthenaAttributeList.h
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
SG::WriteHandle
Definition: StoreGate/StoreGate/WriteHandle.h:73
DoubleEventSelectorAthenaPool::initialize
virtual StatusCode initialize() override
Initialize function.
Definition: DoubleEventSelectorAthenaPool.cxx:38
EventSelectorAthenaPool::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: EventSelectorAthenaPool.cxx:87
EventSelectorAthenaPool::m_evtCount
std::atomic_int m_evtCount
Definition: EventSelectorAthenaPool.h:241
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
SG::SourceID
std::string SourceID
Definition: AthenaKernel/AthenaKernel/SourceID.h:25
DEBUG
#define DEBUG
Definition: page_access.h:11
copySelective.source
string source
Definition: copySelective.py:31
DoubleEventSelectorAthenaPool::handle
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
Definition: DoubleEventSelectorAthenaPool.cxx:197
merge.status
status
Definition: merge.py:16
EventSelectorAthenaPool::size
virtual int size(Context &ctxt) const override
Return the size of the collection.
Definition: EventSelectorAthenaPool.cxx:961
Token.h
This file contains the class definition for the Token class (migrated from POOL).
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
EventSelectorAthenaPool::seek
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
Definition: EventSelectorAthenaPool.cxx:736
DoubleEventSelectorAthenaPool::size
virtual int size(Context &ctxt) const override
Return the size of the collection.
Definition: DoubleEventSelectorAthenaPool.cxx:139