ATLAS Offline Software
EventSelectorAthenaPool.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
11 #include "EventContextAthenaPool.h"
13 
18 #include "PoolSvc/IPoolSvc.h"
19 #include "StoreGate/StoreGateSvc.h"
20 #include "StoreGate/ReadHandle.h"
21 #include "StoreGate/WriteHandle.h"
22 
23 // Framework
24 #include "GaudiKernel/ClassID.h"
25 #include "GaudiKernel/FileIncident.h"
26 #include "GaudiKernel/IIncidentSvc.h"
27 #include "GaudiKernel/IIoComponentMgr.h"
28 #include "GaudiKernel/GaudiException.h"
29 #include "GaudiKernel/GenericAddress.h"
30 #include "GaudiKernel/StatusCode.h"
32 
33 // Pool
37 #include "StorageSvc/DbType.h"
38 
39 #include <boost/tokenizer.hpp>
40 #include <algorithm>
41 #include <format>
42 #include <vector>
43 
44 
45 namespace {
47  StatusCode putEvent_ST(const IAthenaIPCTool& tool,
48  long eventNumber, const void* source,
49  size_t nbytes, unsigned int status) {
51  return sc;
52  }
53 }
54 
55 
56 //________________________________________________________________________________
57 EventSelectorAthenaPool::EventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator) :
58  base_class(name, pSvcLocator)
59 {
60 
61  // TODO: validate if those are even used
62  m_runNo.verifier().setLower(0);
63  m_oldRunNo.verifier().setLower(0);
64  m_eventsPerRun.verifier().setLower(0);
65  m_firstEventNo.verifier().setLower(1);
66  m_firstLBNo.verifier().setLower(0);
67  m_eventsPerLB.verifier().setLower(0);
68  m_initTimeStamp.verifier().setLower(0);
69 
71  m_inputCollectionsChanged = false;
72 }
73 //________________________________________________________________________________
74 void EventSelectorAthenaPool::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
75  if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
76  m_inputCollectionsChanged = true;
77  }
78 }
79 //________________________________________________________________________________
81 }
82 //________________________________________________________________________________
85 }
86 //________________________________________________________________________________
88 
89  m_autoRetrieveTools = false;
90  m_checkToolDeps = false;
91 
92  if (m_isSecondary.value()) {
93  ATH_MSG_DEBUG("Initializing secondary event selector " << name());
94  } else {
95  ATH_MSG_DEBUG("Initializing " << name());
96  }
97 
99  // Check for input collection
100  if (m_inputCollectionsProp.value().empty()) {
101  ATH_MSG_FATAL("Use the property: EventSelector.InputCollections = "
102  << "[ \"<collectionName>\" ] (list of collections)");
103  return StatusCode::FAILURE;
104  }
105  boost::char_separator<char> sep_coma(","), sep_hyph("-");
106  boost::tokenizer ranges(m_skipEventRangesProp.value(), sep_coma);
107  for( const std::string& r: ranges ) {
108  boost::tokenizer fromto(r, sep_hyph);
109  auto from_iter = fromto.begin();
110  long from = std::stol(*from_iter);
111  long to = from;
112  if( ++from_iter != fromto.end() ) {
113  to = std::stol(*from_iter);
114  }
115  m_skipEventRanges.emplace_back(from, to);
116  }
117 
118  for( auto v : m_skipEventSequenceProp.value() ) {
119  m_skipEventRanges.emplace_back(v, v);
120  }
121  std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
122  if( msgLvl(MSG::DEBUG) ) {
123  std::string skip_ranges_str;
124  for( const auto& [first, second] : m_skipEventRanges ) {
125  if( !skip_ranges_str.empty() ) skip_ranges_str += ", ";
126  skip_ranges_str += std::to_string(first);
127  if( first != second) skip_ranges_str += std::format("-{}", second);
128  }
129  if( !skip_ranges_str.empty() )
130  ATH_MSG_DEBUG("Events to skip: " << skip_ranges_str);
131  }
132  // CollectionType must be one of:
133  if (m_collectionType.value() != "RootCollection" && m_collectionType.value() != "ImplicitCollection") {
134  ATH_MSG_FATAL("EventSelector.CollectionType must be one of: RootCollection, ImplicitCollection (default)");
135  return StatusCode::FAILURE;
136  }
137  // Get IncidentSvc
138  ATH_CHECK(m_incidentSvc.retrieve());
139  // Listen to the Event Processing incidents
140  if (m_eventStreamingTool.empty()) {
141  m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 0);
142  m_incidentSvc->addListener(this, IncidentType::EndProcessing, 0);
143  }
144 
145  // Get AthenaPoolCnvSvc
146  ATH_CHECK(m_athenaPoolCnvSvc.retrieve());
147  // Get CounterTool (if configured)
148  if (!m_counterTool.empty()) {
149  ATH_CHECK(m_counterTool.retrieve());
150  }
151  // Get HelperTools
152  ATH_CHECK(m_helperTools.retrieve());
153  // Get SharedMemoryTool (if configured)
154  if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
155  ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
156  return StatusCode::FAILURE;
157  } else if (m_makeStreamingToolClient.value() == -1) {
158  std::string dummyStr;
159  ATH_CHECK(m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr));
160  }
161 
162  // Ensure the xAODCnvSvc is listed in the EventPersistencySvc
163  ServiceHandle<IProperty> epSvc("EventPersistencySvc", name());
164  std::vector<std::string> propVal;
165  ATH_CHECK(Gaudi::Parsers::parse(propVal , epSvc->getProperty("CnvServices").toString()));
166  bool foundCnvSvc = false;
167  for (const auto& property : propVal) {
168  if (property == m_athenaPoolCnvSvc.type()) { foundCnvSvc = true; }
169  }
170  if (!foundCnvSvc) {
171  propVal.push_back(m_athenaPoolCnvSvc.type());
172  if (!epSvc->setProperty("CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
173  ATH_MSG_FATAL("Cannot set EventPersistencySvc Property for CnvServices");
174  return StatusCode::FAILURE;
175  }
176  }
177 
178  // Register this service for 'I/O' events
179  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
180  ATH_CHECK(iomgr.retrieve());
181  ATH_CHECK(iomgr->io_register(this));
182  // Register input file's names with the I/O manager
183  const std::vector<std::string>& incol = m_inputCollectionsProp.value();
184  bool allGood = true;
185  std::string fileName;
186  std::string fileType;
187  for (const auto& inputCollection : incol) {
188  if (inputCollection.starts_with("LFN:") || inputCollection.starts_with("FID:")) {
189  m_athenaPoolCnvSvc->getPoolSvc()->lookupBestPfn(inputCollection, fileName, fileType);
190  } else {
191  fileName = inputCollection;
192  }
193  if (fileName.starts_with("PFN:")) {
194  fileName = fileName.substr(4);
195  }
196  if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, inputCollection, fileName).isSuccess()) {
197  ATH_MSG_FATAL("could not register [" << inputCollection << "] for output !");
198  allGood = false;
199  } else {
200  ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << inputCollection << ") [ok]");
201  }
202  }
203  if (!allGood) {
204  return StatusCode::FAILURE;
205  }
206 
207  // Connect to PersistencySvc
208  if (!m_athenaPoolCnvSvc->getPoolSvc()->connect(pool::ITransaction::READ, IPoolSvc::kInputStream).isSuccess()) {
209  ATH_MSG_FATAL("Cannot connect to POOL PersistencySvc.");
210  return StatusCode::FAILURE;
211  }
212  // Jump to reinit() to execute common init/reinit actions
213  m_guid = Guid::null();
214  return reinit();
215 }
216 //________________________________________________________________________________
218  ATH_MSG_DEBUG("reinitialization...");
219 
220  // reset markers
221  m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
222  m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
223 
224  // Initialize InputCollectionsIterator
225  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
226  m_curCollection = 0;
227  if (!m_firstEvt.empty()) {
228  m_firstEvt[0] = 0;
229  }
230  m_inputCollectionsChanged = false;
231  m_evtCount = 0;
232  m_headerIterator = 0;
233  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
234  ATH_MSG_INFO("Done reinitialization for shared reader client");
235  return StatusCode::SUCCESS;
236  }
237  bool retError = false;
238  for (auto& tool : m_helperTools) {
239  if (!tool->postInitialize().isSuccess()) {
240  ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
241  retError = true;
242  }
243  }
244  if (retError) {
245  ATH_MSG_FATAL("Failed to postInitialize() helperTools");
246  return StatusCode::FAILURE;
247  }
248 
249  // Create an m_poolCollectionConverter to read the objects in
250  m_poolCollectionConverter = getCollectionCnv();
251  if (!m_poolCollectionConverter) {
252  ATH_MSG_INFO("No Events found in any Input Collections");
253  if (m_processMetadata.value()) {
254  m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
255  if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
256  //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
257  if (m_collectionType.value() == "ImplicitCollection" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
258  FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
259  m_incidentSvc->fireIncident(firstInputFileIncident);
260  m_firedIncident = true;
261  }
262  }
263  return StatusCode::SUCCESS;
264  }
265  // Get DataHeader iterator
266  try {
267  m_headerIterator = &m_poolCollectionConverter->selectAll();
268  } catch (std::exception &e) {
269  ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
270  ATH_MSG_ERROR(e.what());
271  return StatusCode::FAILURE;
272  }
273  while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
274  if (m_poolCollectionConverter) {
275  m_poolCollectionConverter->disconnectDb().ignore();
276  m_poolCollectionConverter.reset();
277  }
278  ++m_inputCollectionsIterator;
279  m_poolCollectionConverter = getCollectionCnv();
280  if (m_poolCollectionConverter) {
281  m_headerIterator = &m_poolCollectionConverter->selectAll();
282  } else {
283  break;
284  }
285  }
286  if (!m_poolCollectionConverter || m_headerIterator == nullptr) { // no event selected in any collection
287  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
288  m_curCollection = 0;
289  m_poolCollectionConverter = getCollectionCnv();
290  if (!m_poolCollectionConverter) {
291  return StatusCode::SUCCESS;
292  }
293  m_headerIterator = &m_poolCollectionConverter->selectAll();
294  while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
295  if (m_poolCollectionConverter) {
296  m_poolCollectionConverter->disconnectDb().ignore();
297  m_poolCollectionConverter.reset();
298  }
299  ++m_inputCollectionsIterator;
300  m_poolCollectionConverter = getCollectionCnv();
301  if (m_poolCollectionConverter) {
302  m_headerIterator = &m_poolCollectionConverter->selectAll();
303  } else {
304  break;
305  }
306  }
307  }
308  if (!m_poolCollectionConverter || m_headerIterator == nullptr) {
309  return StatusCode::SUCCESS;
310  }
311  const Token& headRef = m_headerIterator->eventRef();
312  const std::string fid = headRef.dbID().toString();
313  const int tech = headRef.technology();
314  ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
315 
316  // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
317  if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
318  FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
319  m_incidentSvc->fireIncident(firstInputFileIncident);
320  m_firedIncident = true;
321  }
322  return StatusCode::SUCCESS;
323 }
324 //________________________________________________________________________________
326  if (m_poolCollectionConverter) {
327  // Reset iterators and apply new query
328  m_poolCollectionConverter->disconnectDb().ignore();
329  m_poolCollectionConverter.reset();
330  }
331  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
332  m_curCollection = 0;
333  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
334  return StatusCode::SUCCESS;
335  }
336  m_poolCollectionConverter = getCollectionCnv(true);
337  if (!m_poolCollectionConverter) {
338  ATH_MSG_INFO("No Events found in any Input Collections");
339  m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
340  if (!m_inputCollectionsProp.value().empty()) {
341  --m_inputCollectionsIterator; //leave iterator in state of last input file
342  }
343  } else {
344  m_headerIterator = &m_poolCollectionConverter->selectAll();
345  }
346  m_evtCount = 0;
347  delete m_endIter;
348  m_endIter = nullptr;
349  m_endIter = new EventContextAthenaPool(nullptr);
350  return StatusCode::SUCCESS;
351 }
352 //________________________________________________________________________________
354  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
355  return StatusCode::SUCCESS;
356  }
357  IEvtSelector::Context* ctxt(nullptr);
358  if (!releaseContext(ctxt).isSuccess()) {
359  ATH_MSG_WARNING("Cannot release context");
360  }
361  return StatusCode::SUCCESS;
362 }
363 
364 //________________________________________________________________________________
366  if (m_processMetadata.value()) {
367  if (m_evtCount >= 0) {
368  // Assume that the end of collection file indicates the end of payload file.
369  if (m_guid != Guid::null()) {
370  // Fire EndInputFile incident
371  FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
372  m_incidentSvc->fireIncident(endInputFileIncident);
373  }
374  }
375  if (isLastFile && m_firedIncident) {
376  m_firedIncident = false;
377  }
378  }
379 }
380 
381 //________________________________________________________________________________
383  if (m_eventStreamingTool.empty() || !m_eventStreamingTool->isClient()) {
384  if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
385  ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
386  }
387  for (auto& tool : m_helperTools) {
388  if (!tool->preFinalize().isSuccess()) {
389  ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
390  }
391  }
392  }
393  delete m_endIter; m_endIter = nullptr;
394  m_headerIterator = nullptr;
395  if (m_poolCollectionConverter) {
396  m_poolCollectionConverter.reset();
397  }
398  // Finalize the Service base class.
400 }
401 
402 //________________________________________________________________________________
403 StatusCode EventSelectorAthenaPool::createContext(IEvtSelector::Context*& ctxt) const {
404  ctxt = new EventContextAthenaPool(this);
405  return StatusCode::SUCCESS;
406 }
407 //________________________________________________________________________________
408 StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const {
409  std::lock_guard<CallMutex> lockGuard(m_callLock);
410  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
411  if (m_makeStreamingToolClient.value() == -1) {
413  while (sc.isRecoverable()) {
414  usleep(1000);
415  sc = m_eventStreamingTool->lockEvent(m_evtCount);
416  }
417  }
418  // Increase event count
419  ++m_evtCount;
420  void* tokenStr = nullptr;
421  unsigned int status = 0;
422  StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStr, status);
423  if (sc.isRecoverable()) {
424  delete [] (char*)tokenStr; tokenStr = nullptr;
425  // Return end iterator
426  ctxt = *m_endIter;
427  // This is not a real failure but a Gaudi way of handling "end of job"
428  return StatusCode::FAILURE;
429  }
430  if (sc.isFailure()) {
431  ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
432  delete [] (char*)tokenStr; tokenStr = nullptr;
433  return StatusCode::FAILURE;
434  }
435  if (!eventStore()->clearStore().isSuccess()) {
436  ATH_MSG_WARNING("Cannot clear Store");
437  }
438  std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList());
439  athAttrList->extend("eventRef", "string");
440  (*athAttrList)["eventRef"].data<std::string>() = std::string((char*)tokenStr);
442  if (!wh.record(std::move(athAttrList)).isSuccess()) {
443  delete [] (char*)tokenStr; tokenStr = nullptr;
444  ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
445  return StatusCode::FAILURE;
446  }
447  Token token;
448  token.fromString(std::string((char*)tokenStr));
449  delete [] (char*)tokenStr; tokenStr = nullptr;
450  Guid guid = token.dbID();
451  if (guid != m_guid && m_processMetadata.value()) {
452  if (m_evtCount >= 0 && m_guid != Guid::null()) {
453  // Fire EndInputFile incident
454  FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
455  m_incidentSvc->fireIncident(endInputFileIncident);
456  }
457  m_guid = guid;
458  FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
459  m_incidentSvc->fireIncident(beginInputFileIncident);
460  }
461  return StatusCode::SUCCESS;
462  }
463  for (const auto& tool : m_helperTools) {
464  if (!tool->preNext().isSuccess()) {
465  ATH_MSG_WARNING("Failed to preNext() " << tool->name());
466  }
467  }
468  for (;;) {
469  // Handle possible file transition
471  if (sc.isRecoverable()) {
472  continue; // handles empty files
473  }
474  if (sc.isFailure()) {
475  return StatusCode::FAILURE;
476  }
477  // Increase event count
478  ++m_evtCount;
479  if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
480  ATH_MSG_WARNING("Failed to preNext() CounterTool.");
481  }
483  && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
484  {
485  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
486  IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
487  if (ds == nullptr) {
488  ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
489  return StatusCode::FAILURE;
490  }
491  std::string token = m_headerIterator->eventRef().toString();
492  StatusCode sc;
493  while ( (sc = putEvent_ST(*m_eventStreamingTool,
494  m_evtCount - 1, token.c_str(),
495  token.length() + 1, 0)).isRecoverable() ) {
496  while (ds->readData().isSuccess()) {
497  ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
498  }
499  // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
500  }
501  if (!sc.isSuccess()) {
502  ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
503  return StatusCode::FAILURE;
504  }
505  } else {
506  if (!m_isSecondary.value()) {
507  if (!eventStore()->clearStore().isSuccess()) {
508  ATH_MSG_WARNING("Cannot clear Store");
509  }
510  if (!recordAttributeList().isSuccess()) {
511  ATH_MSG_ERROR("Failed to record AttributeList.");
512  return StatusCode::FAILURE;
513  }
514  }
515  }
516  StatusCode status = StatusCode::SUCCESS;
517  for (const auto& tool : m_helperTools) {
518  StatusCode toolStatus = tool->postNext();
519  if (toolStatus.isRecoverable()) {
520  ATH_MSG_INFO("Request skipping event from: " << tool->name());
521  if (status.isSuccess()) {
522  status = StatusCode::RECOVERABLE;
523  }
524  } else if (toolStatus.isFailure()) {
525  ATH_MSG_WARNING("Failed to postNext() " << tool->name());
526  status = StatusCode::FAILURE;
527  }
528  }
529  if (status.isRecoverable()) {
530  ATH_MSG_INFO("skipping event " << m_evtCount);
531  } else if (status.isFailure()) {
532  ATH_MSG_WARNING("Failed to postNext() HelperTool.");
533  } else {
534  if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
535  ATH_MSG_WARNING("Failed to postNext() CounterTool.");
536  }
537  break;
538  }
539  } else {
540  while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
541  m_skipEventRanges.erase(m_skipEventRanges.begin());
542  }
543  ATH_MSG_INFO("skipping event " << m_evtCount);
544  }
545  }
546  return StatusCode::SUCCESS;
547 }
548 //________________________________________________________________________________
549 StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const {
550  if (jump > 0) {
551  for (int i = 0; i < jump; i++) {
552  ATH_CHECK(next(ctxt));
553  }
554  return StatusCode::SUCCESS;
555  }
556  return StatusCode::FAILURE;
557 }
558 //________________________________________________________________________________
560 {
561  if( m_inputCollectionsChanged ) {
562  StatusCode rc = reinit();
563  if( rc != StatusCode::SUCCESS ) return rc;
564  }
565  else { // advance to the next (not needed after reinit)
566  // Check if we're at the end of file
567  if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
568  m_headerIterator = nullptr;
569  // Close previous collection.
570  m_poolCollectionConverter.reset();
571 
572  // zero the current DB ID (m_guid) before disconnect() to indicate it is no longer in use
573  const SG::SourceID old_guid = m_guid.toString();
574  m_guid = Guid::null();
575  disconnectIfFinished( old_guid );
576 
577  // check if somebody updated Inputs in the EOF incident (like VP1 does)
578  if( m_inputCollectionsChanged ) {
579  StatusCode rc = reinit();
580  if( rc != StatusCode::SUCCESS ) return rc;
581  } else {
582  // Open next file from inputCollections list.
583  ++m_inputCollectionsIterator;
584  // Create PoolCollectionConverter for input file
585  m_poolCollectionConverter = getCollectionCnv(true);
586  if (!m_poolCollectionConverter) {
587  // Return end iterator
588  ctxt = *m_endIter;
589  // This is not a real failure but a Gaudi way of handling "end of job"
590  return StatusCode::FAILURE;
591  }
592  // Get DataHeader iterator
593  m_headerIterator = &m_poolCollectionConverter->selectAll();
594 
595  // Return RECOVERABLE to mark we should still continue
596  return StatusCode::RECOVERABLE;
597  }
598  }
599  }
600  const Token& headRef = m_headerIterator->eventRef();
601  const Guid guid = headRef.dbID();
602  const int tech = headRef.technology();
603  ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
604 
605  if (guid != m_guid) {
606  // we are starting reading from a new DB. Check if the old one needs to be retired
607  if (m_guid != Guid::null()) {
608  // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
609  const SG::SourceID old_guid = m_guid.toString();
610  m_guid = Guid::null();
611  disconnectIfFinished( old_guid );
612  }
613  m_guid = guid;
614  m_activeEventsPerSource[guid.toString()] = 0;
615  // Fire BeginInputFile incident if current InputCollection is a payload file;
616  // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
617  if (m_collectionType.value() == "ImplicitCollection") {
618  // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
619  if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
620  ATH_MSG_ERROR("Failed to set input attributes.");
621  return StatusCode::FAILURE;
622  }
623  if (m_processMetadata.value()) {
624  FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
625  m_incidentSvc->fireIncident(beginInputFileIncident);
626  }
627  } else {
628  // Check if File is BS
629  if (tech != 0x00001000 && m_processMetadata.value()) {
630  FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
631  m_incidentSvc->fireIncident(beginInputFileIncident);
632  }
633  }
634  } // end if (guid != m_guid)
635  return StatusCode::SUCCESS;
636 }
637 //________________________________________________________________________________
638 StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
639  ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
640 
641  for (;;) {
642  // Check if we're at the end of file
644  if (sc.isRecoverable()) {
645  continue; // handles empty files
646  }
647  if (sc.isFailure()) {
648  return StatusCode::FAILURE;
649  }
650 
651  // Increase event count
652  ++m_evtCount;
653 
654  if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
655  ATH_MSG_WARNING("Failed to preNext() CounterTool.");
656  }
658  && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
659  {
660  return StatusCode::SUCCESS;
661  } else {
662  while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
663  m_skipEventRanges.erase(m_skipEventRanges.begin());
664  }
665  if (m_isSecondary.value()) {
666  ATH_MSG_INFO("skipping secondary event " << m_evtCount);
667  } else {
668  ATH_MSG_INFO("skipping event " << m_evtCount);
669  }
670  }
671  }
672 
673  return StatusCode::SUCCESS;
674 }
675 //________________________________________________________________________________
676 StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
677  ATH_MSG_ERROR("previous() not implemented");
678  return StatusCode::FAILURE;
679 }
680 //________________________________________________________________________________
681 StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& ctxt, int jump) const {
682  if (jump > 0) {
683  for (int i = 0; i < jump; i++) {
684  ATH_CHECK(previous(ctxt));
685  }
686  return StatusCode::SUCCESS;
687  }
688  return StatusCode::FAILURE;
689 }
690 //________________________________________________________________________________
691 StatusCode EventSelectorAthenaPool::last(IEvtSelector::Context& ctxt) const {
692  if (ctxt.identifier() == m_endIter->identifier()) {
693  ATH_MSG_DEBUG("last(): Last event in InputStream.");
694  return StatusCode::SUCCESS;
695  }
696  return StatusCode::FAILURE;
697 }
698 //________________________________________________________________________________
699 StatusCode EventSelectorAthenaPool::rewind(IEvtSelector::Context& ctxt) const {
700  ATH_CHECK(reinit());
701  ctxt = EventContextAthenaPool(this);
702  return StatusCode::SUCCESS;
703 }
704 //________________________________________________________________________________
705 StatusCode EventSelectorAthenaPool::createAddress(const IEvtSelector::Context& /*ctxt*/,
706  IOpaqueAddress*& iop) const {
707  std::string tokenStr;
709  if (attrList.isValid()) {
710  try {
711  tokenStr = (*attrList)["eventRef"].data<std::string>();
712  ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
713  } catch (std::exception &e) {
714  ATH_MSG_ERROR(e.what());
715  return StatusCode::FAILURE;
716  }
717  } else {
718  ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey.value());
719  tokenStr = m_headerIterator->eventRef().toString();
720  }
721  auto token = std::make_unique<Token>();
722  token->fromString(tokenStr);
723  iop = new TokenAddress(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, std::move(token));
724  return StatusCode::SUCCESS;
725 }
726 //________________________________________________________________________________
727 StatusCode EventSelectorAthenaPool::releaseContext(IEvtSelector::Context*& /*ctxt*/) const {
728  return StatusCode::SUCCESS;
729 }
730 //________________________________________________________________________________
731 StatusCode EventSelectorAthenaPool::resetCriteria(const std::string& /*criteria*/,
732  IEvtSelector::Context& /*ctxt*/) const {
733  return StatusCode::SUCCESS;
734 }
735 //__________________________________________________________________________
736 StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
737 
738  if( m_inputCollectionsChanged ) {
739  StatusCode rc = reinit();
740  if( rc != StatusCode::SUCCESS ) return rc;
741  }
742 
743  long newColl = findEvent(evtNum);
744  if (newColl == -1 && evtNum >= m_firstEvt[m_curCollection] && evtNum < m_evtCount - 1) {
745  newColl = m_curCollection;
746  }
747  if (newColl == -1) {
748  m_headerIterator = nullptr;
749  ATH_MSG_INFO("seek: Reached end of Input.");
750  fireEndFileIncidents(true);
751  return StatusCode::RECOVERABLE;
752  }
753  if (newColl != m_curCollection) {
754  if (!m_keepInputFilesOpen.value() && m_poolCollectionConverter) {
755  m_poolCollectionConverter->disconnectDb().ignore();
756  }
757  m_poolCollectionConverter.reset();
758  m_curCollection = newColl;
759  try {
760  ATH_MSG_DEBUG("Seek to item: \""
762  << "\" from the collection list.");
763  // Reset input collection iterator to the right place
764  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
765  m_inputCollectionsIterator += m_curCollection;
766  m_poolCollectionConverter = std::make_unique<PoolCollectionConverter>(m_collectionType.value() + ":" + m_collectionTree.value(),
769  m_athenaPoolCnvSvc->getPoolSvc());
770  if (!m_poolCollectionConverter->initialize().isSuccess()) {
771  m_headerIterator = nullptr;
772  ATH_MSG_ERROR("seek: Unable to initialize PoolCollectionConverter.");
773  return StatusCode::FAILURE;
774  }
775  // Create DataHeader iterators
776  m_headerIterator = &m_poolCollectionConverter->selectAll();
777  EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
778  m_evtCount = m_firstEvt[m_curCollection];
779  next(*beginIter).ignore();
780  ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
781  } catch (std::exception &e) {
782  m_headerIterator = nullptr;
783  ATH_MSG_ERROR(e.what());
784  return StatusCode::FAILURE;
785  }
786  }
787 
788  if (m_headerIterator->seek(evtNum - m_firstEvt[m_curCollection]) == 0) {
789  m_headerIterator = nullptr;
790  ATH_MSG_ERROR("Did not find event, evtNum = " << evtNum);
791  return StatusCode::FAILURE;
792  } else {
793  m_evtCount = evtNum + 1;
794  }
795  return StatusCode::SUCCESS;
796 }
797 //__________________________________________________________________________
798 int EventSelectorAthenaPool::curEvent (const Context& /*ctxt*/) const {
799  return(m_evtCount);
800 }
801 //__________________________________________________________________________
802 // Search for event number evtNum.
803 // Return the index of the collection containing it, or -1 if not found.
804 // Note: passing -1 for evtNum will always yield failure,
805 // but this can be used to force filling in the entire m_numEvt array.
806 int EventSelectorAthenaPool::findEvent(int evtNum) const {
807  for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
808  if (m_numEvt[i] == -1) {
809  PoolCollectionConverter pcc(m_collectionType.value() + ":" + m_collectionTree.value(),
810  m_inputCollectionsProp.value()[i],
812  m_athenaPoolCnvSvc->getPoolSvc());
813  if (!pcc.initialize().isSuccess()) {
814  break;
815  }
816  int collection_size = 0;
817  if (pcc.isValid()) {
818  pool::ICollectionCursor* hi = &pcc.selectAll();
819  collection_size = hi->size();
820  }
821  if (i > 0) {
822  m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
823  } else {
824  m_firstEvt[i] = 0;
825  }
826  m_numEvt[i] = collection_size;
827  }
828  if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
829  return(i);
830  }
831  }
832  return(-1);
833 }
834 
835 //________________________________________________________________________________
837  IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
838  if (ds == nullptr) {
839  ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
840  return StatusCode::FAILURE;
841  }
842  if (num < 0) {
843  if (ds->makeServer(num - 1).isFailure()) {
844  ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
845  }
846  return StatusCode::SUCCESS;
847  }
848  if (ds->makeServer(num + 1).isFailure()) {
849  ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
850  return StatusCode::FAILURE;
851  }
852  if (m_eventStreamingTool.empty()) {
853  return StatusCode::SUCCESS;
854  }
855  m_processMetadata = false;
856  ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
857  return(m_eventStreamingTool->makeServer(1, ""));
858 }
859 
860 //________________________________________________________________________________
862  IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
863  if (ds == nullptr) {
864  ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
865  return StatusCode::FAILURE;
866  }
867  if (ds->makeClient(num + 1).isFailure()) {
868  ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
869  return StatusCode::FAILURE;
870  }
871  if (m_eventStreamingTool.empty()) {
872  return StatusCode::SUCCESS;
873  }
874  ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
875  std::string dummyStr;
876  return(m_eventStreamingTool->makeClient(0, dummyStr));
877 }
878 
879 //________________________________________________________________________________
881  IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
882  if (ds == nullptr) {
883  ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
884  return StatusCode::FAILURE;
885  }
886  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
887  StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
888  while (sc.isRecoverable()) {
889  usleep(1000);
890  sc = m_eventStreamingTool->lockEvent(evtnum);
891  }
892 // Send stop client and wait for restart
893  if (sc.isFailure()) {
894  if (ds->makeClient(0).isFailure()) {
895  return StatusCode::FAILURE;
896  }
897  sc = m_eventStreamingTool->lockEvent(evtnum);
898  while (sc.isRecoverable() || sc.isFailure()) {
899  usleep(1000);
900  sc = m_eventStreamingTool->lockEvent(evtnum);
901  }
902 //FIXME
903  if (ds->makeClient(1).isFailure()) {
904  return StatusCode::FAILURE;
905  }
906  }
907  return(sc);
908  }
909  return StatusCode::FAILURE;
910 }
911 
912 //________________________________________________________________________________
914  IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
915  if (ds == nullptr) {
916  ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
917  return StatusCode::FAILURE;
918  }
919  if (m_eventStreamingTool.empty()) {
920  ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
921  return StatusCode::FAILURE;
922  }
923  ATH_MSG_VERBOSE("Called read Event " << maxevt);
924  IEvtSelector::Context* ctxt = new EventContextAthenaPool(this);
925  for (int i = 0; i < maxevt || maxevt == -1; ++i) {
926  if (!next(*ctxt).isSuccess()) {
927  if (m_evtCount == -1) {
928  ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
929  break;
930  }
931  ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
932  delete ctxt; ctxt = nullptr;
933  return StatusCode::FAILURE;
934  } else {
935  ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
936  }
937  }
938  delete ctxt; ctxt = nullptr;
939  // End of file, wait for last event to be taken
940  StatusCode sc;
941  while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
942  while (ds->readData().isSuccess()) {
943  ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
944  }
945  usleep(1000);
946  }
947  if (!sc.isSuccess()) {
948  ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
949  return StatusCode::FAILURE;
950  } else {
951  sc = ds->readData();
952  while (sc.isSuccess() || sc.isRecoverable()) {
953  sc = ds->readData();
954  }
955  ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
956  }
957  return StatusCode::SUCCESS;
958 }
959 
960 //__________________________________________________________________________
961 int EventSelectorAthenaPool::size(Context& /*ctxt*/) const {
962  // Fetch sizes of all collections.
963  findEvent(-1);
964  return std::accumulate(m_numEvt.begin(), m_numEvt.end(), 0);
965 }
966 //__________________________________________________________________________
// Return a converter for the next usable input collection, starting at
// m_inputCollectionsIterator. Collections without a valid (non-empty) collection are
// disconnected and skipped; nullptr is returned once the iterator reaches the end of the
// input list. Throws GaudiException if a converter fails to initialize.
// throwIncidents: when true (and metadata processing is on), BeginInputFile/EndInputFile
// incidents are fired for each skipped, eventless collection.
967 std::unique_ptr<PoolCollectionConverter>
968 EventSelectorAthenaPool::getCollectionCnv(bool throwIncidents) const {
969  while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
     // Close the bookkeeping of the current collection and open the next slot.
     // NOTE(review): this only runs when m_curCollection is non-zero, so the index
     // is never advanced from 0 here - confirm this is intended.
970  if (m_curCollection != 0) {
971  m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
972  m_curCollection++;
973  m_firstEvt[m_curCollection] = m_evtCount;
974  }
975  ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
976  auto pCollCnv = std::make_unique<PoolCollectionConverter>(m_collectionType.value() + ":" + m_collectionTree.value(),
977  *m_inputCollectionsIterator,
     // NOTE(review): listing line 978 is absent from this extract, so one
     // constructor argument is missing here - restore it from the real source.
979  m_athenaPoolCnvSvc->getPoolSvc());
980  StatusCode status = pCollCnv->initialize();
981  if (!status.isSuccess()) {
982  // Close previous collection.
983  pCollCnv.reset();
     // Both failure flavours end in a GaudiException; only the message differs.
984  if (!status.isRecoverable()) {
985  ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
986  throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
987  } else {
988  ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
989  throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
990  }
991  } else {
992  if (!pCollCnv->isValid().isSuccess()) {
     // Converter initialized but holds no valid collection: skip this input.
993  pCollCnv.reset();
994  ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
995  if (throwIncidents && m_processMetadata.value()) {
     // Fire a begin/end incident pair for the skipped file; the end incident's
     // file name is prefixed with "eventless ".
996  FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator);
997  m_incidentSvc->fireIncident(beginInputFileIncident);
998  FileIncident endInputFileIncident(name(), "EndInputFile", "eventless " + *m_inputCollectionsIterator);
999  m_incidentSvc->fireIncident(endInputFileIncident);
1000  }
1001  m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
1002  ++m_inputCollectionsIterator;
1003  } else {
     // Usable collection found; the iterator is left pointing at it.
1004  return(pCollCnv);
1005  }
1006  }
1007  }
     // Input list exhausted.
