ATLAS Offline Software
EventSelectorAthenaPool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
11 #include "EventContextAthenaPool.h"
13 
18 #include "PoolSvc/IPoolSvc.h"
19 #include "StoreGate/StoreGateSvc.h"
20 #include "StoreGate/ReadHandle.h"
21 #include "StoreGate/WriteHandle.h"
22 
24 
25 // Framework
26 #include "GaudiKernel/ClassID.h"
27 #include "GaudiKernel/FileIncident.h"
28 #include "GaudiKernel/IIncidentSvc.h"
29 #include "GaudiKernel/IIoComponentMgr.h"
30 #include "GaudiKernel/GaudiException.h"
31 #include "GaudiKernel/GenericAddress.h"
32 #include "GaudiKernel/StatusCode.h"
33 
34 // Pool
39 
40 #include <boost/tokenizer.hpp>
41 #include <algorithm>
42 #include <sstream>
43 #include <vector>
44 
45 
46 namespace {
48  StatusCode putEvent_ST(const IAthenaIPCTool& tool,
49  long eventNumber, const void* source,
50  size_t nbytes, unsigned int status) {
51  StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
52  return sc;
53  }
54 }
55 
56 
57 //________________________________________________________________________________
// Constructor. Passes the service name and locator on to base_class, declares
// the "HelperTools" property, installs lower bounds on the numeric properties
// listed below and starts with the "input collections changed" flag cleared.
// NOTE(review): listing line 72 is not present in this extract (the numbering
// jumps from 71 to 73), so a statement may be missing before the final
// assignment - confirm against the repository version.
58 EventSelectorAthenaPool::EventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator) :
59  base_class(name, pSvcLocator)
60 {
61  declareProperty("HelperTools", m_helperTools);
62 
63  // TODO: validate if those are even used
// Lower bound is 1 for m_firstEventNo and 0 for every other property here.
64  m_runNo.verifier().setLower(0);
65  m_oldRunNo.verifier().setLower(0);
66  m_eventsPerRun.verifier().setLower(0);
67  m_firstEventNo.verifier().setLower(1);
68  m_firstLBNo.verifier().setLower(0);
69  m_eventsPerLB.verifier().setLower(0);
70  m_initTimeStamp.verifier().setLower(0);
71 
73  m_inputCollectionsChanged = false;
74 }
75 //________________________________________________________________________________
76 void EventSelectorAthenaPool::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
77  if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
78  m_inputCollectionsChanged = true;
79  }
80 }
81 //________________________________________________________________________________
83 }
84 //________________________________________________________________________________
87 }
88 //________________________________________________________________________________
90 
91  m_autoRetrieveTools = false;
92  m_checkToolDeps = false;
93 
94  if (m_isSecondary.value()) {
95  ATH_MSG_DEBUG("Initializing secondary event selector " << name());
96  } else {
97  ATH_MSG_DEBUG("Initializing " << name());
98  }
99 
100  if (!::AthService::initialize().isSuccess()) {
101  ATH_MSG_FATAL("Cannot initialize AthService base class.");
102  return(StatusCode::FAILURE);
103  }
104  // Check for input collection
105  if (m_inputCollectionsProp.value().empty()) {
106  ATH_MSG_FATAL("Use the property: EventSelector.InputCollections = "
107  << "[ \"<collectionName>\" ] (list of collections)");
108  return(StatusCode::FAILURE);
109  }
110  boost::char_separator<char> sep_coma(","), sep_hyph("-");
111  boost::tokenizer ranges(m_skipEventRangesProp.value(), sep_coma);
112  for( const std::string& r: ranges ) {
113  boost::tokenizer fromto(r, sep_hyph);
114  auto from_iter = fromto.begin();
115  std::stringstream strstr1( *from_iter );
116  long from, to;
117  strstr1 >> from;
118  if( ++from_iter != fromto.end() ) {
119  std::stringstream strstr2( *from_iter );
120  strstr2 >> to;
121  } else {
122  to = from;
123  }
124  m_skipEventRanges.push_back( std::pair(from,to) );
125  }
126 
127  for( auto v : m_skipEventSequenceProp.value() ) {
128  m_skipEventRanges.push_back( std::pair(v,v) );
129  }
130  std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
131  if( msgLvl(MSG::DEBUG) ) {
132  std::stringstream skip_ranges_ss;
133  for( auto& r: m_skipEventRanges ) {
134  if( not skip_ranges_ss.str().empty() ) skip_ranges_ss << ", ";
135  skip_ranges_ss << r.first;
136  if( r.first != r.second) skip_ranges_ss << "-" << r.second;
137  }
138  if( not skip_ranges_ss.str().empty() )
139  ATH_MSG_DEBUG("Events to skip: " << skip_ranges_ss.str());
140  }
141  // CollectionType must be one of:
142  if (m_collectionType.value() != "ExplicitROOT" && m_collectionType.value() != "ImplicitROOT") {
143  ATH_MSG_FATAL("EventSelector.CollectionType must be one of: ExplicitROOT, ImplicitROOT (default)");
144  return(StatusCode::FAILURE);
145  }
146  // Get IncidentSvc
147  if (!m_incidentSvc.retrieve().isSuccess()) {
148  ATH_MSG_FATAL("Cannot get " << m_incidentSvc.typeAndName() << ".");
149  return(StatusCode::FAILURE);
150  }
151  // Listen to the Event Processing incidents
152  if (m_eventStreamingTool.empty()) {
153  m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 0);
154  m_incidentSvc->addListener(this, IncidentType::EndProcessing, 0);
155  }
156 
157  // Get AthenaPoolCnvSvc
158  if (!m_athenaPoolCnvSvc.retrieve().isSuccess()) {
159  ATH_MSG_FATAL("Cannot get " << m_athenaPoolCnvSvc.typeAndName() << ".");
160  return(StatusCode::FAILURE);
161  }
162  // Get CounterTool (if configured)
163  if (!m_counterTool.empty() && !m_counterTool.retrieve().isSuccess()) {
164  ATH_MSG_FATAL("Cannot get CounterTool.");
165  return(StatusCode::FAILURE);
166  }
167  // Get HelperTools
168  if (!m_helperTools.retrieve().isSuccess()) {
169  ATH_MSG_FATAL("Cannot get " << m_helperTools);
170  return(StatusCode::FAILURE);
171  }
172  // Get SharedMemoryTool (if configured)
173  if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
174  ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
175  return(StatusCode::FAILURE);
176  } else if (m_makeStreamingToolClient.value() == -1) {
177  std::string dummyStr;
178  if (!m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr).isSuccess()) {
179  ATH_MSG_ERROR("Could not make AthenaPoolCnvSvc a Share Client");
180  return(StatusCode::FAILURE);
181  }
182  }
183 
184  // Ensure the xAODCnvSvc is listed in the EventPersistencySvc
185  ServiceHandle<IProperty> epSvc("EventPersistencySvc", name());
186  std::vector<std::string> propVal;
187  if (!Gaudi::Parsers::parse(propVal , epSvc->getProperty("CnvServices").toString()).isSuccess()) {
188  ATH_MSG_FATAL("Cannot get EventPersistencySvc Property for CnvServices");
189  return(StatusCode::FAILURE);
190  }
191  bool foundCnvSvc = false;
192  for (const auto& property : propVal) {
193  if (property == m_athenaPoolCnvSvc.type()) { foundCnvSvc = true; }
194  }
195  if (!foundCnvSvc) {
196  propVal.push_back(m_athenaPoolCnvSvc.type());
197  if (!epSvc->setProperty("CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
198  ATH_MSG_FATAL("Cannot set EventPersistencySvc Property for CnvServices");
199  return(StatusCode::FAILURE);
200  }
201  }
202 
203  // Register this service for 'I/O' events
204  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
205  if (!iomgr.retrieve().isSuccess()) {
206  ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
207  return(StatusCode::FAILURE);
208  }
209  if (!iomgr->io_register(this).isSuccess()) {
210  ATH_MSG_FATAL("Could not register myself with the IoComponentMgr !");
211  return(StatusCode::FAILURE);
212  }
213  // Register input file's names with the I/O manager
214  const std::vector<std::string>& incol = m_inputCollectionsProp.value();
215  bool allGood = true;
216  std::string fileName, fileType;
217  for (std::size_t icol = 0, imax = incol.size(); icol < imax; icol++) {
218  if (incol[icol].substr(0, 4) == "LFN:" || incol[icol].substr(0, 4) == "FID:") {
219  m_athenaPoolCnvSvc->getPoolSvc()->lookupBestPfn(incol[icol], fileName, fileType);
220  } else {
221  fileName = incol[icol];
222  }
223  if (fileName.substr(0, 4) == "PFN:") {
224  fileName = fileName.substr(4);
225  }
226  if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, incol[icol], fileName).isSuccess()) {
227  ATH_MSG_FATAL("could not register [" << incol[icol] << "] for output !");
228  allGood = false;
229  } else {
230  ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << incol[icol] << ") [ok]");
231  }
232  }
233  if (!allGood) {
234  return(StatusCode::FAILURE);
235  }
236 
237  // Connect to PersistencySvc
238  if (!m_athenaPoolCnvSvc->getPoolSvc()->connect(pool::ITransaction::READ, IPoolSvc::kInputStream).isSuccess()) {
239  ATH_MSG_FATAL("Cannot connect to POOL PersistencySvc.");
240  return(StatusCode::FAILURE);
241  }
242  // Jump to reinit() to execute common init/reinit actions
243  m_guid = Guid::null();
244  return reinit();
245 }
246 //________________________________________________________________________________
248  ATH_MSG_DEBUG("reinitialization...");
249 
250  // reset markers
251  m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
252  for( auto& el : m_numEvt ) el = -1;
253  m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
254  for( auto& el : m_firstEvt ) el = -1;
255 
256  // Initialize InputCollectionsIterator
257  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
258  m_curCollection = 0;
259  if (!m_firstEvt.empty()) {
260  m_firstEvt[0] = 0;
261  }
262  m_inputCollectionsChanged = false;
263  m_evtCount = 0;
264  m_headerIterator = 0;
265  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
266  ATH_MSG_INFO("Done reinitialization for shared reader client");
267  return(StatusCode::SUCCESS);
268  }
269  bool retError = false;
270  for (auto& tool : m_helperTools) {
271  if (!tool->postInitialize().isSuccess()) {
272  ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
273  retError = true;
274  }
275  }
276  if (retError) {
277  ATH_MSG_FATAL("Failed to postInitialize() helperTools");
278  return(StatusCode::FAILURE);
279  }
280 
281  ATH_MSG_INFO("EventSelection with query " << m_query.value());
282  // Create an m_poolCollectionConverter to read the objects in
283  m_poolCollectionConverter = getCollectionCnv();
284  if (m_poolCollectionConverter == nullptr) {
285  ATH_MSG_INFO("No Events found in any Input Collections");
286  if (m_processMetadata.value()) {
287  m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
288  if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
289  //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
290  if (m_collectionType.value() == "ImplicitROOT" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
291  FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
292  m_incidentSvc->fireIncident(firstInputFileIncident);
293  m_firedIncident = true;
294  }
295  }
296  return(StatusCode::SUCCESS);
297  }
298  // Check for valid header name
299  if (!m_refName.value().empty()) {
300  if (m_collectionType.value() == "ExplicitROOT") {
301  ATH_MSG_INFO("Using collection ref name: " << m_refName.value());
302  } else {
303  ATH_MSG_INFO("Using implicit collection, ignore ref name: " << m_refName.value());
304  }
305  } else if (m_collectionType.value() == "ExplicitROOT") {
306  ATH_MSG_INFO("Using standard collection ref ");
307  }
308  // Get DataHeader iterator
309  try {
310  m_headerIterator = &m_poolCollectionConverter->executeQuery();
311  } catch (std::exception &e) {
312  ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
313  ATH_MSG_ERROR(e.what());
314  return(StatusCode::FAILURE);
315  }
316  while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
317  if (m_poolCollectionConverter != nullptr) {
318  m_poolCollectionConverter->disconnectDb().ignore();
319  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
320  }
321  ++m_inputCollectionsIterator;
322  m_poolCollectionConverter = getCollectionCnv();
323  if (m_poolCollectionConverter != nullptr) {
324  m_headerIterator = &m_poolCollectionConverter->executeQuery();
325  } else {
326  break;
327  }
328  }
329  if (m_poolCollectionConverter == nullptr || m_headerIterator == nullptr) { // no event selected in any collection
330  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
331  m_curCollection = 0;
332  m_poolCollectionConverter = getCollectionCnv();
333  if (m_poolCollectionConverter == nullptr) {
334  return(StatusCode::SUCCESS);
335  }
336  m_headerIterator = &m_poolCollectionConverter->selectAll();
337  while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
338  if (m_poolCollectionConverter != nullptr) {
339  m_poolCollectionConverter->disconnectDb().ignore();
340  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
341  }
342  ++m_inputCollectionsIterator;
343  m_poolCollectionConverter = getCollectionCnv();
344  if (m_poolCollectionConverter != nullptr) {
345  m_headerIterator = &m_poolCollectionConverter->selectAll();
346  } else {
347  break;
348  }
349  }
350  }
351  if (m_poolCollectionConverter == nullptr || m_headerIterator == nullptr) {
352  return(StatusCode::SUCCESS);
353  }
354  const Token& headRef = m_refName.value().empty()?
355  m_headerIterator->eventRef()
356  : m_headerIterator->currentRow().tokenList()[m_refName.value() + "_ref"];
357  const std::string fid = headRef.dbID().toString();
358  const int tech = headRef.technology();
359  ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
360 
361  // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
362  if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
363  FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
364  m_incidentSvc->fireIncident(firstInputFileIncident);
365  m_firedIncident = true;
366  }
367  return(StatusCode::SUCCESS);
368 }
369 //________________________________________________________________________________
371  if (m_poolCollectionConverter != nullptr) {
372  // Reset iterators and apply new query
373  m_poolCollectionConverter->disconnectDb().ignore();
374  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
375  }
376  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
377  m_curCollection = 0;
378  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
379  return(StatusCode::SUCCESS);
380  }
381  m_poolCollectionConverter = getCollectionCnv(true);
382  if (m_poolCollectionConverter == nullptr) {
383  ATH_MSG_INFO("No Events found in any Input Collections");
384  m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
385  if (!m_inputCollectionsProp.value().empty()) {
386  --m_inputCollectionsIterator; //leave iterator in state of last input file
387  }
388  } else {
389  m_headerIterator = &m_poolCollectionConverter->executeQuery(/*m_query.value()*/);
390  }
391  m_evtCount = 0;
392  delete m_endIter;
393  m_endIter = nullptr;
394  m_endIter = new EventContextAthenaPool(nullptr);
395  return(StatusCode::SUCCESS);
396 }
397 //________________________________________________________________________________
399  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
400  return(StatusCode::SUCCESS);
401  }
402  IEvtSelector::Context* ctxt(nullptr);
403  if (!releaseContext(ctxt).isSuccess()) {
404  ATH_MSG_WARNING("Cannot release context");
405  }
406  return(StatusCode::SUCCESS);
407 }
408 
409 //________________________________________________________________________________
411  if (m_processMetadata.value()) {
412  if (m_evtCount >= 0) {
413  // Assume that the end of collection file indicates the end of payload file.
414  if (m_guid != Guid::null()) {
415  // Fire EndInputFile incident
416  FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
417  m_incidentSvc->fireIncident(endInputFileIncident);
418  }
419  }
420  if (isLastFile && m_firedIncident) {
421  m_firedIncident = false;
422  }
423  }
424 }
425 
426 //________________________________________________________________________________
428  if (m_eventStreamingTool.empty() || !m_eventStreamingTool->isClient()) {
429  if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
430  ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
431  }
432  for (auto& tool : m_helperTools) {
433  if (!tool->preFinalize().isSuccess()) {
434  ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
435  }
436  }
437  }
438  delete m_endIter; m_endIter = nullptr;
439  m_headerIterator = nullptr;
440  if (m_poolCollectionConverter != nullptr) {
441  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
442  }
443  // Release AthenaSharedMemoryTool
444  if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.release().isSuccess()) {
445  ATH_MSG_WARNING("Cannot release AthenaSharedMemoryTool");
446  }
447  // Release CounterTool
448  if (!m_counterTool.empty() && !m_counterTool.release().isSuccess()) {
449  ATH_MSG_WARNING("Cannot release CounterTool.");
450  }
451  // Release HelperTools
452  if (!m_helperTools.release().isSuccess()) {
453  ATH_MSG_WARNING("Cannot release " << m_helperTools);
454  }
455  // Release AthenaPoolCnvSvc
456  if (!m_athenaPoolCnvSvc.release().isSuccess()) {
457  ATH_MSG_WARNING("Cannot release " << m_athenaPoolCnvSvc.typeAndName() << ".");
458  }
459  // Release IncidentSvc
460  if (!m_incidentSvc.release().isSuccess()) {
461  ATH_MSG_WARNING("Cannot release " << m_incidentSvc.typeAndName() << ".");
462  }
463  // Finalize the Service base class.
464  return(::AthService::finalize());
465 }
466 
467 //________________________________________________________________________________
468 StatusCode EventSelectorAthenaPool::queryInterface(const InterfaceID& riid, void** ppvInterface) {
469  if (riid == IEvtSelector::interfaceID()) {
470  *ppvInterface = dynamic_cast<IEvtSelector*>(this);
471  } else if (riid == IIoComponent::interfaceID()) {
472  *ppvInterface = dynamic_cast<IIoComponent*>(this);
473  } else if (riid == IProperty::interfaceID()) {
474  *ppvInterface = dynamic_cast<IProperty*>(this);
475  } else if (riid == IEvtSelectorSeek::interfaceID()) {
476  *ppvInterface = dynamic_cast<IEvtSelectorSeek*>(this);
477  } else if (riid == IEventShare::interfaceID()) {
478  *ppvInterface = dynamic_cast<IEventShare*>(this);
479  } else if (riid == ISecondaryEventSelector::interfaceID()) {
480  *ppvInterface = dynamic_cast<ISecondaryEventSelector*>(this);
481  } else {
482  return(::AthService::queryInterface(riid, ppvInterface));
483  }
484  addRef();
485  return(StatusCode::SUCCESS);
486 }
487 //________________________________________________________________________________
488 StatusCode EventSelectorAthenaPool::createContext(IEvtSelector::Context*& ctxt) const {
489  ctxt = new EventContextAthenaPool(this);
490  return(StatusCode::SUCCESS);
491 }
492 //________________________________________________________________________________
// Advance the selector to the next event to be processed.
//
// Two modes are visible below:
//  * streaming-tool client: the event token is obtained from
//    m_eventStreamingTool, stored in an attribute list and used to detect a
//    change of input file (Begin/EndInputFile incidents);
//  * otherwise: events are taken from the local input; events may be skipped,
//    and accepted events are either pushed to the streaming tool (server) or
//    have their attribute list recorded.
//
// Returns SUCCESS when an event is ready. FAILURE is returned both for real
// errors and for the end of input (see the inline comments).
// NOTE(review): listing lines 497, 526, 555 and 567 are absent from this
// extract; the declarations/conditions they held are hedged where used.
493 StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const {
// m_callLock is held for the whole call.
494  std::lock_guard<CallMutex> lockGuard(m_callLock);
495  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
496  if (m_makeStreamingToolClient.value() == -1) {
// `sc` is presumably declared and first assigned on the missing listing
// line 497 - TODO confirm. Retry lockEvent() every 1000 microseconds for as
// long as it reports a recoverable status.
498  while (sc.isRecoverable()) {
499  usleep(1000);
500  sc = m_eventStreamingTool->lockEvent(m_evtCount);
501  }
502  }
503  // Increase event count
504  ++m_evtCount;
505  void* tokenStr = nullptr;
506  unsigned int status = 0;
507  StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStr, status);
// The buffer returned in tokenStr is released with delete[] on every path
// below (presumably it was allocated with new char[] - verify in the tool).
508  if (sc.isRecoverable()) {
509  delete [] (char*)tokenStr; tokenStr = nullptr;
510  // Return end iterator
511  ctxt = *m_endIter;
512  // This is not a real failure but a Gaudi way of handling "end of job"
513  return(StatusCode::FAILURE);
514  }
515  if (sc.isFailure()) {
516  ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
517  delete [] (char*)tokenStr; tokenStr = nullptr;
518  return(StatusCode::FAILURE);
519  }
// A failing clearStore() is only reported, processing continues.
520  if (!eventStore()->clearStore().isSuccess()) {
521  ATH_MSG_WARNING("Cannot clear Store");
522  }
// Build an attribute list with a single string attribute "eventRef" that
// holds the received token text.
523  std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList());
524  athAttrList->extend("eventRef", "string");
525  (*athAttrList)["eventRef"].data<std::string>() = std::string((char*)tokenStr);
// `wh` is presumably the write handle declared on the missing listing
// line 526 - TODO confirm.
527  if (!wh.record(std::move(athAttrList)).isSuccess()) {
528  delete [] (char*)tokenStr; tokenStr = nullptr;
529  ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
530  return(StatusCode::FAILURE);
531  }
532  Token token;
533  token.fromString(std::string((char*)tokenStr));
534  delete [] (char*)tokenStr; tokenStr = nullptr;
// A database GUID different from m_guid means a new input file: close the
// previous one (if any) with EndInputFile, then announce BeginInputFile.
// Only done when m_processMetadata is set.
535  Guid guid = token.dbID();
536  if (guid != m_guid && m_processMetadata.value()) {
537  if (m_evtCount >= 0 && m_guid != Guid::null()) {
538  // Fire EndInputFile incident
539  FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
540  m_incidentSvc->fireIncident(endInputFileIncident);
541  }
542  m_guid = guid;
543  FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
544  m_incidentSvc->fireIncident(beginInputFileIncident);
545  }
546  return(StatusCode::SUCCESS);
547  }
// Not a streaming client from here on. preNext() failures of helper tools
// are only reported.
548  for (const auto& tool : m_helperTools) {
549  if (!tool->preNext().isSuccess()) {
550  ATH_MSG_WARNING("Failed to preNext() " << tool->name());
551  }
552  }
// Loop until an event is accepted (break) or an error/end of input returns.
553  for (;;) {
554  // Handle possible file transition
// `sc` is presumably assigned on the missing listing line 555 - TODO confirm.
556  if (sc.isRecoverable()) {
557  continue; // handles empty files
558  }
559  if (sc.isFailure()) {
560  return StatusCode::FAILURE;
561  }
562  // Increase event count
563  ++m_evtCount;
564  if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
565  ATH_MSG_WARNING("Failed to preNext() CounterTool.");
566  }
// The opening `if (` of this condition is on the missing listing line 567.
// The visible part accepts the event only when no skip range is pending or
// the event count is still below the start of the first pending range.
568  && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
569  {
570  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
// Server mode: publish the token text (including its terminating NUL, hence
// length() + 1) under index m_evtCount - 1. While the tool answers
// "recoverable", keep calling readData() until it stops succeeding, then
// retry the put.
571  std::string token = m_headerIterator->eventRef().toString();
572  StatusCode sc;
573  while ( (sc = putEvent_ST(*m_eventStreamingTool,
574  m_evtCount - 1, token.c_str(),
575  token.length() + 1, 0)).isRecoverable() ) {
576  while (m_athenaPoolCnvSvc->readData().isSuccess()) {
577  ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
578  }
579  // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
580  }
581  if (!sc.isSuccess()) {
582  ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
583  return(StatusCode::FAILURE);
584  }
585  } else {
// Without a streaming server, only a non-secondary selector clears the
// store and records the attribute list.
586  if (!m_isSecondary.value()) {
587  if (!eventStore()->clearStore().isSuccess()) {
588  ATH_MSG_WARNING("Cannot clear Store");
589  }
590  if (!recordAttributeList().isSuccess()) {
591  ATH_MSG_ERROR("Failed to record AttributeList.");
592  return(StatusCode::FAILURE);
593  }
594  }
595  }
// Combine the postNext() results of all helper tools: a failure always
// wins; a recoverable result is kept only while no tool has failed.
596  StatusCode status = StatusCode::SUCCESS;
597  for (const auto& tool : m_helperTools) {
598  StatusCode toolStatus = tool->postNext();
599  if (toolStatus.isRecoverable()) {
600  ATH_MSG_INFO("Request skipping event from: " << tool->name());
601  if (status.isSuccess()) {
602  status = StatusCode::RECOVERABLE;
603  }
604  } else if (toolStatus.isFailure()) {
605  ATH_MSG_WARNING("Failed to postNext() " << tool->name());
606  status = StatusCode::FAILURE;
607  }
608  }
// Only the all-success case leaves the loop; both "recoverable" and
// "failure" are logged and the loop moves on to the following event.
609  if (status.isRecoverable()) {
610  ATH_MSG_INFO("skipping event " << m_evtCount);
611  } else if (status.isFailure()) {
612  ATH_MSG_WARNING("Failed to postNext() HelperTool.");
613  } else {
614  if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
615  ATH_MSG_WARNING("Failed to postNext() CounterTool.");
616  }
617  break;
618  }
619  } else {
// Event is skipped: drop every leading skip range whose upper bound has
// been reached by the current event count.
620  while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
621  m_skipEventRanges.erase(m_skipEventRanges.begin());
622  }
623  ATH_MSG_INFO("skipping event " << m_evtCount);
624  }
625  }
626  return(StatusCode::SUCCESS);
627 }
628 //________________________________________________________________________________
629 StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const {
630  if (jump > 0) {
631  for (int i = 0; i < jump; i++) {
632  ATH_CHECK(next(ctxt));
633  }
634  return(StatusCode::SUCCESS);
635  }
636  return(StatusCode::FAILURE);
637 }
638 //________________________________________________________________________________
640 {
641  if( m_inputCollectionsChanged ) {
642  StatusCode rc = reinit();
643  if( rc != StatusCode::SUCCESS ) return rc;
644  }
645  else { // advance to the next (not needed after reinit)
646  // Check if we're at the end of file
647  if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
648  m_headerIterator = nullptr;
649  // Close previous collection.
650  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
651 
652  // zero the current DB ID (m_guid) before disconnect() to indicate it is no longer in use
653  const SG::SourceID old_guid = m_guid.toString();
654  m_guid = Guid::null();
655  disconnectIfFinished( old_guid );
656 
657  // check if somebody updated Inputs in the EOF incident (like VP1 does)
658  if( m_inputCollectionsChanged ) {
659  StatusCode rc = reinit();
660  if( rc != StatusCode::SUCCESS ) return rc;
661  } else {
662  // Open next file from inputCollections list.
663  ++m_inputCollectionsIterator;
664  // Create PoolCollectionConverter for input file
665  m_poolCollectionConverter = getCollectionCnv(true);
666  if (m_poolCollectionConverter == nullptr) {
667  // Return end iterator
668  ctxt = *m_endIter;
669  // This is not a real failure but a Gaudi way of handling "end of job"
670  return StatusCode::FAILURE;
671  }
672  // Get DataHeader iterator
673  m_headerIterator = &m_poolCollectionConverter->executeQuery();
674 
675  // Return RECOVERABLE to mark we should still continue
676  return StatusCode::RECOVERABLE;
677  }
678  }
679  }
680 
681  const Token& headRef = m_refName.value().empty()?
682  m_headerIterator->eventRef()
683  : m_headerIterator->currentRow().tokenList()[m_refName.value() + "_ref"];
684  const Guid guid = headRef.dbID();
685  const int tech = headRef.technology();
686  ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
687 
688  if (guid != m_guid) {
689  // we are starting reading from a new DB. Check if the old one needs to be retired
690  if (m_guid != Guid::null()) {
691  // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
692  const SG::SourceID old_guid = m_guid.toString();
693  m_guid = Guid::null();
694  disconnectIfFinished( old_guid );
695  }
696  m_guid = guid;
697  m_activeEventsPerSource[guid.toString()] = 0;
698  // Fire BeginInputFile incident if current InputCollection is a payload file;
699  // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
700  if (m_collectionType.value() == "ImplicitROOT") {
701  // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
702  if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
703  ATH_MSG_ERROR("Failed to set input attributes.");
704  return(StatusCode::FAILURE);
705  }
706  if (m_processMetadata.value()) {
707  FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
708  m_incidentSvc->fireIncident(beginInputFileIncident);
709  }
710  } else {
711  // Check if File is BS
712  if (tech != 0x00001000 && m_processMetadata.value()) {
713  FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
714  m_incidentSvc->fireIncident(beginInputFileIncident);
715  }
716  }
717  } // end if (guid != m_guid)
718  return StatusCode::SUCCESS;
719 }
720 //________________________________________________________________________________
// Advance to the next event that is not skipped.
//
// Each iteration bumps m_evtCount, calls preNext() on the counter tool (if
// configured) and returns SUCCESS for the first event that passes the skip
// test; skipped events are logged and the loop continues. FAILURE is
// returned as soon as `sc` reports a failure.
// NOTE(review): listing lines 726 and 740 are absent from this extract; they
// presumably hold the assignment of `sc` and the opening of the skip
// condition - confirm against the repository version.
721 StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
722  ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
723 
724  for (;;) {
725  // Check if we're at the end of file
727  if (sc.isRecoverable()) {
728  continue; // handles empty files
729  }
730  if (sc.isFailure()) {
731  return StatusCode::FAILURE;
732  }
733 
734  // Increase event count
735  ++m_evtCount;
736 
// A failing preNext() of the counter tool is only reported.
737  if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
738  ATH_MSG_WARNING("Failed to preNext() CounterTool.");
739  }
// Visible part of the condition: accept when no skip range is pending or the
// event count is still below the start of the first pending range.
741  && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
742  {
743  return StatusCode::SUCCESS;
744  } else {
// Drop every leading skip range whose upper bound has been reached.
745  while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
746  m_skipEventRanges.erase(m_skipEventRanges.begin());
747  }
748  if (m_isSecondary.value()) {
749  ATH_MSG_INFO("skipping secondary event " << m_evtCount);
750  } else {
751  ATH_MSG_INFO("skipping event " << m_evtCount);
752  }
753  }
754  }
755 
// NOTE(review): no `break` is visible in the loop above, so this statement
// appears to be unreachable.
756  return StatusCode::SUCCESS;
757 }
758 //________________________________________________________________________________
759 StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
760  ATH_MSG_ERROR("previous() not implemented");
761  return(StatusCode::FAILURE);
762 }
763 //________________________________________________________________________________
764 StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& ctxt, int jump) const {
765  if (jump > 0) {
766  for (int i = 0; i < jump; i++) {
767  ATH_CHECK(previous(ctxt));
768  }
769  return(StatusCode::SUCCESS);
770  }
771  return(StatusCode::FAILURE);
772 }
773 //________________________________________________________________________________
774 StatusCode EventSelectorAthenaPool::last(IEvtSelector::Context& ctxt) const {
775  if (ctxt.identifier() == m_endIter->identifier()) {
776  ATH_MSG_DEBUG("last(): Last event in InputStream.");
777  return(StatusCode::SUCCESS);
778  }
779  return(StatusCode::FAILURE);
780 }
781 //________________________________________________________________________________
782 StatusCode EventSelectorAthenaPool::rewind(IEvtSelector::Context& ctxt) const {
783  ATH_CHECK(reinit());
784  ctxt = EventContextAthenaPool(this);
785  return(StatusCode::SUCCESS);
786 }
787 //________________________________________________________________________________
// Create the opaque address (a TokenAddress) for the current event.
//
// The token text is taken from the attribute list when `attrList` is valid
// (attribute "eventRef", or "<m_refName>_ref" when m_refName is set);
// otherwise it is obtained from m_poolCollectionConverter->retrieveToken().
// The context argument is ignored.
//
// Returns FAILURE when reading the attribute throws a std::exception,
// SUCCESS otherwise; on success `iop` points to a newly allocated
// TokenAddress.
// NOTE(review): `attrList` is presumably declared on listing line 791, which
// is absent from this extract - confirm against the repository version.
788 StatusCode EventSelectorAthenaPool::createAddress(const IEvtSelector::Context& /*ctxt*/,
789  IOpaqueAddress*& iop) const {
790  std::string tokenStr;
792  if (attrList.isValid()) {
793  try {
794  if (m_refName.value().empty()) {
795  tokenStr = (*attrList)["eventRef"].data<std::string>();
796  ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
797  } else {
798  tokenStr = (*attrList)[m_refName.value() + "_ref"].data<std::string>();
799  ATH_MSG_DEBUG("found AthenaAttribute, name = " << m_refName.value() << "_ref = " << tokenStr);
800  }
801  } catch (std::exception &e) {
802  ATH_MSG_ERROR(e.what());
803  return(StatusCode::FAILURE);
804  }
805  } else {
// Fallback path. m_poolCollectionConverter is dereferenced without a null
// check here.
806  ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey.value());
807  tokenStr = m_poolCollectionConverter->retrieveToken(m_headerIterator, m_refName.value());
808  }
// The Token is heap-allocated and passed to the TokenAddress constructor;
// presumably the address takes ownership of it - TODO confirm.
809  Token* token = new Token;
810  token->fromString(tokenStr);
811  iop = new TokenAddress(POOL_StorageType, ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, token);
812  return(StatusCode::SUCCESS);
813 }
814 //________________________________________________________________________________
815 StatusCode EventSelectorAthenaPool::releaseContext(IEvtSelector::Context*& /*ctxt*/) const {
816  return(StatusCode::SUCCESS);
817 }
818 //________________________________________________________________________________
819 StatusCode EventSelectorAthenaPool::resetCriteria(const std::string& /*criteria*/,
820  IEvtSelector::Context& /*ctxt*/) const {
821  return(StatusCode::SUCCESS);
822 }
823 //__________________________________________________________________________
// Position the selector on event evtNum, where evtNum is an index counted across the
// input collections (findEvent() maps it to a collection via m_firstEvt / m_numEvt).
// Returns:
//   RECOVERABLE - evtNum is not in any collection (end-of-file incidents are fired first);
//   FAILURE     - reinit failed, the converter could not be initialized, an exception was
//                 caught, the header iterator does not support seeking, or the seek failed;
//   SUCCESS     - iterator positioned; m_evtCount is then set to evtNum + 1.
// NOTE(review): listing lines 849, 855 and 856 (part of the debug message and two
// PoolCollectionConverter constructor arguments) are missing from this text.
824 StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
825 
   // Pick up a changed input collection list before doing anything else.
