ATLAS Offline Software
DoubleEventSelectorAthenaPool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
16 
17 // Framework
18 #include "GaudiKernel/FileIncident.h"
19 
20 // Pool
26 
27 
28 DoubleEventSelectorAthenaPool::DoubleEventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator)
29  : EventSelectorAthenaPool(name, pSvcLocator)
30 {
31 }
32 
33 
35 
36 
38 {
40 
41  ATH_CHECK(m_secondarySelector.retrieve());
42  if (dynamic_cast<EventSelectorAthenaPool *>(&*(m_secondarySelector)) == nullptr) {
43  m_secondaryByteStream = true;
44  }
45 
46  return StatusCode::SUCCESS;
47 }
48 
49 
50 StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const
51 {
52  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next");
53 
54  std::lock_guard<CallMutex> lockGuard(m_callLock);
55  if (!eventStore()->clearStore().isSuccess()) {
56  ATH_MSG_WARNING("Cannot clear Store");
57  }
58  for (const auto& tool : m_helperTools) {
59  if (!tool->preNext().isSuccess()) {
60  ATH_MSG_WARNING("Failed to preNext() " << tool->name());
61  }
62  }
63 
64  for (;;) {
65  // Move in the primary file (with skipping)
66  if (nextWithSkip(ctxt).isFailure()) {
67  return StatusCode::FAILURE;
68  }
69 
70  // Check if we're at the end of secondary file
71  if (m_secondarySelector->nextWithSkip(ctxt).isFailure()) {
72  return StatusCode::FAILURE;
73  }
74 
75  // Record the attribute list
76  if (!recordAttributeList().isSuccess()) {
77  ATH_MSG_ERROR("Failed to record AttributeList.");
78  return StatusCode::FAILURE;
79  }
80 
81  StatusCode status = StatusCode::SUCCESS;
82  for (const auto& tool : m_helperTools) {
83  StatusCode toolStatus = tool->postNext();
84  if (toolStatus.isRecoverable()) {
85  ATH_MSG_INFO("Request skipping event from: " << tool->name());
86  if (status.isSuccess()) {
87  status = StatusCode::RECOVERABLE;
88  }
89  } else if (toolStatus.isFailure()) {
90  ATH_MSG_WARNING("Failed to postNext() " << tool->name());
91  status = StatusCode::FAILURE;
92  }
93  }
94  if (status.isRecoverable()) {
95  ATH_MSG_INFO("skipping event " << m_evtCount);
96  } else if (status.isFailure()) {
97  ATH_MSG_WARNING("Failed to postNext() HelperTool.");
98  } else {
99  if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
100  ATH_MSG_WARNING("Failed to postNext() CounterTool.");
101  }
102  break;
103  }
104  }
105 
106  return StatusCode::SUCCESS;
107 }
108 
109 
110 StatusCode DoubleEventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const
111 {
112  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::next(jump)");
113 
114  if (jump > 0) {
115  for (int i = 0; i < jump; i++) {
116  if (!next(ctxt).isSuccess()) {
117  return StatusCode::FAILURE;
118  }
119  }
120  return StatusCode::SUCCESS;
121  }
122 
123  return StatusCode::FAILURE;
124 }
125 
126 
127 StatusCode DoubleEventSelectorAthenaPool::seek(Context& ctxt, int evtNum) const
128 {
129  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::seek");
130 
132  ATH_CHECK(m_secondarySelector->seek(ctxt, evtNum));
133 
134  return StatusCode::SUCCESS;
135 }
136 
137 
138 int DoubleEventSelectorAthenaPool::size(Context& ctxt) const
139 {
140  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::size");
141 
142  int sz1 = EventSelectorAthenaPool::size(ctxt);
143  int sz2 = m_secondarySelector->size(ctxt);
144 
145  if (sz2 < sz1) {
146  ATH_MSG_WARNING("Fewer secondary input events than primary input events. Expect trouble!");
147  }
148 
149  return sz1;
150 }
151 
152 
// Body of DoubleEventSelectorAthenaPool::recordAttributeList().
// NOTE(review): the signature line (listing line 153) is missing from this
// listing; the cross-reference index gives it as
// `virtual StatusCode recordAttributeList() const override`.
//
// Builds one AthenaAttributeList that merges attributes of the primary and the
// secondary input, adds a boolean "hasSecondaryInput" attribute set to true,
// and records the list through `wh`. Returns FAILURE if a fillAttributeList()
// call fails (via ATH_CHECK) or if recording fails, SUCCESS otherwise.
154 {
155  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::recordAttributeList");
156 
157  // Get access to AttributeList
158  ATH_MSG_DEBUG("Get AttributeList from the collection");
159  // MN: accessing only attribute list, ignoring token list
160  const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
161  ATH_MSG_DEBUG("AttributeList size " << attrList.size());
 // Owning pointer to the merged list; it is filled in one of the two branches
 // below and ownership is handed over when the list is recorded.
162  std::unique_ptr<AthenaAttributeList> athAttrList{};
163 
164  // Decide what to do based on the type of secondary file
165  if (m_secondaryByteStream) {
166  // Create empty attribute list
167  athAttrList = std::make_unique<AthenaAttributeList>();
168  // Always add ByteStream as primary input
 // The secondary selector fills first, with an empty suffix and copySource = false
169  ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "", false));
170 
171  // Then fill the new attribute list from the primary file
 // The primary attributes get the "_<suffix>" names here, with copySource = true