1008  return(nullptr);
1009 }
1010 //__________________________________________________________________________
// Body of EventSelectorAthenaPool::recordAttributeList() const: copies the current row's
// attribute list, extends the copy via fillAttributeList() and records it through `wh`.
// NOTE(review): the signature (listing line 1011) is absent from this extract.
1012  // Get access to AttributeList
1013  ATH_MSG_DEBUG("Get AttributeList from the collection");
1014  // MN: accessing only attribute list, ignoring token list
1015  const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
1016  ATH_MSG_DEBUG("AttributeList size " << attrList.size());
      // The copy is owned by a unique_ptr and handed over to wh.record() below.
1017  std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList(attrList));
1018  // Fill the new attribute list
      // Empty suffix, and copySource=false because the source attributes were already
      // copied by the constructor above.
1019  ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
1020  // Write the AttributeList
      // NOTE(review): listing line 1021, which declares `wh`, is absent from this
      // extract - restore it from the real source.
1022  ATH_CHECK(wh.record(std::move(athAttrList)));
1023  return StatusCode::SUCCESS;
1024 }
1025 //__________________________________________________________________________
1026 StatusCode EventSelectorAthenaPool::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
1027 {
1028  const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1029  for (pool::TokenList::const_iterator iter = tokenList.begin(), last = tokenList.end(); iter != last; ++iter) {
1030  attrList->extend(iter.tokenName() + suffix, "string");
1031  (*attrList)[iter.tokenName() + suffix].data<std::string>() = iter->toString();
1032  ATH_MSG_DEBUG("record AthenaAttribute, name = " << iter.tokenName() + suffix << " = " << iter->toString() << ".");
1033  }
1034 
1035  std::string eventRef = "eventRef";
1036  if (m_isSecondary.value()) {
1037  eventRef.append(suffix);
1038  }
1039  attrList->extend(eventRef, "string");
1040  (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1041  ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
1042 
1043  if (copySource) {
1044  const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
1045  for (const auto &attr : sourceAttrList) {
1046  attrList->extend(attr.specification().name() + suffix, attr.specification().type());
1047  (*attrList)[attr.specification().name() + suffix] = attr;
1048  }
1049  }
1050 
1051  return StatusCode::SUCCESS;
1052 }
1053 //__________________________________________________________________________
// Body of EventSelectorAthenaPool::io_reinit(): drops the current collection converter,
// asks the IoComponentMgr for the current names of all input collections, and finishes
// with reinit().
// NOTE(review): the signature (listing line 1054) is absent from this extract.
1055  ATH_MSG_INFO("I/O reinitialization...");
      // Release the currently open collection, if any, and forget the header iterator.
1056  if (m_poolCollectionConverter) {
1057  m_poolCollectionConverter->disconnectDb().ignore();
1058  m_poolCollectionConverter.reset();
1059  }
1060  m_headerIterator = nullptr;
1061  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1062  if (!iomgr.retrieve().isSuccess()) {
1063  ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1064  return StatusCode::FAILURE;
1065  }
1066  if (!iomgr->io_hasitem(this)) {
1067  ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1068  return StatusCode::FAILURE;
1069  }
      // A streaming client skips the file-name remapping and reinitializes directly.
1070  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
1071  m_guid = Guid::null();
1072  return(this->reinit());
1073  }
      // Work on a copy of the input list; updatedIndexes records entries already handled,
      // so a name that appears several times is looked up only once.