826  if( m_inputCollectionsChanged ) {
827  StatusCode rc = reinit();
828  if( rc != StatusCode::SUCCESS ) return rc;
829  }
830 
831  long newColl = findEvent(evtNum);
   // Not found by findEvent(), but within the range already read from the current collection.
832  if (newColl == -1 && evtNum >= m_firstEvt[m_curCollection] && evtNum < m_evtCount - 1) {
833  newColl = m_curCollection;
834  }
835  if (newColl == -1) {
836  m_headerIterator = nullptr;
837  ATH_MSG_INFO("seek: Reached end of Input.");
838  fireEndFileIncidents(true);
839  return(StatusCode::RECOVERABLE);
840  }
   // Target lives in a different collection: drop the current converter and open the new one.
841  if (newColl != m_curCollection) {
842  if (!m_keepInputFilesOpen.value() && m_poolCollectionConverter != nullptr) {
843  m_poolCollectionConverter->disconnectDb().ignore();
844  }
845  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
846  m_curCollection = newColl;
847  try {
848  ATH_MSG_DEBUG("Seek to item: \""
850  << "\" from the collection list.");
851  // Reset input collection iterator to the right place
852  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
853  m_inputCollectionsIterator += m_curCollection;
854  m_poolCollectionConverter = new PoolCollectionConverter(m_collectionType.value() + ":" + m_collectionTree.value(),
857  m_query.value(),
858  m_athenaPoolCnvSvc->getPoolSvc());
859  if (!m_poolCollectionConverter->initialize().isSuccess()) {
860  m_headerIterator = nullptr;
861  ATH_MSG_ERROR("seek: Unable to initialize PoolCollectionConverter.");
862  return(StatusCode::FAILURE);
863  }
864  // Create DataHeader iterators
865  m_headerIterator = &m_poolCollectionConverter->executeQuery();
   // NOTE(review): beginIter is allocated with `new` and never deleted in this
   // function -- looks like a leak unless ownership is taken elsewhere; confirm.
866  EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
867  m_evtCount = m_firstEvt[m_curCollection];
868  next(*beginIter).ignore();
869  ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
870  } catch (std::exception &e) {
871  m_headerIterator = nullptr;
872  ATH_MSG_ERROR(e.what());
873  return(StatusCode::FAILURE);
874  }
875  }
876 
   // Seeking needs the cursor to implement pool::IPositionSeek; the position passed
   // is relative to the first event of the current collection.