172  ATH_MSG_DEBUG("Append primary attribute list properties to the secondary one with a suffix: " << m_secondaryAttrListSuffix.value());
173  ATH_CHECK(fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
174  } else {
175  // Create a new attribute list from the primary input one
176  athAttrList = std::make_unique<AthenaAttributeList>(attrList);
177  // Fill the new attribute list from the primary file
178  ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
179 
180  // Fill the new attribute list from the secondary file
 // The secondary attributes get the "_<suffix>" names here, with copySource = true
181  ATH_MSG_DEBUG("Append secondary attribute list properties to the primary one with a suffix: " << m_secondaryAttrListSuffix.value());
182  ATH_CHECK(m_secondarySelector->fillAttributeList(athAttrList.get(), "_" + m_secondaryAttrListSuffix.value(), true));
183  }
184 
185  // Add info about secondary input
186  athAttrList->extend("hasSecondaryInput", "bool");
187  (*athAttrList)["hasSecondaryInput"].data<bool>() = true;
188 
 // NOTE(review): `wh` is not declared in the visible lines (listing line 189 is
 // missing). Presumably it is an SG::WriteHandle built from m_attrListKey, since
 // the index lists both as used by this file - confirm against the real source.
190  if (!wh.record(std::move(athAttrList)).isSuccess()) {
191  ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
192  return StatusCode::FAILURE;
193  }
194 
195  return StatusCode::SUCCESS;
196 }
197 
198 
199 void DoubleEventSelectorAthenaPool::handle(const Incident& inc)
200 {
201  ATH_MSG_DEBUG("DoubleEventSelectorAthenaPool::handle");
202 
203  if (not Atlas::hasExtendedEventContext(inc.context()) ) {
204  ATH_MSG_WARNING("No extended event context available.");
205  return;
206  }
207 
208  SG::SourceID fid1;
209  if (inc.type() == IncidentType::BeginProcessing) {
210  if ( Atlas::hasExtendedEventContext(inc.context()) ) {
211  fid1 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
212  }
213  *m_sourceID1.get(inc.context()) = fid1;
214  }
215  else {
216  fid1 = *m_sourceID1.get(inc.context());
217  }
218 
219  if( fid1.empty() ) {
220  ATH_MSG_WARNING("could not read event source ID from incident event context with key EventSelector");
221  return;
222  }
223 
224  ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid1 );
225  if( inc.type() == IncidentType::BeginProcessing ) {
226  // increment the events-per-file counter for FID
227  m_activeEventsPerSource[fid1]++;
228  } else if( inc.type() == IncidentType::EndProcessing ) {
229  m_activeEventsPerSource[fid1]--;
230  disconnectIfFinished( fid1 );
231  *m_sourceID1.get(inc.context()) = "";
232  }
233  if( msgLvl(MSG::DEBUG) ) {
234  for( auto& source: m_activeEventsPerSource )
235  msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
236  }
237 
238  // Nothing to do if secondary event selector is ByteStream
239  if (m_secondaryByteStream) {
240  return;
241  }
242 
243  // Secondary guid
244  SG::SourceID fid2;
245  if (inc.type() == IncidentType::BeginProcessing) {
246  if ( Atlas::hasExtendedEventContext(inc.context()) ) {
247  fid2 = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID("SecondaryEventSelector");
248  }
249  *m_sourceID2.get(inc.context()) = fid2;
250  }
251  else {
252  fid2 = *m_sourceID2.get(inc.context());
253  }
254 
255  if( fid2.empty() ) {
256  ATH_MSG_WARNING("could not read event source ID from incident event context with key SecondaryEventSelector");
257  return;
258  }
259 
260  ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid2 );
261  if( inc.type() == IncidentType::BeginProcessing ) {
262  // increment the events-per-file counter for FID
263  m_activeEventsPerSource[fid2]++;
264  } else if( inc.type() == IncidentType::EndProcessing ) {
265  m_activeEventsPerSource[fid2]--;
266  m_secondarySelector->disconnectIfFinished( fid2 );
267  *m_sourceID2.get(inc.context()) = "";
268  }
269  if( msgLvl(MSG::DEBUG) ) {
270  for( auto& source: m_activeEventsPerSource )
271  msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
272  }
273 }
DoubleEventSelectorAthenaPool::m_secondarySelector
ServiceHandle< ISecondaryEventSelector > m_secondarySelector
Definition: DoubleEventSelectorAthenaPool.h:65
DoubleEventSelectorAthenaPool::m_sourceID2
SG::SlotSpecificObj< SG::SourceID > m_sourceID2
Definition: DoubleEventSelectorAthenaPool.h:73
EventSelectorAthenaPool::fillAttributeList
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Definition: EventSelectorAthenaPool.cxx:1092
fitman.sz2
sz2
Definition: fitman.py:543
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
TokenList.h
PoolCollectionConverter.h
This file contains the class definition for the PoolCollectionConverter class.
ICollectionCursor.h
Atlas::hasExtendedEventContext
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
Definition: ExtendedEventContext.cxx:23
SG::SlotSpecificObj::get
T * get(const EventContext &ctx)
Return pointer to the object for slot given by ctx.
EventSelectorAthenaPool::eventStore
StoreGateSvc * eventStore() const
Return pointer to active event SG.
Definition: EventSelectorAthenaPool.cxx:85
python.PyKernel.AttributeList
AttributeList
Definition: PyKernel.py:36
EventSelectorAthenaPool::m_callLock
CallMutex m_callLock
Definition: EventSelectorAthenaPool.h:248
DoubleEventSelectorAthenaPool::next
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
Definition: DoubleEventSelectorAthenaPool.cxx:50
EventSelectorAthenaPool
This class is the EventSelector for event data.
Definition: EventSelectorAthenaPool.h:49
DoubleEventSelectorAthenaPool::m_sourceID1
SG::SlotSpecificObj< SG::SourceID > m_sourceID1
Definition: DoubleEventSelectorAthenaPool.h:72
DoubleEventSelectorAthenaPool::m_secondaryAttrListSuffix
Gaudi::Property< std::string > m_secondaryAttrListSuffix
Definition: DoubleEventSelectorAthenaPool.h:67
DoubleEventSelectorAthenaPool::m_secondaryByteStream
bool m_secondaryByteStream
Definition: DoubleEventSelectorAthenaPool.h:70
EventSelectorAthenaPool::m_counterTool
ToolHandle< IAthenaSelectorTool > m_counterTool
Definition: EventSelectorAthenaPool.h:212
Atlas::getExtendedEventContext
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
Definition: ExtendedEventContext.cxx:32
CollectionRowBuffer.h
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventSelectorAthenaPool::m_attrListKey
Gaudi::Property< std::string > m_attrListKey
AttributeList SG key.
Definition: EventSelectorAthenaPool.h:194
lumiFormat.i
int i
Definition: lumiFormat.py:85
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
parseDir.wh
wh
Definition: parseDir.py:46
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
DoubleEventSelectorAthenaPool::recordAttributeList
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Definition: DoubleEventSelectorAthenaPool.cxx:153
EventSelectorAthenaPool::disconnectIfFinished
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Definition: EventSelectorAthenaPool.cxx:1228
EventSelectorAthenaPool::DoubleEventSelectorAthenaPool
friend class DoubleEventSelectorAthenaPool
make the DoubleEventSelectorAthenaPool a friend so it can access the internal EventSelectorAthenaPool...
Definition: EventSelectorAthenaPool.h:254
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
DoubleEventSelectorAthenaPool.h
This file contains the class definition for the DoubleEventSelectorAthenaPool class.
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:453
DoubleEventSelectorAthenaPool::seek
virtual StatusCode seek(Context &ctxt, int evtNum) const override
Seek to a given event number.
Definition: DoubleEventSelectorAthenaPool.cxx:127
EventSelectorAthenaPool::nextWithSkip
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
Definition: EventSelectorAthenaPool.cxx:701
DoubleEventSelectorAthenaPool::~DoubleEventSelectorAthenaPool
virtual ~DoubleEventSelectorAthenaPool()
Destructor.
Definition: DoubleEventSelectorAthenaPool.cxx:34
SG::WriteHandle
Definition: StoreGate/StoreGate/WriteHandle.h:76
DoubleEventSelectorAthenaPool::initialize
virtual StatusCode initialize() override
Initialize function.
Definition: DoubleEventSelectorAthenaPool.cxx:37
EventSelectorAthenaPool::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: EventSelectorAthenaPool.cxx:89
EventSelectorAthenaPool::m_evtCount
std::atomic_int m_evtCount
Definition: EventSelectorAthenaPool.h:244
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
SG::SourceID
std::string SourceID
Definition: AthenaKernel/AthenaKernel/SourceID.h:23
DEBUG
#define DEBUG
Definition: page_access.h:11
copySelective.source
string source
Definition: copySelective.py:32
DoubleEventSelectorAthenaPool::handle
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
Definition: DoubleEventSelectorAthenaPool.cxx:199
merge.status
status
Definition: merge.py:17
EventSelectorAthenaPool::size
virtual int size(Context &ctxt) const override
Return the size of the collection.
Definition: EventSelectorAthenaPool.cxx:1020
Token.h
This file contains the class definition for the Token class (migrated from POOL).
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
StoreID::storeName
static const std::string & storeName(const StoreID::type &s)
Definition: StoreID.cxx:77
EventSelectorAthenaPool::seek
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
Definition: EventSelectorAthenaPool.cxx:804
DoubleEventSelectorAthenaPool::size
virtual int size(Context &ctxt) const override
Return the size of the collection.
Definition: DoubleEventSelectorAthenaPool.cxx:138