ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorAthenaPool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
13
18#include "PoolSvc/IPoolSvc.h"
22
23// Framework
24#include "GaudiKernel/ClassID.h"
25#include "GaudiKernel/FileIncident.h"
26#include "GaudiKernel/IIncidentSvc.h"
27#include "GaudiKernel/IIoComponentMgr.h"
28#include "GaudiKernel/GaudiException.h"
29#include "GaudiKernel/GenericAddress.h"
30#include "GaudiKernel/StatusCode.h"
32
33// Pool
36#include "StorageSvc/DbType.h"
37
38#include <boost/tokenizer.hpp>
39#include <algorithm>
40#include <format>
41#include <vector>
42
43
44//________________________________________________________________________________
/// Standard service constructor.
/// Forwards @a name and @a pSvcLocator to the base class, installs lower bounds on
/// several checked properties and clears the "input collections changed" flag.
45EventSelectorAthenaPool::EventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator) :
46 base_class(name, pSvcLocator)
47{
48 // TODO: validate if those are even used
// Lower bounds enforced by the property verifiers: 1 for the first event number, 0 for all others.
49 m_runNo.verifier().setLower(0);
50 m_oldRunNo.verifier().setLower(0);
51 m_eventsPerRun.verifier().setLower(0);
52 m_firstEventNo.verifier().setLower(1);
53 m_firstLBNo.verifier().setLower(0);
54 m_eventsPerLB.verifier().setLower(0);
55 m_initTimeStamp.verifier().setLower(0);
56
// NOTE(review): one source line between the verifier setup and the statement below is not visible in this listing.
// Start with the flag cleared; inputCollectionsHandler() is what raises it.
58 m_inputCollectionsChanged = false;
59}
60//________________________________________________________________________________
61void EventSelectorAthenaPool::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
62 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
63 m_inputCollectionsChanged = true;
64 }
65}
66//________________________________________________________________________________
69//________________________________________________________________________________
73//________________________________________________________________________________
75
76 m_autoRetrieveTools = false;
77 m_checkToolDeps = false;
78
79 if (m_isSecondary.value()) {
80 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
81 } else {
82 ATH_MSG_DEBUG("Initializing " << name());
83 }
84
85 ATH_CHECK(::AthService::initialize());
86 // Check for input collection
87 if (m_inputCollectionsProp.value().empty()) {
88 ATH_MSG_FATAL("Use the property: EventSelector.InputCollections = "
89 << "[ \"<collectionName>\" ] (list of collections)");
90 return StatusCode::FAILURE;
91 }
92 boost::char_separator<char> sep_coma(","), sep_hyph("-");
93 boost::tokenizer ranges(m_skipEventRangesProp.value(), sep_coma);
94 for( const std::string& r: ranges ) {
95 boost::tokenizer fromto(r, sep_hyph);
96 auto from_iter = fromto.begin();
97 long from = std::stol(*from_iter);
98 long to = from;
99 if( ++from_iter != fromto.end() ) {
100 to = std::stol(*from_iter);
101 }
102 m_skipEventRanges.emplace_back(from, to);
103 }
104
105 for( auto v : m_skipEventSequenceProp.value() ) {
106 m_skipEventRanges.emplace_back(v, v);
107 }
108 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
109 if( msgLvl(MSG::DEBUG) ) {
110 std::string skip_ranges_str;
111 for( const auto& [first, second] : m_skipEventRanges ) {
112 if( !skip_ranges_str.empty() ) skip_ranges_str += ", ";
113 skip_ranges_str += std::to_string(first);
114 if( first != second) skip_ranges_str += std::format("-{}", second);
115 }
116 if( !skip_ranges_str.empty() )
117 ATH_MSG_DEBUG("Events to skip: " << skip_ranges_str);
118 }
119
120 // Get AthenaPoolCnvSvc
121 ATH_CHECK(m_athenaPoolCnvSvc.retrieve());
122 // Get CounterTool (if configured)
123 if (!m_counterTool.empty()) {
124 ATH_CHECK(m_counterTool.retrieve());
125 }
126 // Get HelperTools
127 ATH_CHECK(m_helperTools.retrieve());
128
129 // Ensure the xAODCnvSvc is listed in the EventPersistencySvc
130 ServiceHandle<IProperty> epSvc("EventPersistencySvc", name());
131 std::vector<std::string> propVal;
132 ATH_CHECK(Gaudi::Parsers::parse(propVal , epSvc->getProperty("CnvServices").toString()));
133 bool foundCnvSvc = false;
134 for (const auto& property : propVal) {
135 if (property == m_athenaPoolCnvSvc.type()) { foundCnvSvc = true; }
136 }
137 if (!foundCnvSvc) {
138 propVal.push_back(m_athenaPoolCnvSvc.type());
139 if (!epSvc->setProperty("CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
140 ATH_MSG_FATAL("Cannot set EventPersistencySvc Property for CnvServices");
141 return StatusCode::FAILURE;
142 }
143 }
144
145 // Register this service for 'I/O' events
146 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
147 ATH_CHECK(iomgr.retrieve());
148 ATH_CHECK(iomgr->io_register(this));
149 // Register input file's names with the I/O manager
150 const std::vector<std::string>& incol = m_inputCollectionsProp.value();
151 bool allGood = true;
152 for (const auto& inputCollection : incol) {
153 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, inputCollection, inputCollection).isSuccess()) {
154 ATH_MSG_FATAL("could not register [" << inputCollection << "] for output !");
155 allGood = false;
156 } else {
157 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << inputCollection << ") [ok]");
158 }
159 }
160 if (!allGood) {
161 return StatusCode::FAILURE;
162 }
163
164 // Connect to PersistencySvc
165 if (!m_athenaPoolCnvSvc->getPoolSvc()->connect(pool::ITransaction::READ, IPoolSvc::kInputStream).isSuccess()) {
166 ATH_MSG_FATAL("Cannot connect to POOL PersistencySvc.");
167 return StatusCode::FAILURE;
168 }
169 // Jump to reinit() to execute common init/reinit actions
170 m_guid = Guid::null();
171 if (!reinit().isSuccess()) {
172 return StatusCode::FAILURE;
173 }
174 // Get IncidentSvc
175 ATH_CHECK(m_incidentSvc.retrieve());
176 // Listen to the Event Processing incidents
177 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 0);
178 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 0);
179 return StatusCode::SUCCESS;
180}
181//________________________________________________________________________________
183 ATH_MSG_DEBUG("reinitialization...");
184
185 // reset markers
186 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
187 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
188
189 // Initialize InputCollectionsIterator
190 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
191 m_curCollection = 0;
192 if (!m_firstEvt.empty()) {
193 m_firstEvt[0] = 0;
194 }
195 m_inputCollectionsChanged = false;
196 m_evtCount = 0;
197 m_headerIterator = 0;
198 bool retError = false;
199 for (auto& tool : m_helperTools) {
200 if (!tool->postInitialize().isSuccess()) {
201 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
202 retError = true;
203 }
204 }
205 if (retError) {
206 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
207 return StatusCode::FAILURE;
208 }
209
210 // Create an m_poolCollectionConverter to read the objects in
211 m_poolCollectionConverter = getCollectionCnv();
212 if (!m_poolCollectionConverter) {
213 ATH_MSG_INFO("No Events found in any Input Collections");
214 if (m_processMetadata.value()) {
215 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
216 if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
217 //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
218 if (m_collectionType.value() == "ImplicitCollection" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
219 FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
220 m_incidentSvc->fireIncident(firstInputFileIncident);
221 m_firedIncident = true;
222 }
223 }
224 return StatusCode::SUCCESS;
225 }
226 // Get DataHeader iterator
227 try {
228 m_headerIterator = &m_poolCollectionConverter->selectAll();
229 } catch (std::exception &e) {
230 ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
231 ATH_MSG_ERROR(e.what());
232 return StatusCode::FAILURE;
233 }
234 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
235 if (m_poolCollectionConverter) {
236 m_poolCollectionConverter->disconnectDb().ignore();
237 m_poolCollectionConverter.reset();
238 }
239 ++m_inputCollectionsIterator;
240 m_poolCollectionConverter = getCollectionCnv();
241 if (m_poolCollectionConverter) {
242 m_headerIterator = &m_poolCollectionConverter->selectAll();
243 } else {
244 break;
245 }
246 }
247 if (!m_poolCollectionConverter || m_headerIterator == nullptr) { // no event selected in any collection
248 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
249 m_curCollection = 0;
250 m_poolCollectionConverter = getCollectionCnv();
251 if (!m_poolCollectionConverter) {
252 return StatusCode::SUCCESS;
253 }
254 m_headerIterator = &m_poolCollectionConverter->selectAll();
255 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
256 if (m_poolCollectionConverter) {
257 m_poolCollectionConverter->disconnectDb().ignore();
258 m_poolCollectionConverter.reset();
259 }
260 ++m_inputCollectionsIterator;
261 m_poolCollectionConverter = getCollectionCnv();
262 if (m_poolCollectionConverter) {
263 m_headerIterator = &m_poolCollectionConverter->selectAll();
264 } else {
265 break;
266 }
267 }
268 }
269 if (!m_poolCollectionConverter || m_headerIterator == nullptr) {
270 return StatusCode::SUCCESS;
271 }
272 const Token& headRef = m_headerIterator->eventRef();
273 const std::string fid = headRef.dbID().toString();
274 const int tech = headRef.technology();
275 ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
276
277 // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
278 if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
279 FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
280 m_incidentSvc->fireIncident(firstInputFileIncident);
281 m_firedIncident = true;
282 }
283 return StatusCode::SUCCESS;
284}
285//________________________________________________________________________________
287 if (m_poolCollectionConverter) {
288 // Reset iterators and apply new query
289 m_poolCollectionConverter->disconnectDb().ignore();
290 m_poolCollectionConverter.reset();
291 }
292 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
293 m_curCollection = 0;
294 m_poolCollectionConverter = getCollectionCnv(true);
295 if (!m_poolCollectionConverter) {
296 ATH_MSG_INFO("No Events found in any Input Collections");
297 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
298 if (!m_inputCollectionsProp.value().empty()) {
299 --m_inputCollectionsIterator; //leave iterator in state of last input file
300 }
301 } else {
302 m_headerIterator = &m_poolCollectionConverter->selectAll();
303 }
304 m_evtCount = 0;
305 delete m_endIter;
306 m_endIter = nullptr;
307 m_endIter = new EventContextAthenaPool(nullptr);
308 return StatusCode::SUCCESS;
309}
310//________________________________________________________________________________
312 // Fire EndInputFile for any file still open (the event loop may end
313 // before the file is fully read).
314 m_inputFileGuard.reset();
315
316 IEvtSelector::Context* ctxt(nullptr);
317 if (!releaseContext(ctxt).isSuccess()) {
318 ATH_MSG_WARNING("Cannot release context");
319 }
320 return StatusCode::SUCCESS;
321}
322
323//________________________________________________________________________________
325 if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
326 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
327 }
328 for (auto& tool : m_helperTools) {
329 if (!tool->preFinalize().isSuccess()) {
330 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
331 }
332 }
333 delete m_endIter; m_endIter = nullptr;
334 m_headerIterator = nullptr;
335 if (m_poolCollectionConverter) {
336 m_poolCollectionConverter.reset();
337 }
338 // Finalize the Service base class.
339 return ::AthService::finalize();
340}
341
342//________________________________________________________________________________
343StatusCode EventSelectorAthenaPool::createContext(IEvtSelector::Context*& ctxt) const {
344 ctxt = new EventContextAthenaPool(this);
345 return StatusCode::SUCCESS;
346}
347//________________________________________________________________________________
/// Move to the next event that is neither skipped by configuration nor vetoed by a helper tool.
/// Callers are serialised on m_callLock for the whole call.
/// @param ctxt  Context passed on to nextHandleFileTransition().
/// @return SUCCESS once an event has been accepted; FAILURE when the file transition
///         fails (which includes the end-of-input case) or the AttributeList
///         cannot be recorded.
348StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const {
349 std::lock_guard<CallMutex> lockGuard(m_callLock);
// Give every helper tool its pre-next hook; failures are only reported, not propagated.
350 for (const auto& tool : m_helperTools) {
351 if (!tool->preNext().isSuccess()) {
352 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
353 }
354 }
// Loop until an event is accepted (break below) or the transition reports failure.
355 for (;;) {
356 // Handle possible file transition
357 StatusCode sc = nextHandleFileTransition(ctxt);
358 if (sc.isRecoverable()) {
359 continue; // handles empty files
360 }
361 if (sc.isFailure()) {
362 return StatusCode::FAILURE;
363 }
364 // Increase event count
365 ++m_evtCount;
366 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
367 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
368 }
// NOTE(review): the opening `if (` and first operand of this condition are not visible in this listing.
// The visible part accepts the event only when it lies before the first remaining skip range.
370 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
371 {
// Only the primary selector records the AttributeList.
372 if (!m_isSecondary.value()) {
373 if (!this->recordAttributeList().isSuccess()) {
374 ATH_MSG_ERROR("Failed to record AttributeList.");
375 return StatusCode::FAILURE;
376 }
377 }
// Combine the helper tools' verdicts: a FAILURE from any tool overrides RECOVERABLE.
378 StatusCode status = StatusCode::SUCCESS;
379 for (const auto& tool : m_helperTools) {
380 StatusCode toolStatus = tool->postNext();
381 if (toolStatus.isRecoverable()) {
382 ATH_MSG_INFO("Request skipping event from: " << tool->name());
383 if (status.isSuccess()) {
384 status = StatusCode::RECOVERABLE;
385 }
386 } else if (toolStatus.isFailure()) {
387 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
388 status = StatusCode::FAILURE;
389 }
390 }
// Both RECOVERABLE and FAILURE keep the loop going, i.e. the event is not delivered.
391 if (status.isRecoverable()) {
392 ATH_MSG_INFO("skipping event " << m_evtCount);
393 } else if (status.isFailure()) {
394 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
395 } else {
396 if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
397 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
398 }
399 break;
400 }
401 } else {
// Drop skip ranges whose upper bound has been reached so front() is always the next pending range.
402 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
403 m_skipEventRanges.erase(m_skipEventRanges.begin());
404 }
405 ATH_MSG_INFO("skipping event " << m_evtCount);
406 }
407 }
408 return StatusCode::SUCCESS;
409}
410//________________________________________________________________________________
411StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const {
412 if (jump > 0) {
413 for (int i = 0; i < jump; i++) {
414 ATH_CHECK(next(ctxt));
415 }
416 return StatusCode::SUCCESS;
417 }
418 return StatusCode::FAILURE;
419}
420//________________________________________________________________________________
421StatusCode EventSelectorAthenaPool::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
422{
423 if( m_inputCollectionsChanged ) {
424 StatusCode rc = reinit();
425 if( rc != StatusCode::SUCCESS ) return rc;
426 }
427 else { // advance to the next (not needed after reinit)
428 // Check if we're at the end of file
429 if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
430 m_headerIterator = nullptr;
431 // Close previous collection.
432 m_poolCollectionConverter.reset();
433
434 // Fire EndInputFile while data is still accessible, then disconnect
435 m_inputFileGuard.reset();
436 const SG::SourceID old_guid = m_guid.toString();
437 m_guid = Guid::null();
438 disconnectIfFinished( old_guid );
439
440 // check if somebody updated Inputs in the EOF incident (like VP1 does)
441 if( m_inputCollectionsChanged ) {
442 StatusCode rc = reinit();
443 if( rc != StatusCode::SUCCESS ) return rc;
444 } else {
445 // Open next file from inputCollections list.
446 ++m_inputCollectionsIterator;
447 // Create PoolCollectionConverter for input file
448 m_poolCollectionConverter = getCollectionCnv(true);
449 if (!m_poolCollectionConverter) {
450 // Return end iterator
451 ctxt = *m_endIter;
452 // This is not a real failure but a Gaudi way of handling "end of job"
453 return StatusCode::FAILURE;
454 }
455 // Get DataHeader iterator
456 m_headerIterator = &m_poolCollectionConverter->selectAll();
457
458 // Return RECOVERABLE to mark we should still continue
459 return StatusCode::RECOVERABLE;
460 }
461 }
462 }
463 const Token& headRef = m_headerIterator->eventRef();
464 const Guid guid = headRef.dbID();
465 const int tech = headRef.technology();
466 ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
467
468 if (guid != m_guid) {
469 // we are starting reading from a new DB. Check if the old one needs to be retired
470 if (m_guid != Guid::null()) {
471 // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
472 const SG::SourceID old_guid = m_guid.toString();
473 m_guid = Guid::null();
474 // EndInputFile is fired by the guard transition() below; just disconnect here
475 disconnectIfFinished( old_guid );
476 }
477 m_guid = guid;
478 m_activeEventsPerSource[guid.toString()] = 0;
479 // Fire BeginInputFile incident if current InputCollection is a payload file;
480 // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
481 if (m_collectionType.value() == "ImplicitCollection") {
482 // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
483 if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
484 ATH_MSG_ERROR("Failed to set input attributes.");
485 return StatusCode::FAILURE;
486 }
487 if (m_processMetadata.value()) {
488 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(),
489 *m_inputCollectionsIterator, m_guid.toString(),
490 /*endFileName=*/{});
491 }
492 } else {
493 // Check if File is BS
494 if (tech != 0x00001000 && m_processMetadata.value()) {
495 InputFileIncidentGuard::transition(m_inputFileGuard, *m_incidentSvc, name(),
496 "FID:" + m_guid.toString(), m_guid.toString(),
497 /*endFileName=*/{});
498 }
499 }
500 } // end if (guid != m_guid)
501 return StatusCode::SUCCESS;
502}
503//________________________________________________________________________________
/// Go to the next event, silently consuming events that fall into the configured skip ranges.
/// Unlike next(ctxt), the visible code takes no lock, records no AttributeList and
/// does not consult the helper tools.
/// @param ctxt  Context passed on to nextHandleFileTransition().
/// @return SUCCESS when an event outside the skip ranges is reached; FAILURE when the
///         file transition fails (which includes the end-of-input case).
504StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
505 ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
506
507 for (;;) {
508 // Check if we're at the end of file
509 StatusCode sc = nextHandleFileTransition(ctxt);
510 if (sc.isRecoverable()) {
511 continue; // handles empty files
512 }
513 if (sc.isFailure()) {
514 return StatusCode::FAILURE;
515 }
516
517 // Increase event count
518 ++m_evtCount;
519
520 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
521 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
522 }
// NOTE(review): the opening `if (` and first operand of this condition are not visible in this listing.
// The visible part accepts the event only when it lies before the first remaining skip range.
524 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
525 {
526 return StatusCode::SUCCESS;
527 } else {
// Drop skip ranges whose upper bound has been reached so front() is always the next pending range.
528 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
529 m_skipEventRanges.erase(m_skipEventRanges.begin());
530 }
531 if (m_isSecondary.value()) {
532 ATH_MSG_INFO("skipping secondary event " << m_evtCount);
533 } else {
534 ATH_MSG_INFO("skipping event " << m_evtCount);
535 }
536 }
537 }
538
// Not reached: the loop above only exits through a return statement.
539 return StatusCode::SUCCESS;
540}
541//________________________________________________________________________________
542StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
543 ATH_MSG_ERROR("previous() not implemented");
544 return StatusCode::FAILURE;
545}
546//________________________________________________________________________________
547StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& ctxt, int jump) const {
548 if (jump > 0) {
549 for (int i = 0; i < jump; i++) {
550 ATH_CHECK(previous(ctxt));
551 }
552 return StatusCode::SUCCESS;
553 }
554 return StatusCode::FAILURE;
555}
556//________________________________________________________________________________
557StatusCode EventSelectorAthenaPool::last(IEvtSelector::Context& ctxt) const {
558 if (ctxt.identifier() == m_endIter->identifier()) {
559 ATH_MSG_DEBUG("last(): Last event in InputStream.");
560 return StatusCode::SUCCESS;
561 }
562 return StatusCode::FAILURE;
563}
564//________________________________________________________________________________
565StatusCode EventSelectorAthenaPool::rewind(IEvtSelector::Context& ctxt) const {
566 ATH_CHECK(reinit());
567 ctxt = EventContextAthenaPool(this);
568 return StatusCode::SUCCESS;
569}
570//________________________________________________________________________________
/// Build the opaque address of the current event's DataHeader.
/// The token string is read from the "eventRef" attribute when the attribute list
/// is valid, otherwise from the current position of m_headerIterator.
/// @param iop  [out] receives a newly allocated TokenAddress.
/// @return FAILURE when reading the "eventRef" attribute throws; SUCCESS otherwise.
571StatusCode EventSelectorAthenaPool::createAddress(const IEvtSelector::Context& /*ctxt*/,
572 IOpaqueAddress*& iop) const {
573 std::string tokenStr;
// NOTE(review): the declaration of `attrList` (one source line) is not visible in this listing.
575 if (attrList.isValid()) {
576 try {
577 tokenStr = (*attrList)["eventRef"].data<std::string>();
578 ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
579 } catch (std::exception &e) {
580 ATH_MSG_ERROR(e.what());
581 return StatusCode::FAILURE;
582 }
583 } else {
// Fallback: take the token straight from the header iterator.
// NOTE(review): m_headerIterator is dereferenced here without a null check - confirm callers guarantee it is set.
584 ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey);
585 tokenStr = m_headerIterator->eventRef().toString();
586 }
// Parse the string into a Token whose ownership is moved into the TokenAddress.
587 auto token = std::make_unique<Token>();
588 token->fromString(tokenStr);
589 iop = new TokenAddress(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, std::move(token));
590 return StatusCode::SUCCESS;
591}
592//________________________________________________________________________________
593StatusCode EventSelectorAthenaPool::releaseContext(IEvtSelector::Context*& /*ctxt*/) const {
594 return StatusCode::SUCCESS;
595}
596//________________________________________________________________________________
597StatusCode EventSelectorAthenaPool::resetCriteria(const std::string& /*criteria*/,
598 IEvtSelector::Context& /*ctxt*/) const {
599 return StatusCode::SUCCESS;
600}
601//__________________________________________________________________________
/// Position the selector on the event with sequence number @a evtNum.
/// The number is counted across all input collections using m_firstEvt.
/// @param evtNum  Event number to move to.
/// @return SUCCESS when the event was found; RECOVERABLE when @a evtNum lies beyond
///         the known input; FAILURE on converter/iterator errors; or the status of
///         reinit() when that fails.
602StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
603
604 if( m_inputCollectionsChanged ) {
605 StatusCode rc = reinit();
606 if( rc != StatusCode::SUCCESS ) return rc;
607 }
608
// Find the collection holding evtNum; if findEvent() does not know it, fall back to the
// current collection when evtNum lies between its first event and the events already read.
609 long newColl = findEvent(evtNum);
610 if (newColl == -1 && evtNum >= m_firstEvt[m_curCollection] && evtNum < m_evtCount - 1) {
611 newColl = m_curCollection;
612 }
613 if (newColl == -1) {
614 m_headerIterator = nullptr;
615 ATH_MSG_INFO("seek: Reached end of Input.");
616 m_inputFileGuard.reset();
617 return StatusCode::RECOVERABLE;
618 }
// Switching collections: drop the current converter and open the target one.
619 if (newColl != m_curCollection) {
620 if (!m_keepInputFilesOpen.value() && m_poolCollectionConverter) {
621 m_poolCollectionConverter->disconnectDb().ignore();
622 }
623 m_poolCollectionConverter.reset();
624 m_curCollection = newColl;
625 try {
// NOTE(review): the middle of this message expression is not visible in this listing.
626 ATH_MSG_DEBUG("Seek to item: \""
628 << "\" from the collection list.");
629 // Reset input collection iterator to the right place
630 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
631 m_inputCollectionsIterator += m_curCollection;
// NOTE(review): two constructor arguments of the converter are not visible in this listing.
632 m_poolCollectionConverter = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
635 m_athenaPoolCnvSvc->getPoolSvc());
636 if (!m_poolCollectionConverter || !m_poolCollectionConverter->initialize().isSuccess()) {
637 m_headerIterator = nullptr;
638 ATH_MSG_ERROR("seek: Unable to initialize PoolCollectionConverter.");
639 return StatusCode::FAILURE;
640 }
641 // Create DataHeader iterators
642 m_headerIterator = &m_poolCollectionConverter->selectAll();
// NOTE(review): beginIter is allocated with `new` and never deleted in the visible code - looks like a leak, confirm.
643 EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
644 m_evtCount = m_firstEvt[m_curCollection];
// Step onto the first entry of the new collection; the status is deliberately ignored.
645 next(*beginIter).ignore();
646 ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
647 } catch (std::exception &e) {
648 m_headerIterator = nullptr;
649 ATH_MSG_ERROR(e.what());
650 return StatusCode::FAILURE;
651 }
652 }
653
// The iterator is addressed by the offset within the current collection.
654 if (m_headerIterator->seek(evtNum - m_firstEvt[m_curCollection]) == 0) {
655 m_headerIterator = nullptr;
656 ATH_MSG_ERROR("Did not find event, evtNum = " << evtNum);
657 return StatusCode::FAILURE;
658 } else {
659 m_evtCount = evtNum + 1;
660 }
661 return StatusCode::SUCCESS;
662}
663//__________________________________________________________________________
664int EventSelectorAthenaPool::curEvent (const Context& /*ctxt*/) const {
665 return(m_evtCount);
666}
667//__________________________________________________________________________
668// Search for event number evtNum.
669// Return the index of the collection containing it, or -1 if not found.
670// Note: passing -1 for evtNum will always yield failure,
671// but this can be used to force filling in the entire m_numEvt array.
673 for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
674 if (m_numEvt[i] == -1) {
676 m_inputCollectionsProp.value()[i],
678 m_athenaPoolCnvSvc->getPoolSvc());
679 if (!pcc.initialize().isSuccess()) {
680 break;
681 }
682 int collection_size = 0;
683 if (pcc.isValid()) {
685 collection_size = hi->size();
686 }
687 if (i > 0) {
688 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
689 } else {
690 m_firstEvt[i] = 0;
691 }
692 m_numEvt[i] = collection_size;
693 }
694 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
695 return(i);
696 }
697 }
698 return(-1);
699}
700
701//__________________________________________________________________________
702int EventSelectorAthenaPool::size(Context& /*ctxt*/) const {
703 // Fetch sizes of all collections.
704 findEvent(-1);
705 return std::accumulate(m_numEvt.begin(), m_numEvt.end(), 0);
706}
707//__________________________________________________________________________
708std::unique_ptr<PoolCollectionConverter>
710 while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
711 if (m_curCollection != 0) {
712 m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
714 m_firstEvt[m_curCollection] = m_evtCount;
715 }
716 ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
717 auto pCollCnv = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
718 *m_inputCollectionsIterator,
720 m_athenaPoolCnvSvc->getPoolSvc());
721 StatusCode status = pCollCnv->initialize();
722 if (!status.isSuccess()) {
723 // Close previous collection.
724 pCollCnv.reset();
725 if (!status.isRecoverable()) {
726 ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
727 throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
728 } else {
729 ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
730 throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
731 }
732 } else {
733 if (!pCollCnv->isValid().isSuccess()) {
734 pCollCnv.reset();
735 ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
736 if (throwIncidents && m_processMetadata.value()) {
737 // Scoped guard: fires BeginInputFile now, EndInputFile at scope exit
738 auto guard = InputFileIncidentGuard::begin(*m_incidentSvc, name(),
739 *m_inputCollectionsIterator, {},
740 "eventless " + *m_inputCollectionsIterator);
741 }
742 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
743 ++m_inputCollectionsIterator;
744 } else {
745 return(pCollCnv);
746 }
747 }
748 }
749 return(nullptr);
750}
751//__________________________________________________________________________
753 if (!eventStore()->clearStore().isSuccess()) {
754 ATH_MSG_WARNING("Cannot clear Store");
755 }
756 // Get access to AttributeList
757 ATH_MSG_DEBUG("Get AttributeList from the collection");
758 // MN: accessing only attribute list, ignoring token list
759 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
760 ATH_MSG_DEBUG("AttributeList size " << attrList.size());
761 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList(attrList));
762 // Fill the new attribute list
763 ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
764 // Write the AttributeList
766 ATH_CHECK(wh.record(std::move(athAttrList)));
767 return StatusCode::SUCCESS;
768}
769//__________________________________________________________________________
770StatusCode EventSelectorAthenaPool::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
771{
772 const auto& row = m_headerIterator->currentRow();
773 attrList->extend( row.tokenName() + suffix, "string" );
774 (*attrList)[ row.tokenName() + suffix ].data<std::string>() = row.token().toString();
775 ATH_MSG_DEBUG("record AthenaAttribute, name = " << row.tokenName() + suffix << " = " << row.token().toString() << ".");
776
777 std::string eventRef = "eventRef";
778 if (m_isSecondary.value()) {
779 eventRef.append(suffix);
780 }
781 attrList->extend(eventRef, "string");
782 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
783 ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
784
785 if (copySource) {
786 const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
787 for (const auto &attr : sourceAttrList) {
788 attrList->extend(attr.specification().name() + suffix, attr.specification().type());
789 (*attrList)[attr.specification().name() + suffix] = attr;
790 }
791 }
792
793 return StatusCode::SUCCESS;
794}
795//__________________________________________________________________________
797 ATH_MSG_INFO("I/O reinitialization...");
798 if (m_poolCollectionConverter) {
799 m_poolCollectionConverter->disconnectDb().ignore();
800 m_poolCollectionConverter.reset();
801 }
802 m_headerIterator = nullptr;
803 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
804 if (!iomgr.retrieve().isSuccess()) {
805 ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
806 return StatusCode::FAILURE;
807 }
808 if (!iomgr->io_hasitem(this)) {
809 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
810 return StatusCode::FAILURE;
811 }
812 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
813 std::set<std::size_t> updatedIndexes;
814 for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
815 if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
816 std::string savedName = inputCollections[i];
817 std::string &fname = inputCollections[i];
818 if (!iomgr->io_contains(this, fname)) {
819 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
820 return StatusCode::FAILURE;
821 }
822 if (!iomgr->io_retrieve(this, fname).isSuccess()) {
823 ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
824 return StatusCode::FAILURE;
825 }
826 if (savedName != fname) {
827 ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
828 m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
829 }
830 updatedIndexes.insert(i);
831 for (std::size_t j = i + 1; j < imax; j++) {
832 if (inputCollections[j] == savedName) {
833 inputCollections[j] = fname;
834 updatedIndexes.insert(j);
835 }
836 }
837 }
838 // all good... copy over.
839 m_inputCollectionsProp = inputCollections;
840 m_guid = Guid::null();
841 return reinit();
842}
843//__________________________________________________________________________
845 ATH_MSG_INFO("I/O finalization...");
846 // Fire EndInputFile before disconnecting — file data is still accessible here
847 m_inputFileGuard.reset();
848 if (m_poolCollectionConverter) {
849 m_poolCollectionConverter->disconnectDb().ignore();
850 m_poolCollectionConverter.reset();
851 }
852 return StatusCode::SUCCESS;
853}
854
855//__________________________________________________________________________
856/* Listen to IncidentType::BeginProcessing and EndProcessing
857 Maintain counters of how many events from a given file are being processed.
858 Files are identified by SG::SourceID (string GUID).
859 When there are no more events from a file, see if it can be closed.
860*/
/// Incident callback for BeginProcessing and EndProcessing.
/// Keeps m_activeEventsPerSource up to date: the counter of the event's source ID
/// is incremented on BeginProcessing and decremented on EndProcessing.
/// @param inc  The incident; its type and event context are read.
861void EventSelectorAthenaPool::handle(const Incident& inc)
862{
863 SG::SourceID fid;
864 if (inc.type() == IncidentType::BeginProcessing) {
// Take the source ID from the event's proxy (it stays empty when the context has no
// extended event context) and remember it in the per-context slot.
865 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
866 fid = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
867 }
868 *m_sourceID.get(inc.context()) = fid;
869 }
870 else {
// Any other incident type: reuse the ID stored for this context at BeginProcessing.
871 fid = *m_sourceID.get(inc.context());
872 }
873
874 if( fid.empty() ) {
875 ATH_MSG_WARNING("could not read event source ID from incident event context");
876 return;
877 }
// Only sources that already have a counter entry are tracked.
878 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
879 ATH_MSG_DEBUG("Incident handler ignoring unknown input FID: " << fid );
880 return;
881 }
882 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid );
883 if( inc.type() == IncidentType::BeginProcessing ) {
884 // increment the events-per-file counter for FID
885 m_activeEventsPerSource[fid]++;
886 } else if( inc.type() == IncidentType::EndProcessing ) {
887 m_activeEventsPerSource[fid]--;
// NOTE(review): one source line between the decrement above and the reset below is not visible in this listing.
// Clear the per-context slot once the event has finished processing.
889 *m_sourceID.get(inc.context()) = "";
890 }
// Dump all per-source counters when DEBUG output is enabled.
891 if( msgLvl(MSG::DEBUG) ) {
892 for( auto& source: m_activeEventsPerSource )
893 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
894 }
895}
896
897//__________________________________________________________________________
898/* Disconnect Database identified by a SG::SourceID when it is no longer in use:
899 m_guid is not pointing to it and there are no events from it being processed
900 (if the EventLoopMgr was not firing Begin/End incidents, this will just close the DB)
901*/
903{
904 if( m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
905 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
906 // Explicitly disconnect file corresponding to old FID to release memory.
907 // EndInputFile is handled by the InputFileIncidentGuard.
908 if( !m_keepInputFilesOpen.value() ) {
909 ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
910 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
911 m_activeEventsPerSource.erase( fid );
912 return true;
913 }
914 }
915 return false;
916}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextAthenaPool class.
This file contains the class definition for the EventSelectorAthenaPool class.
This file contains the class definition for the IPoolSvc interface class.
static Double_t sc
static Double_t rc
This file contains the class definition for the PoolCollectionConverter class.
Handle class for reading from StoreGate.
Handle class for recording to StoreGate.
int imax(int i, int j)
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the context to access an event from POOL persistent store.
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
virtual StatusCode start() override
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
virtual StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
StoreGateSvc * eventStore() const
Return pointer to active event SG.
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
virtual StatusCode stop() override
ServiceHandle< IIncidentSvc > m_incidentSvc
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
virtual ~EventSelectorAthenaPool()
Destructor.
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
std::string m_attrListKey
AttributeList SG key.
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
SG::SlotSpecificObj< SG::SourceID > m_sourceID
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Gaudi::CheckedProperty< uint32_t > m_oldRunNo
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
virtual StatusCode finalize() override
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Disconnect DB if all events from the source FID were processed and the Selector moved to another file...
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
EventContextAthenaPool * m_endIter
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Gaudi::CheckedProperty< uint32_t > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
virtual int size(Context &ctxt) const override
Return the size of the collection.
int findEvent(int evtNum) const
Search for event with number evtNum.
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma-separated list of events to skip, ranges with '-': <start> - <end>.
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
This class provides an encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
Definition Guid.h:25
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
@ kInputStream
Definition IPoolSvc.h:40
static InputFileIncidentGuard begin(IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Factory: fire the begin incident and return a guard whose destructor fires the matching end incident.
static void transition(std::optional< InputFileIncidentGuard > &guard, IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Replace the guard in an optional, with strict End-before-Begin ordering.
This class provides an interface to POOL collections.
StatusCode isValid() const
Check whether it has a valid pool::ICollection*.
pool::ICollectionCursor & selectAll()
StatusCode initialize()
Required by all Gaudi Services.
virtual bool isValid() override final
Can the handle be successfully dereferenced?
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
An interface used to navigate the result of a query on a collection.
virtual std::size_t size()=0
Returns the size of the collection.
int r
Definition globals.cxx:22
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
static const DbType POOL_StorageType
Definition DbType.h:84
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
MsgStream & msg
Definition testRead.cxx:32