877  pool::IPositionSeek* is = dynamic_cast<pool::IPositionSeek*>(m_headerIterator);
878  if (is == nullptr) {
879  ATH_MSG_ERROR("Container does not allow seeking.");
880  return(StatusCode::FAILURE);
881  }
882  if (is->seek(evtNum - m_firstEvt[m_curCollection]) == 0) {
883  m_headerIterator = nullptr;
884  ATH_MSG_ERROR("Did not find event, evtNum = " << evtNum);
885  return(StatusCode::FAILURE);
886  } else {
887  m_evtCount = evtNum + 1;
888  }
889  return(StatusCode::SUCCESS);
890 }
891 //__________________________________________________________________________
892 int EventSelectorAthenaPool::curEvent (const Context& /*ctxt*/) const {
893  return(m_evtCount);
894 }
895 //__________________________________________________________________________
896 // Search for event number evtNum.
897 // Return the index of the collection containing it, or -1 if not found.
898 // Note: passing -1 for evtNum will always yield failure,
899 // but this can be used to force filling in the entire m_numEvt array.
// Walk the input collections in order and return the index of the one whose range
// [m_firstEvt[i], m_firstEvt[i] + m_numEvt[i]) contains evtNum, or -1 if none does.
// Collections whose size is still unknown (m_numEvt[i] == -1) are opened on the fly to
// fill in m_numEvt[i] and m_firstEvt[i]. The walk stops early (returning -1) if a
// converter fails to initialize or its cursor does not implement ICollectionSize.
// NOTE(review): listing lines 905 (a PoolCollectionConverter constructor argument) and
// 913 (the declaration of `hi`, presumably the cursor from pcc.executeQuery()) are
// missing from this text -- confirm against the real source file.
900 int EventSelectorAthenaPool::findEvent(int evtNum) const {
901  for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
902  if (m_numEvt[i] == -1) {
903  PoolCollectionConverter pcc(m_collectionType.value() + ":" + m_collectionTree.value(),
904  m_inputCollectionsProp.value()[i],
906  m_query.value(),
907  m_athenaPoolCnvSvc->getPoolSvc());
908  if (!pcc.initialize().isSuccess()) {
909  break;
910  }
   // A converter that is not valid is counted as holding zero events.
911  int collection_size = 0;
912  if (pcc.isValid()) {
914  ICollectionSize* cs = dynamic_cast<ICollectionSize*>(hi);
915  if (cs == nullptr) {
916  break;
917  }
918  collection_size = cs->size();
919  }
   // First event of collection i follows directly after the previous collection's events.
920  if (i > 0) {
921  m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
922  } else {
923  m_firstEvt[i] = 0;
924  }
925  m_numEvt[i] = collection_size;
926  }
927  if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
928  return(i);
929  }
930  }
931  return(-1);
932 }
933 
934 //________________________________________________________________________________
// NOTE(review): the signature line (listing line 935) is missing from this text. Judging
// from the debug message below this is presumably makeServer(int num) -- confirm.
// Behaviour visible here:
//   num < 0  : switch AthenaPoolCnvSvc with makeServer(num - 1) (output streaming server).
//              A failure is only logged; SUCCESS is returned regardless.
//   num >= 0 : switch AthenaPoolCnvSvc with makeServer(num + 1) (input streaming server),
//              returning FAILURE if that fails. Without an event streaming tool this
//              returns SUCCESS; otherwise metadata processing is switched off and the
//              result of m_eventStreamingTool->makeServer(1, "") is returned.
936  if (num < 0) {
937  if (m_athenaPoolCnvSvc->makeServer(num - 1).isFailure()) {
938  ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
939  }
   // NOTE(review): the error above is not propagated -- confirm this is intentional.
940  return(StatusCode::SUCCESS);
941  }
942  if (m_athenaPoolCnvSvc->makeServer(num + 1).isFailure()) {
943  ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
944  return(StatusCode::FAILURE);
945  }
946  if (m_eventStreamingTool.empty()) {
947  return(StatusCode::SUCCESS);
948  }
949  m_processMetadata = false;
950  ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
951  return(m_eventStreamingTool->makeServer(1, ""));
952 }
953 
954 //________________________________________________________________________________
// NOTE(review): the signature line (listing line 955) is missing from this text; the
// cross-reference index further down this page gives it as
// "virtual StatusCode makeClient(int num) override".
// Switches AthenaPoolCnvSvc to client mode with makeClient(num + 1), returning FAILURE
// if that fails. Without an event streaming tool this returns SUCCESS; otherwise the
// result of m_eventStreamingTool->makeClient(0, dummyStr) is returned.
956  if (m_athenaPoolCnvSvc->makeClient(num + 1).isFailure()) {
957  ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
958  return(StatusCode::FAILURE);
959  }
960  if (m_eventStreamingTool.empty()) {
961  return(StatusCode::SUCCESS);
962  }
963  ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
   // The string argument is passed by non-const lvalue here; its contents are not used afterwards.