1074  std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1075  std::set<std::size_t> updatedIndexes;
1076  for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
1077  if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
      // savedName keeps the old value; fname aliases the entry in the working copy.
1078  std::string savedName = inputCollections[i];
1079  std::string &fname = inputCollections[i];
1080  if (!iomgr->io_contains(this, fname)) {
1081  ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1082  return StatusCode::FAILURE;
1083  }
      // Presumably io_retrieve() rewrites fname in place (it is compared against
      // savedName right below) - verify against IIoComponentMgr.
1084  if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1085  ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1086  return StatusCode::FAILURE;
1087  }
1088  if (savedName != fname) {
1089  ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
1090  m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
1091  }
1092  updatedIndexes.insert(i);
      // Apply the same new name to every later entry that carried the old name.
1093  for (std::size_t j = i + 1; j < imax; j++) {
1094  if (inputCollections[j] == savedName) {
1095  inputCollections[j] = fname;
1096  updatedIndexes.insert(j);
1097  }
1098  }
1099  }
1100  // all good... copy over.
      // NOTE(review): listing line 1101 is absent from this extract; going by the comment
      // above it presumably stores inputCollections back - restore it from the real source.
1102  m_guid = Guid::null();
1103  return reinit();
1104 }
1105 //__________________________________________________________________________
1107  ATH_MSG_INFO("I/O finalization...");
1108  if (m_poolCollectionConverter) {
1109  m_poolCollectionConverter->disconnectDb().ignore();
1110  m_poolCollectionConverter.reset();
1111  }
1112  return StatusCode::SUCCESS;
1113 }
1114 
1115 //__________________________________________________________________________
1116 /* Listen to IncidentType::BeginProcessing and EndProcessing
1117  Maintain counters of how many events from a given file are being processed.
1118  Files are identified by SG::SourceID (string GUID).
1119  When there are no more events from a file, see if it can be closed.
1120 */
1121 void EventSelectorAthenaPool::handle(const Incident& inc)
1122 {
1123  SG::SourceID fid;
1124  if (inc.type() == IncidentType::BeginProcessing) {
1125  if ( Atlas::hasExtendedEventContext(inc.context()) ) {
1126  fid = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
1127  }
1128  *m_sourceID.get(inc.context()) = fid;
1129  }
1130  else {
1131  fid = *m_sourceID.get(inc.context());
1132  }
1133 
1134  if( fid.empty() ) {
1135  ATH_MSG_WARNING("could not read event source ID from incident event context");
1136  return;
1137  }
1138  if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1139  ATH_MSG_DEBUG("Incident handler ignoring unknown input FID: " << fid );
1140  return;
1141  }
1142  ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid );
1143  if( inc.type() == IncidentType::BeginProcessing ) {
1144  // increment the events-per-file counter for FID
1145  m_activeEventsPerSource[fid]++;
1146  } else if( inc.type() == IncidentType::EndProcessing ) {
1147  m_activeEventsPerSource[fid]--;
1148  disconnectIfFinished( fid );
1149  *m_sourceID.get(inc.context()) = "";
1150  }
1151  if( msgLvl(MSG::DEBUG) ) {
1152  for( auto& source: m_activeEventsPerSource )
1153  msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
1154  }
1155 }
1156 
1157 //__________________________________________________________________________
1158 /* Disconnect APR Database identified by a SG::SourceID when it is no longer in use:
1159  m_guid is not pointing to it and there are no events from it being processed
1160  (if the EventLoopMgr was not firing Begin/End incidents, this will just close the DB)
1161 */
1163 {
1164  if( m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1165  && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1166  // Explicitly disconnect file corresponding to old FID to release memory
1167  if( !m_keepInputFilesOpen.value() ) {
1168  // Assume that the end of collection file indicates the end of payload file.
1169  if (m_processMetadata.value()) {
1170  FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + fid, fid);
1171  m_incidentSvc->fireIncident(endInputFileIncident);
1172  }
1173  ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
1174  m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
1175  m_activeEventsPerSource.erase( fid );
1176  return true;
1177  }
1178  }
1179  return false;
1180 }
EventSelectorAthenaPool::last
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:691
AllowedVariables::e
e
Definition: AsgElectronSelectorTool.cxx:37
beamspotman.r
def r
Definition: beamspotman.py:672
EventSelectorAthenaPool::createContext
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
Definition: EventSelectorAthenaPool.cxx:403
trigbs_pickEvents.ranges
ranges
Definition: trigbs_pickEvents.py:60
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
checkxAOD.ds
ds
Definition: Tools/PyUtils/bin/checkxAOD.py:260
python.tests.PyTestsLib.finalize
def finalize(self)
_info( "content of StoreGate..." ) self.sg.dump()
Definition: PyTestsLib.py:50
EventSelectorAthenaPool::fillAttributeList
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Definition: EventSelectorAthenaPool.cxx:1026
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
EventSelectorAthenaPool::m_skipEvents
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Definition: EventSelectorAthenaPool.h:235
EventSelectorAthenaPool.h
This file contains the class definition for the EventSelectorAthenaPool class.
hotSpotInTAG.suffix
string suffix
Definition: hotSpotInTAG.py:185
EventSelectorAthenaPool::releaseContext
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
Definition: EventSelectorAthenaPool.cxx:727
EventSelectorAthenaPool::io_reinit
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
Definition: EventSelectorAthenaPool.cxx:1054
EventSelectorAthenaPool::curEvent
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
Definition: EventSelectorAthenaPool.cxx:798
Gaudi::Parsers::parse
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
Definition: CaloGPUClusterAndCellDataMonitorOptions.h:284
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
EventSelectorAthenaPool::reinit
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
Definition: EventSelectorAthenaPool.cxx:217
TokenList.h
EventSelectorAthenaPool::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: EventSelectorAthenaPool.cxx:861
PoolCollectionConverter
This class provides an interface to POOL collections.
Definition: PoolCollectionConverter.h:27
SG::ReadHandle
Definition: StoreGate/StoreGate/ReadHandle.h:67
PoolCollectionConverter.h
This file contains the class definition for the PoolCollectionConverter class.
EventSelectorAthenaPool::previous
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:676
IDataShare.h
EventSelectorAthenaPool::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EventSelectorAthenaPool.h:179
runITkAlign.accumulate
accumulate
Update flags based on parser line args.
Definition: runITkAlign.py:62
initialize
void initialize()
Definition: run_EoverP.cxx:894
ICollectionCursor.h
Atlas::hasExtendedEventContext
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
Definition: ExtendedEventContext.cxx:23
EventSelectorAthenaPool::m_athenaPoolCnvSvc
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
Definition: EventSelectorAthenaPool.h:178
EventSelectorAthenaPool::start
virtual StatusCode start() override
Definition: EventSelectorAthenaPool.cxx:325
SG::SlotSpecificObj::get
T * get(const EventContext &ctx)
Return pointer to the object for slot given by ctx.
PoolCollectionConverter::selectAll
pool::ICollectionCursor & selectAll()
Definition: PoolCollectionConverter.cxx:110
python.SystemOfUnits.second
float second
Definition: SystemOfUnits.py:135
EventSelectorAthenaPool::readEvent
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
Definition: EventSelectorAthenaPool.cxx:913
Token::dbID
const Guid & dbID() const
Access database identifier.
Definition: Token.h:64
EventSelectorAthenaPool::m_eventStreamingTool
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
Definition: EventSelectorAthenaPool.h:210
EventSelectorAthenaPool::eventStore
StoreGateSvc * eventStore() const
Return pointer to active event SG.
Definition: EventSelectorAthenaPool.cxx:83
EventSelectorAthenaPool::m_makeStreamingToolClient
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
Definition: EventSelectorAthenaPool.h:212
pool::TokenList::end
iterator end()
Returns a forward iterator pointing to last element in Token list.
Definition: TokenList.h:224
EventSelectorAthenaPool::m_firstLBNo
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
Definition: EventSelectorAthenaPool.h:225
python.PyKernel.AttributeList
AttributeList
Definition: PyKernel.py:36
DbType.h
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
EventSelectorAthenaPool::findEvent
int findEvent(int evtNum) const
Search for event with number evtNum.
Definition: EventSelectorAthenaPool.cxx:806
MuonR4::to_string
std::string to_string(const SectorProjector proj)
Definition: MsTrackSeeder.cxx:66
EventSelectorAthenaPool::m_callLock
CallMutex m_callLock
Definition: EventSelectorAthenaPool.h:245
AthenaAttributeList.h
EventContextAthenaPool
This class provides the context to access an event from POOL persistent store.
Definition: EventContextAthenaPool.h:21
EventSelectorAthenaPool::stop
virtual StatusCode stop() override
Definition: EventSelectorAthenaPool.cxx:353
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
PoolCollectionConverter::isValid
StatusCode isValid() const
Check whether has valid pool::ICollection*.
Definition: PoolCollectionConverter.cxx:106
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
Token
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition: Token.h:21
IDataShare
Abstract interface for sharing data.
Definition: IDataShare.h:24
EventSelectorAthenaPool::m_counterTool
ToolHandle< IAthenaSelectorTool > m_counterTool
Definition: EventSelectorAthenaPool.h:209
TokenAddress
This class provides a Generic Transient Address for POOL tokens.
Definition: TokenAddress.h:23
Atlas::getExtendedEventContext
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
Definition: ExtendedEventContext.cxx:32
pool::TokenList
Definition: TokenList.h:24
EventSelectorAthenaPool::share
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Definition: EventSelectorAthenaPool.cxx:880
WriteHandle.h
Handle class for recording to StoreGate.
IPoolSvc::kInputStream
@ kInputStream
Definition: IPoolSvc.h:39
EventSelectorAthenaPool::m_runNo
Gaudi::CheckedProperty< uint32_t > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
Definition: EventSelectorAthenaPool.h:217
EventContextAthenaPool::identifier
virtual void * identifier() const
Definition: EventContextAthenaPool.h:56
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:122
TileAANtupleConfig.inputCollections
inputCollections
Definition: TileAANtupleConfig.py:145
CollectionRowBuffer.h
Token::technology
int technology() const
Access technology type.
Definition: Token.h:77
Guid::toString
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
EventSelectorAthenaPool::m_firedIncident
std::atomic_bool m_firedIncident
Definition: EventSelectorAthenaPool.h:242
Amg::toString
std::string toString(const Translation3D &translation, int precision=4)
GeoPrimitvesToStringConverter.
Definition: GeoPrimitivesToStringConverter.h:40
EventSelectorAthenaPool::m_oldRunNo
Gaudi::CheckedProperty< uint32_t > m_oldRunNo
Definition: EventSelectorAthenaPool.h:218
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EventSelectorAthenaPool::m_attrListKey
Gaudi::Property< std::string > m_attrListKey
AttributeList SG key.
Definition: EventSelectorAthenaPool.h:194
EventSelectorAthenaPool::rewind
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:699
lumiFormat.i
int i
Definition: lumiFormat.py:85
IAthenaIPCTool
Definition: IAthenaIPCTool.h:14
EventSelectorAthenaPool::getCollectionCnv
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Definition: EventSelectorAthenaPool.cxx:968
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
L1CaloPhase1Monitoring.propVal
propVal
Definition: L1CaloPhase1Monitoring.py:557
EventSelectorAthenaPool::resetCriteria
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
Definition: EventSelectorAthenaPool.cxx:731
EventSelectorAthenaPool::m_isSecondary
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
Definition: EventSelectorAthenaPool.h:183
StoreGateSvc::currentStoreGate
static StoreGateSvc * currentStoreGate()
get current StoreGate
Definition: StoreGateSvc.cxx:51
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
calibdata.exception
exception
Definition: calibdata.py:495
parseDir.wh
wh
Definition: parseDir.py:45
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
EventSelectorAthenaPool::fireEndFileIncidents
void fireEndFileIncidents(bool isLastFile) const
Fires the EndInputFile incident (if there is an open file) at end of selector.
Definition: EventSelectorAthenaPool.cxx:365
AthenaAttributeList
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
Definition: PersistentDataModel/PersistentDataModel/AthenaAttributeList.h:45
xAOD::eventNumber
eventNumber
Definition: EventInfo_v1.cxx:124
EventSelectorAthenaPool::m_eventsPerRun
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
Definition: EventSelectorAthenaPool.h:224
StoreGateSvc::clearStore
virtual StatusCode clearStore(bool forceRemove=false) override final
clear DataStore contents: called by the event loop mgrs
Definition: StoreGateSvc.cxx:425
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
imax
int imax(int i, int j)
Definition: TileLaserTimingTool.cxx:33
checkRpcDigits.allGood
bool allGood
Loop over the SDOs & Digits.
Definition: checkRpcDigits.py:171
EventSelectorAthenaPool::m_collectionTree
Gaudi::Property< std::string > m_collectionTree
CollectionTree, prefix of the collection TTree: default = "POOLContainer".
Definition: EventSelectorAthenaPool.h:189
EventSelectorAthenaPool::inputCollectionsHandler
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
Definition: EventSelectorAthenaPool.cxx:74
SG::ReadHandle::isValid
virtual bool isValid() override final
Can the handle be successfully dereferenced?
EventSelectorAthenaPool::m_firstEventNo
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
Definition: EventSelectorAthenaPool.h:222
EventSelectorAthenaPool::m_skipEventRangesProp
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma separated list of event to skip, ranges with '-': <start> - <end>
Definition: EventSelectorAthenaPool.h:238
EventSelectorAthenaPool::handle
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
Definition: EventSelectorAthenaPool.cxx:1121
pool::ICollectionCursor
Definition: ICollectionCursor.h:22
pool_uuid.guid
guid
Definition: pool_uuid.py:112
trigbs_pickEvents.num
num
Definition: trigbs_pickEvents.py:76
EventSelectorAthenaPool::~EventSelectorAthenaPool
virtual ~EventSelectorAthenaPool()
Destructor.
Definition: EventSelectorAthenaPool.cxx:80
pool::TokenList::const_iterator
Constant forward iterator class for navigation of TokenList objects.
Definition: TokenList.h:164
EventSelectorAthenaPool::next
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
Definition: EventSelectorAthenaPool.cxx:408
EventSelectorAthenaPool::m_processMetadata
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
Definition: EventSelectorAthenaPool.h:185
pool::TokenList::extend
void extend(const std::string &name)
Extends the Token list by one element.
EventSelectorAthenaPool::disconnectIfFinished
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Definition: EventSelectorAthenaPool.cxx:1162
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
EventSelectorAthenaPool::m_inputCollectionsProp
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
Definition: EventSelectorAthenaPool.h:197
pool::READ
@ READ
Definition: Database/APR/StorageSvc/StorageSvc/pool.h:45
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:452
EventSelectorAthenaPool::recordAttributeList
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Definition: EventSelectorAthenaPool.cxx:1011
Token::toString
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition: Token.cxx:134
EventSelectorAthenaPool::m_sourceID
SG::SlotSpecificObj< SG::SourceID > m_sourceID
Definition: EventSelectorAthenaPool.h:247
EventSelectorAthenaPool::nextHandleFileTransition
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
Definition: EventSelectorAthenaPool.cxx:559
EventSelectorAthenaPool::createAddress
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
Definition: EventSelectorAthenaPool.cxx:705
EventSelectorAthenaPool::nextWithSkip
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
Definition: EventSelectorAthenaPool.cxx:638
EventSelectorAthenaPool::m_curCollection
std::atomic_long m_curCollection
Definition: EventSelectorAthenaPool.h:230
pool::TokenList::begin
iterator begin()
Returns a forward iterator pointing to first element in Token list.
Definition: TokenList.h:218
IPoolSvc.h
This file contains the class definition for the IPoolSvc interface class.
CxxUtils::to
CONT to(RANGE &&r)
Definition: ranges.h:39
Guid::null
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition: Guid.cxx:14
Token::fromString
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition: Token.cxx:147
python.AthDsoLogger.fname
string fname
Definition: AthDsoLogger.py:66
python.PyAthena.v
v
Definition: PyAthena.py:154
EventSelectorAthenaPool::m_eventsPerLB
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
Definition: EventSelectorAthenaPool.h:226
TokenAddress.h
This file contains the class definition for the TokenAddress class.
EventSelectorAthenaPool::m_endIter
EventContextAthenaPool * m_endIter
Definition: EventSelectorAthenaPool.h:170
PoolCollectionConverter::initialize
StatusCode initialize()
Required by all Gaudi Services.
Definition: PoolCollectionConverter.cxx:53
SG::WriteHandle
Definition: StoreGate/StoreGate/WriteHandle.h:73
EventSelectorAthenaPool::initialize
virtual StatusCode initialize() override
Required of all Gaudi Services.
Definition: EventSelectorAthenaPool.cxx:87
Guid
This class provides a encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
Definition: Guid.h:25
EventSelectorAthenaPool::m_evtCount
std::atomic_int m_evtCount
Definition: EventSelectorAthenaPool.h:241
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
pool::ITransaction::READ
@ READ
Definition: ITransaction.h:29
pool::ICollectionCursor::size
virtual std::size_t size()=0
Returns the size of the collection.
SG::SourceID
std::string SourceID
Definition: AthenaKernel/AthenaKernel/SourceID.h:25
DeMoScan.first
bool first
Definition: DeMoScan.py:534
DEBUG
#define DEBUG
Definition: page_access.h:11
EventSelectorAthenaPool::io_finalize
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Definition: EventSelectorAthenaPool.cxx:1106
EventSelectorAthenaPool::m_initTimeStamp
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
Definition: EventSelectorAthenaPool.h:227
EventSelectorAthenaPool::m_skipEventSequenceProp
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Definition: EventSelectorAthenaPool.h:236
EventSelectorAthenaPool::makeServer
virtual StatusCode makeServer(int num) override
Make this a server.
Definition: EventSelectorAthenaPool.cxx:836
copySelective.source
string source
Definition: copySelective.py:31
merge.status
status
Definition: merge.py:16
EventSelectorAthenaPool::size
virtual int size(Context &ctxt) const override
Return the size of the collection.
Definition: EventSelectorAthenaPool.cxx:961
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
jobOptions.fileName
fileName
Definition: jobOptions.SuperChic_ALP2.py:39
ReadHandle.h
Handle class for reading from StoreGate.
EventSelectorAthenaPool::finalize
virtual StatusCode finalize() override
Definition: EventSelectorAthenaPool.cxx:382
EventSelectorAthenaPool::m_keepInputFilesOpen
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Definition: EventSelectorAthenaPool.h:205
Token.h
This file contains the class definition for the Token class (migrated from POOL).
EventSelectorAthenaPool::m_collectionType
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
Definition: EventSelectorAthenaPool.h:187
StoreGateSvc.h
EventSelectorAthenaPool::EventSelectorAthenaPool
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: EventSelectorAthenaPool.cxx:57
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
StoreID::storeName
static const std::string & storeName(const StoreID::type &s)
Definition: StoreID.cxx:77
EventContextAthenaPool.h
This file contains the class definition for the EventContextAthenaPool class.
EventSelectorAthenaPool::seek
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
Definition: EventSelectorAthenaPool.cxx:736
ServiceHandle
Definition: ClusterMakerTool.h:37