964  std::string dummyStr;
965  return(m_eventStreamingTool->makeClient(0, dummyStr));
966 }
967 
968 //________________________________________________________________________________
// NOTE(review): the signature line (listing line 969) is missing from this text; the
// cross-reference index further down this page gives it as
// "virtual StatusCode share(int evtnum) override".
// Only meaningful when an event streaming tool is configured and acts as a client;
// in every other case FAILURE is returned.
// The client asks the tool to lock event `evtnum`, polling every 1000 us while the
// answer is RECOVERABLE. If the lock then fails outright, AthenaPoolCnvSvc is told
// makeClient(0), the lock is retried until it succeeds, and makeClient(1) is called.
970  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
971  StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
972  while (sc.isRecoverable()) {
973  usleep(1000);
974  sc = m_eventStreamingTool->lockEvent(evtnum);
975  }
976 // Send stop client and wait for restart
977  if (sc.isFailure()) {
978  if (m_athenaPoolCnvSvc->makeClient(0).isFailure()) {
979  return(StatusCode::FAILURE);
980  }
981  sc = m_eventStreamingTool->lockEvent(evtnum);
   // NOTE(review): this loop only ends once lockEvent() succeeds; there is no
   // timeout or retry limit -- confirm that an unbounded wait is acceptable here.
982  while (sc.isRecoverable() || sc.isFailure()) {
983  usleep(1000);
984  sc = m_eventStreamingTool->lockEvent(evtnum);
985  }
986 //FIXME
987  if (m_athenaPoolCnvSvc->makeClient(1).isFailure()) {
988  return(StatusCode::FAILURE);
989  }
990  }
991  return(sc);
992  }
993  return(StatusCode::FAILURE);
994 }
995 
996 //________________________________________________________________________________
// NOTE(review): the signature line (listing line 997) is missing from this text; the
// cross-reference index further down this page gives it as
// "virtual StatusCode readEvent(int maxevt) override".
// Requires an event streaming tool (FAILURE otherwise). Calls next() up to `maxevt`
// times, or without limit when maxevt == -1. A failing next() with m_evtCount == -1
// is treated as end of input; any other failure returns FAILURE. Afterwards an
// end marker is pushed through the streaming tool and remaining readData() requests
// are drained before SUCCESS is returned.
998  if (m_eventStreamingTool.empty()) {
999  ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
1000  return(StatusCode::FAILURE);
1001  }
1002  ATH_MSG_VERBOSE("Called read Event " << maxevt);
   // Temporary context used only to drive next(); deleted on every exit path below.
1003  IEvtSelector::Context* ctxt = new EventContextAthenaPool(this);
1004  for (int i = 0; i < maxevt || maxevt == -1; ++i) {
1005  if (!next(*ctxt).isSuccess()) {
1006  if (m_evtCount == -1) {
1007  ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
1008  break;
1009  }
1010  ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
1011  delete ctxt; ctxt = nullptr;
1012  return(StatusCode::FAILURE);
1013  } else {
1014  ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
1015  }
1016  }
1017  delete ctxt; ctxt = nullptr;
1018  // End of file, wait for last event to be taken
1019  StatusCode sc;
   // Push the end marker (all-zero putEvent); while the tool answers RECOVERABLE,
   // keep serving readData() requests and retry after 1000 us.
1020  while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
1021  while (m_athenaPoolCnvSvc->readData().isSuccess()) {
1022  ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
1023  }
1024  usleep(1000);
1025  }
1026  if (!sc.isSuccess()) {
1027  ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
1028  return(StatusCode::FAILURE);
1029  } else {
   // Keep calling readData() until it returns a plain failure (neither SUCCESS nor
   // RECOVERABLE); the debug message below reads that as the clients having stopped.
1030  sc = m_athenaPoolCnvSvc->readData();
1031  while (sc.isSuccess() || sc.isRecoverable()) {
1032  sc = m_athenaPoolCnvSvc->readData();
1033  }
1034  ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
1035  }
1036  return(StatusCode::SUCCESS);
1037 }
1038 
1039 //__________________________________________________________________________
1040 int EventSelectorAthenaPool::size(Context& /*ctxt*/) const {
1041  // Fetch sizes of all collections.
1042  findEvent(-1);
1043  int sz = 0;
1044  for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
1045  sz += m_numEvt[i];
1046  }
1047  return(sz);
1048 }
1049 //__________________________________________________________________________
// NOTE(review): the signature line (listing line 1050) and one PoolCollectionConverter
// constructor argument (listing line 1060) are missing from this text. The body uses a
// boolean-like `throwIncidents` and returns a PoolCollectionConverter pointer.
// Starting at m_inputCollectionsIterator, try each remaining input collection and
// return a newly allocated, initialized converter for the first one that is valid.
// Collections without events are skipped (optionally firing Begin/EndInputFile
// incidents) and disconnected. Returns nullptr when the list is exhausted.
// Throws GaudiException if a converter cannot be initialized.
// NOTE(review): the returned pointer comes from a raw `new`; presumably the caller
// takes ownership -- confirm.
1051  while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
   // Book-keeping for the collection just finished: record its event count and the
   // first event number of the next one. Skipped while m_curCollection is 0.
1052  if (m_curCollection != 0) {
1053  m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
1054  m_curCollection++;
1055  m_firstEvt[m_curCollection] = m_evtCount;
1056  }
1057  ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
1058  PoolCollectionConverter* pCollCnv = new PoolCollectionConverter(m_collectionType.value() + ":" + m_collectionTree.value(),
1059  *m_inputCollectionsIterator,
1061  m_query.value(),
1062  m_athenaPoolCnvSvc->getPoolSvc());
1063  StatusCode status = pCollCnv->initialize();
1064  if (!status.isSuccess()) {
1065  // Initialization failed: discard the converter that was just created.
1066  delete pCollCnv; pCollCnv = nullptr;
   // Both branches throw; they differ only in the message ("read" vs. "open").
1067  if (!status.isRecoverable()) {
1068  ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
1069  throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
1070  } else {
1071  ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
1072  throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
1073  }
1074  } else {
1075  if (!pCollCnv->isValid().isSuccess()) {
1076  delete pCollCnv; pCollCnv = nullptr;
1077  ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
   // Even for an eventless file, fire a Begin/End incident pair when requested and
   // metadata processing is enabled.
1078  if (throwIncidents && m_processMetadata.value()) {
1079  FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator);
1080  m_incidentSvc->fireIncident(beginInputFileIncident);
1081  FileIncident endInputFileIncident(name(), "EndInputFile", "eventless " + *m_inputCollectionsIterator);
1082  m_incidentSvc->fireIncident(endInputFileIncident);
1083  }
1084  m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1085  ++m_inputCollectionsIterator;
1086  } else {
1087  return(pCollCnv);
1088  }
1089  }
1090  }
1091  return(nullptr);
1092 }
1093 //__________________________________________________________________________
// NOTE(review): the signature line (listing line 1094) is missing from this text; the
// cross-reference index further down this page gives it as
// "virtual StatusCode recordAttributeList() const override".
// Copies the attribute list of the current collection row into a new
// AthenaAttributeList, extends it via fillAttributeList(), and records it through `wh`.
// Returns FAILURE if filling or recording fails, SUCCESS otherwise.
1095  // Get access to AttributeList
1096  ATH_MSG_DEBUG("Get AttributeList from the collection");
1097  // MN: accessing only attribute list, ignoring token list
1098  const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
1099  ATH_MSG_DEBUG("AttributeList size " << attrList.size());
1100  std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList(attrList));
1101  // Fill the new attribute list
1102  ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
1103  // Write the AttributeList
   // NOTE(review): listing line 1104 (the declaration of `wh`) is missing here;
   // presumably a StoreGate write handle keyed on m_attrListKey -- confirm.
1105  if (!wh.record(std::move(athAttrList)).isSuccess()) {
1106  ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
1107  return(StatusCode::FAILURE);
1108  }
1109  return(StatusCode::SUCCESS);
1110 }
1111 //__________________________________________________________________________
1112 StatusCode EventSelectorAthenaPool::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
1113 {
1114  const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1115  for (pool::TokenList::const_iterator iter = tokenList.begin(), last = tokenList.end(); iter != last; ++iter) {
1116  attrList->extend(iter.tokenName() + suffix, "string");
1117  (*attrList)[iter.tokenName() + suffix].data<std::string>() = iter->toString();
1118  ATH_MSG_DEBUG("record AthenaAttribute, name = " << iter.tokenName() + suffix << " = " << iter->toString() << ".");
1119  }
1120 
1121  std::string eventRef = "eventRef";
1122  if (m_isSecondary.value()) {
1123  eventRef.append(suffix);
1124  }
1125  attrList->extend(eventRef, "string");
1126  (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1127  ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
1128 
1129  if (copySource) {
1130  const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
1131  for (const auto &attr : sourceAttrList) {
1132  attrList->extend(attr.specification().name() + suffix, attr.specification().type());
1133  (*attrList)[attr.specification().name() + suffix] = attr;
1134  }
1135  }
1136 
1137  return StatusCode::SUCCESS;
1138 }
1139 //__________________________________________________________________________
// NOTE(review): the signature line (listing line 1140) is missing from this text; the
// cross-reference index further down this page gives it as
// "virtual StatusCode io_reinit() override".
// I/O re-initialization callback: drops the current collection converter, asks the
// IoComponentMgr for the (possibly remapped) name of every input collection, tells the
// POOL service about renamed files, resets m_guid and finishes with reinit().
// When acting as an event streaming client, the remapping is skipped and only
// m_guid is reset before reinit(). Returns FAILURE if the IoComponentMgr is
// unavailable, does not know this component, or does not know one of the files.
1141  ATH_MSG_INFO("I/O reinitialization...");
1142  if (m_poolCollectionConverter != nullptr) {
1143  m_poolCollectionConverter->disconnectDb().ignore();
1144  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
1145  }
1146  m_headerIterator = nullptr;
1147  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1148  if (!iomgr.retrieve().isSuccess()) {
1149  ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1150  return(StatusCode::FAILURE);
1151  }
1152  if (!iomgr->io_hasitem(this)) {
1153  ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1154  return(StatusCode::FAILURE);
1155  }
1156  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
1157  m_guid = Guid::null();
1158  return(this->reinit());
1159  }
   // Work on a copy of the input list; `updatedIndexes` marks entries already remapped
   // (directly or as a duplicate of an earlier entry) so each name is looked up once.
1160  std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1161  std::set<std::size_t> updatedIndexes;
1162  for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
1163  if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
1164  std::string savedName = inputCollections[i];
   // `fname` refers to the vector element itself; io_retrieve() is expected to
   // update it (it is compared against savedName below).
1165  std::string &fname = inputCollections[i];
1166  if (!iomgr->io_contains(this, fname)) {
1167  ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1168  return(StatusCode::FAILURE);
1169  }
1170  if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1171  ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1172  return(StatusCode::FAILURE);
1173  }
1174  if (savedName != fname) {
1175  ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
1176  m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
1177  }
1178  updatedIndexes.insert(i);
   // Apply the same new name to every later occurrence of the old name.
1179  for (std::size_t j = i + 1; j < imax; j++) {
1180  if (inputCollections[j] == savedName) {
1181  inputCollections[j] = fname;
1182  updatedIndexes.insert(j);
1183  }
1184  }
1185  }
1186  // all good... copy over.
   // NOTE(review): listing line 1187 is missing here; presumably it assigns
   // `inputCollections` back to m_inputCollectionsProp -- confirm.
1188  m_guid = Guid::null();
1189  return reinit();
1190 }
1191 //__________________________________________________________________________
// NOTE(review): the signature line (listing line 1192) is missing from this text;
// judging from the log message this is presumably the I/O finalization callback
// (io_finalize) -- confirm.
// Disconnects and deletes the current collection converter, if any. Always returns SUCCESS.
1193  ATH_MSG_INFO("I/O finalization...");
1194  if (m_poolCollectionConverter != nullptr) {
1195  m_poolCollectionConverter->disconnectDb().ignore();
1196  delete m_poolCollectionConverter; m_poolCollectionConverter = nullptr;
1197  }
1198  return(StatusCode::SUCCESS);
1199 }
1200 
1201 //__________________________________________________________________________
1202 /* Listen to IncidentType::BeginProcessing and EndProcessing
1203  Maintain counters of how many events from a given file are being processed.
1204  Files are identified by SG::SourceID (string GUID).
1205  When there are no more events from a file, see if it can be closed.
1206 */
1207 void EventSelectorAthenaPool::handle(const Incident& inc)
1208 {
1209  SG::SourceID fid;
1210  if (inc.type() == IncidentType::BeginProcessing) {
1211  if ( Atlas::hasExtendedEventContext(inc.context()) ) {
1212  fid = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
1213  }
1214  *m_sourceID.get(inc.context()) = fid;
1215  }
1216  else {
1217  fid = *m_sourceID.get(inc.context());
1218  }
1219 
1220  if( fid.empty() ) {
1221  ATH_MSG_WARNING("could not read event source ID from incident event context");
1222  return;
1223  }
1224  if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1225  ATH_MSG_DEBUG("Incident handler ignoring unknown input FID: " << fid );
1226  return;
1227  }
1228  ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid );
1229  if( inc.type() == IncidentType::BeginProcessing ) {
1230  // increment the events-per-file counter for FID
1231  m_activeEventsPerSource[fid]++;
1232  } else if( inc.type() == IncidentType::EndProcessing ) {
1233  m_activeEventsPerSource[fid]--;
1234  disconnectIfFinished( fid );
1235  *m_sourceID.get(inc.context()) = "";
1236  }
1237  if( msgLvl(MSG::DEBUG) ) {
1238  for( auto& source: m_activeEventsPerSource )
1239  msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
1240  }
1241 }
1242 
1243 //__________________________________________________________________________
1244 /* Disconnect APR Database identified by a SG::SourceID when it is no longer in use:
1245  m_guid is not pointing to it and there are no events from it being processed
1246  (if the EventLoopMgr was not firing Begin/End incidents, this will just close the DB)
1247 */
// NOTE(review): the signature line (listing line 1248) is missing from this text; the
// cross-reference index further down this page gives it as
// "virtual bool disconnectIfFinished(const SG::SourceID &fid) const override".
// Disconnects the input database "FID:<fid>" and returns true when ALL of the
// following hold; otherwise does nothing and returns false:
//   - no event streaming tool is configured,
//   - fid is tracked in m_activeEventsPerSource with a count <= 0,
//   - fid is not the current m_guid,
//   - m_keepInputFilesOpen is false.
// An EndInputFile incident is fired first when metadata processing is enabled, and the
// fid entry is erased from m_activeEventsPerSource.
1249 {
1250  if( m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1251  && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1252  // Explicitly disconnect file corresponding to old FID to release memory
1253  if( !m_keepInputFilesOpen.value() ) {
1254  // Assume that the end of collection file indicates the end of payload file.
1255  if (m_processMetadata.value()) {
1256  FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + fid, fid);
1257  m_incidentSvc->fireIncident(endInputFileIncident);
1258  }
1259  ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
1260  m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
1261  m_activeEventsPerSource.erase( fid );
1262  return true;
1263  }
1264  }
1265  return false;
1266 }
EventSelectorAthenaPool::last
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:774
beamspotman.r
def r
Definition: beamspotman.py:676
EventSelectorAthenaPool::createContext
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
Definition: EventSelectorAthenaPool.cxx:488
trigbs_pickEvents.ranges
ranges
Definition: trigbs_pickEvents.py:60
ICollectionSize::size
virtual int size()=0
Return the size of the collection.
Guid::null
static const Guid & null()
NULL-Guid: static class method.
Definition: Guid.cxx:18
python.tests.PyTestsLib.finalize
def finalize(self)
_info( "content of StoreGate..." ) self.sg.dump()
Definition: PyTestsLib.py:53
EventSelectorAthenaPool::fillAttributeList
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Definition: EventSelectorAthenaPool.cxx:1112
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
EventSelectorAthenaPool::m_skipEvents
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Definition: EventSelectorAthenaPool.h:240
EventSelectorAthenaPool.h
This file contains the class definition for the EventSelectorAthenaPool class.
fitman.sz
sz
Definition: fitman.py:527
hotSpotInTAG.suffix
string suffix
Definition: hotSpotInTAG.py:186
EventSelectorAthenaPool::releaseContext
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
Definition: EventSelectorAthenaPool.cxx:815
EventSelectorAthenaPool::io_reinit
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
Definition: EventSelectorAthenaPool.cxx:1140
EventSelectorAthenaPool::curEvent
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
Definition: EventSelectorAthenaPool.cxx:892
Gaudi::Parsers::parse
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
Definition: CaloGPUClusterAndCellDataMonitorOptions.h:284
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
pool::IPositionSeek::seek
virtual bool seek(long long int position)=0
Seek to a given position in the collection.
EventSelectorAthenaPool::m_oldRunNo
Gaudi::CheckedProperty< int > m_oldRunNo
Definition: EventSelectorAthenaPool.h:223
EventSelectorAthenaPool::m_runNo
Gaudi::CheckedProperty< int > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
Definition: EventSelectorAthenaPool.h:222
EventSelectorAthenaPool::reinit
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was issued.
Definition: EventSelectorAthenaPool.cxx:247
TokenList.h
EventSelectorAthenaPool::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: EventSelectorAthenaPool.cxx:955
PoolCollectionConverter
This class provides an interface to POOL collections.
Definition: PoolCollectionConverter.h:27
PoolCollectionConverter::executeQuery
pool::ICollectionCursor & executeQuery()
Query the collection.
Definition: PoolCollectionConverter.cxx:135
SG::ReadHandle
Definition: StoreGate/StoreGate/ReadHandle.h:70
PoolCollectionConverter.h
This file contains the class definition for the PoolCollectionConverter class.
EventSelectorAthenaPool::previous
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:759
EventSelectorAthenaPool::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EventSelectorAthenaPool.h:179
initialize
void initialize()
Definition: run_EoverP.cxx:894
ICollectionCursor.h
Atlas::hasExtendedEventContext
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
Definition: ExtendedEventContext.cxx:23
EventSelectorAthenaPool::m_firstEventNo
Gaudi::CheckedProperty< int > m_firstEventNo
Definition: EventSelectorAthenaPool.h:227
EventSelectorAthenaPool::m_athenaPoolCnvSvc
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
Definition: EventSelectorAthenaPool.h:178
EventSelectorAthenaPool::start
virtual StatusCode start() override
Definition: EventSelectorAthenaPool.cxx:370
SG::SlotSpecificObj::get
T * get(const EventContext &ctx)
Return pointer to the object for slot given by ctx.
EventSelectorAthenaPool::readEvent
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
Definition: EventSelectorAthenaPool.cxx:997
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:62
EventSelectorAthenaPool::queryInterface
virtual StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface) override
Does this object satisfy a given interface? See Gaudi documentation for details.
Definition: EventSelectorAthenaPool.cxx:468
EventSelectorAthenaPool::m_eventStreamingTool
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
Definition: EventSelectorAthenaPool.h:215
EventSelectorAthenaPool::m_query
Gaudi::Property< std::string > m_query
Query string passed to APR when opening DataHeader container (kind of useless).
Definition: EventSelectorAthenaPool.h:206
EventSelectorAthenaPool::eventStore
StoreGateSvc * eventStore() const
Return pointer to active event SG.
Definition: EventSelectorAthenaPool.cxx:85
ICollectionSize.h
Abstract interface for finding the size of an event collection.
Guid::toString
const std::string toString() const
Automatic conversion to string representation.
Definition: Guid.cxx:58
pool::TokenList::end
iterator end()
Returns a forward iterator pointing to last element in Token list.
Definition: TokenList.h:224
python.PyKernel.AttributeList
AttributeList
Definition: PyKernel.py:36
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
EventSelectorAthenaPool::findEvent
int findEvent(int evtNum) const
Search for event with number evtNum.
Definition: EventSelectorAthenaPool.cxx:900
EventSelectorAthenaPool::m_callLock
CallMutex m_callLock
Definition: EventSelectorAthenaPool.h:250
AthenaAttributeList.h
EventContextAthenaPool
This class provides the context to access an event from POOL persistent store.
Definition: EventContextAthenaPool.h:21
EventSelectorAthenaPool::stop
virtual StatusCode stop() override
Definition: EventSelectorAthenaPool.cxx:398
PoolCollectionConverter::isValid
StatusCode isValid() const
Check whether has valid pool::ICollection*.
Definition: PoolCollectionConverter.cxx:123
atlasStyleMacro.icol
int icol
Definition: atlasStyleMacro.py:13
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
EventSelectorAthenaPool::m_refName
Gaudi::Property< std::string > m_refName
RefName, attribute name.
Definition: EventSelectorAthenaPool.h:194
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
EventSelectorAthenaPool::m_counterTool
ToolHandle< IAthenaSelectorTool > m_counterTool
Definition: EventSelectorAthenaPool.h:214
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:21
Atlas::getExtendedEventContext
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
Definition: ExtendedEventContext.cxx:32
pool::TokenList
Definition: TokenList.h:24
EventSelectorAthenaPool::share
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Definition: EventSelectorAthenaPool.cxx:969
Token::fromString
Token & fromString(const std::string &from)
Build from the string representation of a token.
Definition: Token.cxx:133
WriteHandle.h
Handle class for recording to StoreGate.
IPoolSvc::kInputStream
@ kInputStream
Definition: IPoolSvc.h:40
EventContextAthenaPool::identifier
virtual void * identifier() const
Definition: EventContextAthenaPool.h:56
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:128
TileAANtupleConfig.inputCollections
inputCollections
Definition: TileAANtupleConfig.py:133
IEventShare
Abstract interface for sharing within an event stream.
Definition: IEventShare.h:28
CollectionRowBuffer.h
Token::technology
int technology() const
Access technology type.
Definition: Token.h:75
FortranAlgorithmOptions.fileName
fileName
Definition: FortranAlgorithmOptions.py:13
EventSelectorAthenaPool::m_firedIncident
std::atomic_bool m_firedIncident
Definition: EventSelectorAthenaPool.h:247
Amg::toString
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
Definition: GeoPrimitivesToStringConverter.h:40
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventSelectorAthenaPool::m_attrListKey
Gaudi::Property< std::string > m_attrListKey
AttributeList SG key.
Definition: EventSelectorAthenaPool.h:196
EventSelectorAthenaPool::rewind
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:782
lumiFormat.i
int i
Definition: lumiFormat.py:92
IAthenaIPCTool
Definition: IAthenaIPCTool.h:15
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
IEventShare::interfaceID
static const InterfaceID & interfaceID()
Definition: IEventShare.h:35
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
EventSelectorAthenaPool::m_eventsPerLB
Gaudi::CheckedProperty< int > m_eventsPerLB
Definition: EventSelectorAthenaPool.h:231
L1CaloPhase1Monitoring.propVal
propVal
Definition: L1CaloPhase1Monitoring.py:349
ISecondaryEventSelector
Abstract interface for secondary event selectors.
Definition: ISecondaryEventSelector.h:28
EventSelectorAthenaPool::resetCriteria
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
Definition: EventSelectorAthenaPool.cxx:819
EventSelectorAthenaPool::m_eventsPerRun
Gaudi::CheckedProperty< int > m_eventsPerRun
Definition: EventSelectorAthenaPool.h:229
EventSelectorAthenaPool::m_isSecondary
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
Definition: EventSelectorAthenaPool.h:183
StoreGateSvc::currentStoreGate
static StoreGateSvc * currentStoreGate()
get current StoreGate
Definition: StoreGateSvc.cxx:69
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:40
calibdata.exception
exception
Definition: calibdata.py:496
plotIsoValidation.el
el
Definition: plotIsoValidation.py:197
parseDir.wh
wh
Definition: parseDir.py:46
IPositionSeek.h
Abstract interface for seeking within an event stream.
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
ICollectionSize
Abstract interface for finding the size of an event collection.
Definition: ICollectionSize.h:31
EventSelectorAthenaPool::fireEndFileIncidents
void fireEndFileIncidents(bool isLastFile) const
Fires the EndInputFile incident (if there is an open file) at end of selector.
Definition: EventSelectorAthenaPool.cxx:410
AthenaAttributeList
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
Definition: PersistentDataModel/PersistentDataModel/AthenaAttributeList.h:45
EventSelectorAthenaPool::m_makeStreamingToolClient
IntegerProperty m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
Definition: EventSelectorAthenaPool.h:217
xAOD::eventNumber
eventNumber
Definition: EventInfo_v1.cxx:124
StoreGateSvc::clearStore
virtual StatusCode clearStore(bool forceRemove=false) override final
clear DataStore contents: called by the event loop mgrs
Definition: StoreGateSvc.cxx:461
EventSelectorAthenaPool::m_firstLBNo
Gaudi::CheckedProperty< int > m_firstLBNo
Definition: EventSelectorAthenaPool.h:230
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
imax
int imax(int i, int j)
Definition: TileLaserTimingTool.cxx:33
checkRpcDigits.allGood
bool allGood
Loop over the SDOs & Digits.
Definition: checkRpcDigits.py:171
EventSelectorAthenaPool::m_collectionTree
Gaudi::Property< std::string > m_collectionTree
CollectionTree, prefix of the collection TTree: default = "POOLContainer".
Definition: EventSelectorAthenaPool.h:189
EventSelectorAthenaPool::inputCollectionsHandler
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
Definition: EventSelectorAthenaPool.cxx:76
SG::ReadHandle::isValid
virtual bool isValid() override final
Can the handle be successfully dereferenced?
EventSelectorAthenaPool::m_skipEventRangesProp
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma-separated list of events to skip, ranges with '-': <start> - <end>
Definition: EventSelectorAthenaPool.h:243
EventSelectorAthenaPool::handle
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
Definition: EventSelectorAthenaPool.cxx:1207
pool::ICollectionCursor
Definition: ICollectionCursor.h:21
pool_uuid.guid
guid
Definition: pool_uuid.py:112
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
EventSelectorAthenaPool::~EventSelectorAthenaPool
virtual ~EventSelectorAthenaPool()
Destructor.
Definition: EventSelectorAthenaPool.cxx:82
pool::TokenList::const_iterator
Constant forward iterator class for navigation of TokenList objects.
Definition: TokenList.h:164
EventSelectorAthenaPool::next
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:493
EventSelectorAthenaPool::m_initTimeStamp
Gaudi::CheckedProperty< int > m_initTimeStamp
Definition: EventSelectorAthenaPool.h:232
EventSelectorAthenaPool::m_processMetadata
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
Definition: EventSelectorAthenaPool.h:185
pool::TokenList::extend
void extend(const std::string &name)
Extends the Token list by one element.
EventSelectorAthenaPool::disconnectIfFinished
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Definition: EventSelectorAthenaPool.cxx:1248
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
EventSelectorAthenaPool::m_inputCollectionsProp
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
Definition: EventSelectorAthenaPool.h:199
pool::READ
@ READ
Definition: Database/APR/StorageSvc/StorageSvc/pool.h:68
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:453
EventSelectorAthenaPool::recordAttributeList
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Definition: EventSelectorAthenaPool.cxx:1094
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:114
EventSelectorAthenaPool::m_sourceID
SG::SlotSpecificObj< SG::SourceID > m_sourceID
Definition: EventSelectorAthenaPool.h:252
EventSelectorAthenaPool::nextHandleFileTransition
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Definition: EventSelectorAthenaPool.cxx:639
EventSelectorAthenaPool::createAddress
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
Definition: EventSelectorAthenaPool.cxx:788
EventSelectorAthenaPool::nextWithSkip
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
Definition: EventSelectorAthenaPool.cxx:721
EventSelectorAthenaPool::m_curCollection
std::atomic_long m_curCollection
Definition: EventSelectorAthenaPool.h:235
pool::TokenList::begin
iterator begin()
Returns a forward iterator pointing to first element in Token list.
Definition: TokenList.h:218
IPoolSvc.h
This file contains the class definition for the IPoolSvc interface class.
CxxUtils::to
CONT to(RANGE &&r)
Definition: ranges.h:32
python.AthDsoLogger.fname
string fname
Definition: AthDsoLogger.py:67
python.PyAthena.v
v
Definition: PyAthena.py:157
TokenAddress.h
This file contains the class definition for the TokenAddress class.
EventSelectorAthenaPool::m_endIter
EventContextAthenaPool * m_endIter
Definition: EventSelectorAthenaPool.h:170
PoolCollectionConverter::initialize
StatusCode initialize()
Required by all Gaudi Services.
Definition: PoolCollectionConverter.cxx:69
SG::WriteHandle
Definition: StoreGate/StoreGate/WriteHandle.h:76
EventSelectorAthenaPool::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: EventSelectorAthenaPool.cxx:89
DiTauMassTools::MaxHistStrategyV2::e
e
Definition: PhysicsAnalysis/TauID/DiTauMassTools/DiTauMassTools/HelperFunctions.h:26
Guid
This class provides an encapsulation of a GUID/UUID/CLSID/IID data structure (128-bit number).
Definition: Guid.h:20
EventSelectorAthenaPool::m_evtCount
std::atomic_int m_evtCount
Definition: EventSelectorAthenaPool.h:246
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
pool::ITransaction::READ
@ READ
Definition: ITransaction.h:29
SG::SourceID
std::string SourceID
Definition: AthenaKernel/AthenaKernel/SourceID.h:23
DEBUG
#define DEBUG
Definition: page_access.h:11
pool::IPositionSeek
Abstract interface for seeking inside a collection.
Definition: IPositionSeek.h:26
EventSelectorAthenaPool::io_finalize
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Definition: EventSelectorAthenaPool.cxx:1192
EventSelectorAthenaPool::m_skipEventSequenceProp
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Definition: EventSelectorAthenaPool.h:241
declareProperty
#define declareProperty(n, p, h)
Definition: BaseFakeBkgTool.cxx:15
EventSelectorAthenaPool::makeServer
virtual StatusCode makeServer(int num) override
Make this a server.
Definition: EventSelectorAthenaPool.cxx:935
EventSelectorAthenaPool::getCollectionCnv
PoolCollectionConverter * getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Definition: EventSelectorAthenaPool.cxx:1050
merge.status
status
Definition: merge.py:17
EventSelectorAthenaPool::size
virtual int size(Context &ctxt) const override
Return the size of the collection.
Definition: EventSelectorAthenaPool.cxx:1040
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
ReadHandle.h
Handle class for reading from StoreGate.
EventSelectorAthenaPool::finalize
virtual StatusCode finalize() override
Definition: EventSelectorAthenaPool.cxx:427
EventSelectorAthenaPool::m_keepInputFilesOpen
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Definition: EventSelectorAthenaPool.h:210
Token.h
This file contains the class definition for the Token class (migrated from POOL).
EventSelectorAthenaPool::m_collectionType
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitROOT".
Definition: EventSelectorAthenaPool.h:187
StoreGateSvc.h
EventSelectorAthenaPool::EventSelectorAthenaPool
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: EventSelectorAthenaPool.cxx:58
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
StoreID::storeName
static const std::string & storeName(const StoreID::type &s)
Definition: StoreID.cxx:77
EventContextAthenaPool.h
This file contains the class definition for the EventContextAthenaPool class.
EventSelectorAthenaPool::seek
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
Definition: EventSelectorAthenaPool.cxx:824
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
ServiceHandle
Definition: ClusterMakerTool.h:37