ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorAthenaPool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
13
18#include "PoolSvc/IPoolSvc.h"
22
23// Framework
24#include "GaudiKernel/ClassID.h"
25#include "GaudiKernel/FileIncident.h"
26#include "GaudiKernel/IIncidentSvc.h"
27#include "GaudiKernel/IIoComponentMgr.h"
28#include "GaudiKernel/GaudiException.h"
29#include "GaudiKernel/GenericAddress.h"
30#include "GaudiKernel/StatusCode.h"
32
33// Pool
36#include "StorageSvc/DbType.h"
37
38#include <boost/tokenizer.hpp>
39#include <algorithm>
40#include <format>
41#include <vector>
42
43
44namespace {
// File-local helper: forwards its arguments to IAthenaIPCTool::putEvent() on the
// given tool and returns the resulting StatusCode unchanged.
// NOTE(review): the ATLAS_THREAD_SAFE annotation on the local presumably exists to
// satisfy the ATLAS thread-safety checker for this call - confirm before refactoring
// this into a plain "return tool.putEvent(...)".
46 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
47 long eventNumber, const void* source,
48 size_t nbytes, unsigned int status) {
49 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
50 return sc;
51 }
52}
53
54
55//________________________________________________________________________________
// Constructor: passes the service name and locator on to base_class, installs lower
// bounds on several numeric properties and clears the "input collections changed" flag.
56EventSelectorAthenaPool::EventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator) :
57 base_class(name, pSvcLocator)
58{
59 // TODO: validate if those are even used
// Lower bounds are set through each property's verifier: 1 for the first event
// number, 0 for all the others.
60 m_runNo.verifier().setLower(0);
61 m_oldRunNo.verifier().setLower(0);
62 m_eventsPerRun.verifier().setLower(0);
63 m_firstEventNo.verifier().setLower(1);
64 m_firstLBNo.verifier().setLower(0);
65 m_eventsPerLB.verifier().setLower(0);
66 m_initTimeStamp.verifier().setLower(0);
67
// No change of the input collection list is pending at construction time.
69 m_inputCollectionsChanged = false;
70}
71//________________________________________________________________________________
72void EventSelectorAthenaPool::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
73 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
74 m_inputCollectionsChanged = true;
75 }
76}
77//________________________________________________________________________________
80//________________________________________________________________________________
84//________________________________________________________________________________
86
87 m_autoRetrieveTools = false;
88 m_checkToolDeps = false;
89
90 if (m_isSecondary.value()) {
91 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
92 } else {
93 ATH_MSG_DEBUG("Initializing " << name());
94 }
95
96 ATH_CHECK(::AthService::initialize());
97 // Check for input collection
98 if (m_inputCollectionsProp.value().empty()) {
99 ATH_MSG_FATAL("Use the property: EventSelector.InputCollections = "
100 << "[ \"<collectionName>\" ] (list of collections)");
101 return StatusCode::FAILURE;
102 }
103 boost::char_separator<char> sep_coma(","), sep_hyph("-");
104 boost::tokenizer ranges(m_skipEventRangesProp.value(), sep_coma);
105 for( const std::string& r: ranges ) {
106 boost::tokenizer fromto(r, sep_hyph);
107 auto from_iter = fromto.begin();
108 long from = std::stol(*from_iter);
109 long to = from;
110 if( ++from_iter != fromto.end() ) {
111 to = std::stol(*from_iter);
112 }
113 m_skipEventRanges.emplace_back(from, to);
114 }
115
116 for( auto v : m_skipEventSequenceProp.value() ) {
117 m_skipEventRanges.emplace_back(v, v);
118 }
119 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
120 if( msgLvl(MSG::DEBUG) ) {
121 std::string skip_ranges_str;
122 for( const auto& [first, second] : m_skipEventRanges ) {
123 if( !skip_ranges_str.empty() ) skip_ranges_str += ", ";
124 skip_ranges_str += std::to_string(first);
125 if( first != second) skip_ranges_str += std::format("-{}", second);
126 }
127 if( !skip_ranges_str.empty() )
128 ATH_MSG_DEBUG("Events to skip: " << skip_ranges_str);
129 }
130 // Get IncidentSvc
131 ATH_CHECK(m_incidentSvc.retrieve());
132 // Listen to the Event Processing incidents
133 if (m_eventStreamingTool.empty()) {
134 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 0);
135 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 0);
136 }
137
138 // Get AthenaPoolCnvSvc
139 ATH_CHECK(m_athenaPoolCnvSvc.retrieve());
140 // Get CounterTool (if configured)
141 if (!m_counterTool.empty()) {
142 ATH_CHECK(m_counterTool.retrieve());
143 }
144 // Get HelperTools
145 ATH_CHECK(m_helperTools.retrieve());
146 // Get SharedMemoryTool (if configured)
147 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
148 ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
149 return StatusCode::FAILURE;
150 } else if (m_makeStreamingToolClient.value() == -1) {
151 std::string dummyStr;
152 ATH_CHECK(m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr));
153 }
154
155 // Ensure the xAODCnvSvc is listed in the EventPersistencySvc
156 ServiceHandle<IProperty> epSvc("EventPersistencySvc", name());
157 std::vector<std::string> propVal;
158 ATH_CHECK(Gaudi::Parsers::parse(propVal , epSvc->getProperty("CnvServices").toString()));
159 bool foundCnvSvc = false;
160 for (const auto& property : propVal) {
161 if (property == m_athenaPoolCnvSvc.type()) { foundCnvSvc = true; }
162 }
163 if (!foundCnvSvc) {
164 propVal.push_back(m_athenaPoolCnvSvc.type());
165 if (!epSvc->setProperty("CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
166 ATH_MSG_FATAL("Cannot set EventPersistencySvc Property for CnvServices");
167 return StatusCode::FAILURE;
168 }
169 }
170
171 // Register this service for 'I/O' events
172 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
173 ATH_CHECK(iomgr.retrieve());
174 ATH_CHECK(iomgr->io_register(this));
175 // Register input file's names with the I/O manager
176 const std::vector<std::string>& incol = m_inputCollectionsProp.value();
177 bool allGood = true;
178 for (const auto& inputCollection : incol) {
179 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, inputCollection, inputCollection).isSuccess()) {
180 ATH_MSG_FATAL("could not register [" << inputCollection << "] for output !");
181 allGood = false;
182 } else {
183 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << inputCollection << ") [ok]");
184 }
185 }
186 if (!allGood) {
187 return StatusCode::FAILURE;
188 }
189
190 // Connect to PersistencySvc
191 if (!m_athenaPoolCnvSvc->getPoolSvc()->connect(pool::ITransaction::READ, IPoolSvc::kInputStream).isSuccess()) {
192 ATH_MSG_FATAL("Cannot connect to POOL PersistencySvc.");
193 return StatusCode::FAILURE;
194 }
195 // Jump to reinit() to execute common init/reinit actions
196 m_guid = Guid::null();
197 return reinit();
198}
199//________________________________________________________________________________
201 ATH_MSG_DEBUG("reinitialization...");
202
203 // reset markers
204 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
205 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
206
207 // Initialize InputCollectionsIterator
208 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
209 m_curCollection = 0;
210 if (!m_firstEvt.empty()) {
211 m_firstEvt[0] = 0;
212 }
213 m_inputCollectionsChanged = false;
214 m_evtCount = 0;
215 m_headerIterator = 0;
216 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
217 ATH_MSG_INFO("Done reinitialization for shared reader client");
218 return StatusCode::SUCCESS;
219 }
220 bool retError = false;
221 for (auto& tool : m_helperTools) {
222 if (!tool->postInitialize().isSuccess()) {
223 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
224 retError = true;
225 }
226 }
227 if (retError) {
228 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
229 return StatusCode::FAILURE;
230 }
231
232 // Create an m_poolCollectionConverter to read the objects in
233 m_poolCollectionConverter = getCollectionCnv();
234 if (!m_poolCollectionConverter) {
235 ATH_MSG_INFO("No Events found in any Input Collections");
236 if (m_processMetadata.value()) {
237 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
238 if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
239 //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
240 if (m_collectionType.value() == "ImplicitCollection" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
241 FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
242 m_incidentSvc->fireIncident(firstInputFileIncident);
243 m_firedIncident = true;
244 }
245 }
246 return StatusCode::SUCCESS;
247 }
248 // Get DataHeader iterator
249 try {
250 m_headerIterator = &m_poolCollectionConverter->selectAll();
251 } catch (std::exception &e) {
252 ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
253 ATH_MSG_ERROR(e.what());
254 return StatusCode::FAILURE;
255 }
256 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
257 if (m_poolCollectionConverter) {
258 m_poolCollectionConverter->disconnectDb().ignore();
259 m_poolCollectionConverter.reset();
260 }
261 ++m_inputCollectionsIterator;
262 m_poolCollectionConverter = getCollectionCnv();
263 if (m_poolCollectionConverter) {
264 m_headerIterator = &m_poolCollectionConverter->selectAll();
265 } else {
266 break;
267 }
268 }
269 if (!m_poolCollectionConverter || m_headerIterator == nullptr) { // no event selected in any collection
270 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
271 m_curCollection = 0;
272 m_poolCollectionConverter = getCollectionCnv();
273 if (!m_poolCollectionConverter) {
274 return StatusCode::SUCCESS;
275 }
276 m_headerIterator = &m_poolCollectionConverter->selectAll();
277 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
278 if (m_poolCollectionConverter) {
279 m_poolCollectionConverter->disconnectDb().ignore();
280 m_poolCollectionConverter.reset();
281 }
282 ++m_inputCollectionsIterator;
283 m_poolCollectionConverter = getCollectionCnv();
284 if (m_poolCollectionConverter) {
285 m_headerIterator = &m_poolCollectionConverter->selectAll();
286 } else {
287 break;
288 }
289 }
290 }
291 if (!m_poolCollectionConverter || m_headerIterator == nullptr) {
292 return StatusCode::SUCCESS;
293 }
294 const Token& headRef = m_headerIterator->eventRef();
295 const std::string fid = headRef.dbID().toString();
296 const int tech = headRef.technology();
297 ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
298
299 // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
300 if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
301 FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
302 m_incidentSvc->fireIncident(firstInputFileIncident);
303 m_firedIncident = true;
304 }
305 return StatusCode::SUCCESS;
306}
307//________________________________________________________________________________
309 if (m_poolCollectionConverter) {
310 // Reset iterators and apply new query
311 m_poolCollectionConverter->disconnectDb().ignore();
312 m_poolCollectionConverter.reset();
313 }
314 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
315 m_curCollection = 0;
316 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
317 return StatusCode::SUCCESS;
318 }
319 m_poolCollectionConverter = getCollectionCnv(true);
320 if (!m_poolCollectionConverter) {
321 ATH_MSG_INFO("No Events found in any Input Collections");
322 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
323 if (!m_inputCollectionsProp.value().empty()) {
324 --m_inputCollectionsIterator; //leave iterator in state of last input file
325 }
326 } else {
327 m_headerIterator = &m_poolCollectionConverter->selectAll();
328 }
329 m_evtCount = 0;
330 delete m_endIter;
331 m_endIter = nullptr;
332 m_endIter = new EventContextAthenaPool(nullptr);
333 return StatusCode::SUCCESS;
334}
335//________________________________________________________________________________
337 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
338 return StatusCode::SUCCESS;
339 }
340 IEvtSelector::Context* ctxt(nullptr);
341 if (!releaseContext(ctxt).isSuccess()) {
342 ATH_MSG_WARNING("Cannot release context");
343 }
344 return StatusCode::SUCCESS;
345}
346
347//________________________________________________________________________________
349 if (m_processMetadata.value()) {
350 if (m_evtCount >= 0) {
351 // Assume that the end of collection file indicates the end of payload file.
352 if (m_guid != Guid::null()) {
353 // Fire EndInputFile incident
354 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
355 m_incidentSvc->fireIncident(endInputFileIncident);
356 }
357 }
358 if (isLastFile && m_firedIncident) {
359 m_firedIncident = false;
360 }
361 }
362}
363
364//________________________________________________________________________________
366 if (m_eventStreamingTool.empty() || !m_eventStreamingTool->isClient()) {
367 if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
368 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
369 }
370 for (auto& tool : m_helperTools) {
371 if (!tool->preFinalize().isSuccess()) {
372 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
373 }
374 }
375 }
376 delete m_endIter; m_endIter = nullptr;
377 m_headerIterator = nullptr;
378 if (m_poolCollectionConverter) {
379 m_poolCollectionConverter.reset();
380 }
381 // Finalize the Service base class.
382 return ::AthService::finalize();
383}
384
385//________________________________________________________________________________
386StatusCode EventSelectorAthenaPool::createContext(IEvtSelector::Context*& ctxt) const {
387 ctxt = new EventContextAthenaPool(this);
388 return StatusCode::SUCCESS;
389}
390//________________________________________________________________________________
// Advance the selector to the next event. Callers are serialised on m_callLock.
//
// Streaming-tool client: optionally locks the event (when m_makeStreamingToolClient
// is -1), fetches the token string of the locked event from the tool, stores it in
// an attribute list under "eventRef", and fires EndInputFile/BeginInputFile
// incidents when the database GUID of the token differs from m_guid.
//
// Otherwise: loops over nextHandleFileTransition(), applies the skip ranges, then
// either pushes the token to the streaming tool (server) or - for a non-secondary
// selector - clears the store and records the attribute list, and finally lets the
// helper tools veto the event through postNext().
//
// Returns FAILURE both for genuine errors and for end of input; in the latter case
// ctxt has been assigned *m_endIter.
391StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const {
392 std::lock_guard<CallMutex> lockGuard(m_callLock);
393 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
394 if (m_makeStreamingToolClient.value() == -1) {
// Poll once per millisecond for as long as the tool answers RECOVERABLE.
395 StatusCode sc = m_eventStreamingTool->lockEvent(m_evtCount);
396 while (sc.isRecoverable()) {
397 usleep(1000);
398 sc = m_eventStreamingTool->lockEvent(m_evtCount);
399 }
400 }
401 // Increase event count
402 ++m_evtCount;
// tokenStr is released with delete[] on every exit path below, so the buffer
// handed out by getLockedEvent() is treated as owned by this function.
403 void* tokenStr = nullptr;
404 unsigned int status = 0;
405 StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStr, status);
406 if (sc.isRecoverable()) {
407 delete [] (char*)tokenStr; tokenStr = nullptr;
408 // Return end iterator
409 ctxt = *m_endIter;
410 // This is not a real failure but a Gaudi way of handling "end of job"
411 return StatusCode::FAILURE;
412 }
413 if (sc.isFailure()) {
414 ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
415 delete [] (char*)tokenStr; tokenStr = nullptr;
416 return StatusCode::FAILURE;
417 }
// A failure to clear the store is only reported, not treated as fatal.
418 if (!eventStore()->clearStore().isSuccess()) {
419 ATH_MSG_WARNING("Cannot clear Store");
420 }
// Build an attribute list whose single "eventRef" entry holds the token string.
421 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList());
422 athAttrList->extend("eventRef", "string");
423 (*athAttrList)["eventRef"].data<std::string>() = std::string((char*)tokenStr);
425 if (!wh.record(std::move(athAttrList)).isSuccess()) {
426 delete [] (char*)tokenStr; tokenStr = nullptr;
427 ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
428 return StatusCode::FAILURE;
429 }
430 Token token;
431 token.fromString(std::string((char*)tokenStr));
432 delete [] (char*)tokenStr; tokenStr = nullptr;
// A GUID different from m_guid means the event comes from another file: close the
// previous one (if any) and announce the new one. Only done when metadata
// processing is enabled.
433 Guid guid = token.dbID();
434 if (guid != m_guid && m_processMetadata.value()) {
435 if (m_evtCount >= 0 && m_guid != Guid::null()) {
436 // Fire EndInputFile incident
437 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
438 m_incidentSvc->fireIncident(endInputFileIncident);
439 }
440 m_guid = guid;
441 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
442 m_incidentSvc->fireIncident(beginInputFileIncident);
443 }
444 return StatusCode::SUCCESS;
445 }
// Not a streaming client from here on. Helper tool preNext() failures are only warned about.
446 for (const auto& tool : m_helperTools) {
447 if (!tool->preNext().isSuccess()) {
448 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
449 }
450 }
// Loop until an event is accepted (break) or the input ends / an error occurs (return).
451 for (;;) {
452 // Handle possible file transition
453 StatusCode sc = nextHandleFileTransition(ctxt);
454 if (sc.isRecoverable()) {
455 continue; // handles empty files
456 }
457 if (sc.isFailure()) {
458 return StatusCode::FAILURE;
459 }
460 // Increase event count
461 ++m_evtCount;
462 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
463 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
464 }
// Skip-range part of the acceptance test: the event passes when no skip range is
// pending or its count lies below the start of the first pending range.
466 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
467 {
468 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
469 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
470 if (ds == nullptr) {
471 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
472 return StatusCode::FAILURE;
473 }
// Server: hand the token string (including its terminating NUL, hence length()+1)
// to the streaming tool under the zero-based index m_evtCount - 1. While the tool
// answers RECOVERABLE, readData() is called repeatedly until it stops succeeding.
474 std::string token = m_headerIterator->eventRef().toString();
475 StatusCode sc;
476 while ( (sc = putEvent_ST(*m_eventStreamingTool,
477 m_evtCount - 1, token.c_str(),
478 token.length() + 1, 0)).isRecoverable() ) {
479 while (ds->readData().isSuccess()) {
480 ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
481 }
482 // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
483 }
484 if (!sc.isSuccess()) {
485 ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
486 return StatusCode::FAILURE;
487 }
488 } else {
// Direct reading: only a non-secondary selector clears the store and records the attribute list.
489 if (!m_isSecondary.value()) {
490 if (!eventStore()->clearStore().isSuccess()) {
491 ATH_MSG_WARNING("Cannot clear Store");
492 }
493 if (!recordAttributeList().isSuccess()) {
494 ATH_MSG_ERROR("Failed to record AttributeList.");
495 return StatusCode::FAILURE;
496 }
497 }
498 }
// Combine the helper tools' postNext() verdicts: any FAILURE wins; otherwise a
// RECOVERABLE from any tool requests skipping the event.
499 StatusCode status = StatusCode::SUCCESS;
500 for (const auto& tool : m_helperTools) {
501 StatusCode toolStatus = tool->postNext();
502 if (toolStatus.isRecoverable()) {
503 ATH_MSG_INFO("Request skipping event from: " << tool->name());
504 if (status.isSuccess()) {
505 status = StatusCode::RECOVERABLE;
506 }
507 } else if (toolStatus.isFailure()) {
508 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
509 status = StatusCode::FAILURE;
510 }
511 }
// Both the RECOVERABLE and the FAILURE outcome continue the loop with the next
// event; only a clean SUCCESS leaves it.
512 if (status.isRecoverable()) {
513 ATH_MSG_INFO("skipping event " << m_evtCount);
514 } else if (status.isFailure()) {
515 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
516 } else {
517 if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
518 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
519 }
520 break;
521 }
522 } else {
// Event is skipped: drop every leading skip range whose end has been reached.
523 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
524 m_skipEventRanges.erase(m_skipEventRanges.begin());
525 }
526 ATH_MSG_INFO("skipping event " << m_evtCount);
527 }
528 }
529 return StatusCode::SUCCESS;
530}
531//________________________________________________________________________________
532StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const {
533 if (jump > 0) {
534 for (int i = 0; i < jump; i++) {
535 ATH_CHECK(next(ctxt));
536 }
537 return StatusCode::SUCCESS;
538 }
539 return StatusCode::FAILURE;
540}
541//________________________________________________________________________________
542StatusCode EventSelectorAthenaPool::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
543{
544 if( m_inputCollectionsChanged ) {
545 StatusCode rc = reinit();
546 if( rc != StatusCode::SUCCESS ) return rc;
547 }
548 else { // advance to the next (not needed after reinit)
549 // Check if we're at the end of file
550 if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
551 m_headerIterator = nullptr;
552 // Close previous collection.
553 m_poolCollectionConverter.reset();
554
555 // zero the current DB ID (m_guid) before disconnect() to indicate it is no longer in use
556 const SG::SourceID old_guid = m_guid.toString();
557 m_guid = Guid::null();
558 disconnectIfFinished( old_guid );
559
560 // check if somebody updated Inputs in the EOF incident (like VP1 does)
561 if( m_inputCollectionsChanged ) {
562 StatusCode rc = reinit();
563 if( rc != StatusCode::SUCCESS ) return rc;
564 } else {
565 // Open next file from inputCollections list.
566 ++m_inputCollectionsIterator;
567 // Create PoolCollectionConverter for input file
568 m_poolCollectionConverter = getCollectionCnv(true);
569 if (!m_poolCollectionConverter) {
570 // Return end iterator
571 ctxt = *m_endIter;
572 // This is not a real failure but a Gaudi way of handling "end of job"
573 return StatusCode::FAILURE;
574 }
575 // Get DataHeader iterator
576 m_headerIterator = &m_poolCollectionConverter->selectAll();
577
578 // Return RECOVERABLE to mark we should still continue
579 return StatusCode::RECOVERABLE;
580 }
581 }
582 }
583 const Token& headRef = m_headerIterator->eventRef();
584 const Guid guid = headRef.dbID();
585 const int tech = headRef.technology();
586 ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
587
588 if (guid != m_guid) {
589 // we are starting reading from a new DB. Check if the old one needs to be retired
590 if (m_guid != Guid::null()) {
591 // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
592 const SG::SourceID old_guid = m_guid.toString();
593 m_guid = Guid::null();
594 disconnectIfFinished( old_guid );
595 }
596 m_guid = guid;
597 m_activeEventsPerSource[guid.toString()] = 0;
598 // Fire BeginInputFile incident if current InputCollection is a payload file;
599 // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
600 if (m_collectionType.value() == "ImplicitCollection") {
601 // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
602 if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
603 ATH_MSG_ERROR("Failed to set input attributes.");
604 return StatusCode::FAILURE;
605 }
606 if (m_processMetadata.value()) {
607 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
608 m_incidentSvc->fireIncident(beginInputFileIncident);
609 }
610 } else {
611 // Check if File is BS
612 if (tech != 0x00001000 && m_processMetadata.value()) {
613 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
614 m_incidentSvc->fireIncident(beginInputFileIncident);
615 }
616 }
617 } // end if (guid != m_guid)
618 return StatusCode::SUCCESS;
619}
620//________________________________________________________________________________
// Advance to the next event that is not excluded by the skip settings.
// Unlike next(ctxt), the visible code here neither records anything in the event
// store nor calls the helper tools; it only moves the iterator, bumps m_evtCount
// and calls the counter tool's preNext().
// Returns SUCCESS on the first accepted event and FAILURE when
// nextHandleFileTransition() fails (which includes end of input).
621StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
622 ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
623
624 for (;;) {
625 // Check if we're at the end of file
626 StatusCode sc = nextHandleFileTransition(ctxt);
627 if (sc.isRecoverable()) {
628 continue; // handles empty files
629 }
630 if (sc.isFailure()) {
631 return StatusCode::FAILURE;
632 }
633
634 // Increase event count
635 ++m_evtCount;
636
637 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
638 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
639 }
// Skip-range part of the acceptance test: the event passes when no skip range is
// pending or its count lies below the start of the first pending range.
641 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
642 {
643 return StatusCode::SUCCESS;
644 } else {
// Event is skipped: drop every leading skip range whose end has been reached.
645 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
646 m_skipEventRanges.erase(m_skipEventRanges.begin());
647 }
648 if (m_isSecondary.value()) {
649 ATH_MSG_INFO("skipping secondary event " << m_evtCount);
650 } else {
651 ATH_MSG_INFO("skipping event " << m_evtCount);
652 }
653 }
654 }
655
// Not reached: the loop above only exits through a return statement.
656 return StatusCode::SUCCESS;
657}
658//________________________________________________________________________________
659StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
660 ATH_MSG_ERROR("previous() not implemented");
661 return StatusCode::FAILURE;
662}
663//________________________________________________________________________________
664StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& ctxt, int jump) const {
665 if (jump > 0) {
666 for (int i = 0; i < jump; i++) {
667 ATH_CHECK(previous(ctxt));
668 }
669 return StatusCode::SUCCESS;
670 }
671 return StatusCode::FAILURE;
672}
673//________________________________________________________________________________
674StatusCode EventSelectorAthenaPool::last(IEvtSelector::Context& ctxt) const {
675 if (ctxt.identifier() == m_endIter->identifier()) {
676 ATH_MSG_DEBUG("last(): Last event in InputStream.");
677 return StatusCode::SUCCESS;
678 }
679 return StatusCode::FAILURE;
680}
681//________________________________________________________________________________
682StatusCode EventSelectorAthenaPool::rewind(IEvtSelector::Context& ctxt) const {
683 ATH_CHECK(reinit());
684 ctxt = EventContextAthenaPool(this);
685 return StatusCode::SUCCESS;
686}
687//________________________________________________________________________________
// Create the opaque address of the current event's DataHeader.
// The token string is taken from the "eventRef" entry of the attribute list when
// that list is valid, otherwise from the current position of m_headerIterator.
// On success iop receives a newly allocated TokenAddress (raw pointer handed to
// the caller). Returns FAILURE only when reading the attribute throws.
688StatusCode EventSelectorAthenaPool::createAddress(const IEvtSelector::Context& /*ctxt*/,
689 IOpaqueAddress*& iop) const {
690 std::string tokenStr;
692 if (attrList.isValid()) {
693 try {
694 tokenStr = (*attrList)["eventRef"].data<std::string>();
695 ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
696 } catch (std::exception &e) {
697 ATH_MSG_ERROR(e.what());
698 return StatusCode::FAILURE;
699 }
700 } else {
// Fallback path: no attribute list available.
// NOTE(review): m_headerIterator is dereferenced without a null check here - confirm
// it cannot be null whenever this branch is reached.
701 ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey);
702 tokenStr = m_headerIterator->eventRef().toString();
703 }
// Parse the string into a Token whose ownership is moved into the TokenAddress.
704 auto token = std::make_unique<Token>();
705 token->fromString(tokenStr);
706 iop = new TokenAddress(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, std::move(token));
707 return StatusCode::SUCCESS;
708}
709//________________________________________________________________________________
710StatusCode EventSelectorAthenaPool::releaseContext(IEvtSelector::Context*& /*ctxt*/) const {
711 return StatusCode::SUCCESS;
712}
713//________________________________________________________________________________
714StatusCode EventSelectorAthenaPool::resetCriteria(const std::string& /*criteria*/,
715 IEvtSelector::Context& /*ctxt*/) const {
716 return StatusCode::SUCCESS;
717}
718//__________________________________________________________________________
// Position the selector on event number evtNum, counted across all input
// collections (m_firstEvt[i] holds the number of the first event of collection i).
// Returns RECOVERABLE when evtNum is not found in any collection, FAILURE when the
// collection cannot be opened or the event cannot be located inside it, and
// SUCCESS otherwise, leaving m_evtCount at evtNum + 1.
719StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
720
// Pick up a pending change of the input list first.
721 if( m_inputCollectionsChanged ) {
722 StatusCode rc = reinit();
723 if( rc != StatusCode::SUCCESS ) return rc;
724 }
725
// findEvent() returns the index of the collection holding evtNum, or -1. When it
// fails, fall back to the current collection if evtNum lies between that
// collection's first event and the events already handed out.
726 long newColl = findEvent(evtNum);
727 if (newColl == -1 && evtNum >= m_firstEvt[m_curCollection] && evtNum < m_evtCount - 1) {
728 newColl = m_curCollection;
729 }
730 if (newColl == -1) {
731 m_headerIterator = nullptr;
732 ATH_MSG_INFO("seek: Reached end of Input.");
734 return StatusCode::RECOVERABLE;
735 }
// Target lives in a different collection: close the current one (disconnecting the
// database unless input files are to be kept open) and open the new one.
736 if (newColl != m_curCollection) {
737 if (!m_keepInputFilesOpen.value() && m_poolCollectionConverter) {
738 m_poolCollectionConverter->disconnectDb().ignore();
739 }
740 m_poolCollectionConverter.reset();
741 m_curCollection = newColl;
742 try {
743 ATH_MSG_DEBUG("Seek to item: \""
745 << "\" from the collection list.");
746 // Reset input collection iterator to the right place
747 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
748 m_inputCollectionsIterator += m_curCollection;
749 m_poolCollectionConverter = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
752 m_athenaPoolCnvSvc->getPoolSvc());
753 if (!m_poolCollectionConverter->initialize().isSuccess()) {
754 m_headerIterator = nullptr;
755 ATH_MSG_ERROR("seek: Unable to initialize PoolCollectionConverter.");
756 return StatusCode::FAILURE;
757 }
758 // Create DataHeader iterators
759 m_headerIterator = &m_poolCollectionConverter->selectAll();
// NOTE(review): beginIter is allocated with new and never deleted in this function -
// this looks like a leak on every collection switch; confirm and consider a
// stack-allocated EventContextAthenaPool instead.
760 EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
// Step onto the first event of the new collection; the status of next() is deliberately ignored.
761 m_evtCount = m_firstEvt[m_curCollection];
762 next(*beginIter).ignore();
763 ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
764 } catch (std::exception &e) {
765 m_headerIterator = nullptr;
766 ATH_MSG_ERROR(e.what());
767 return StatusCode::FAILURE;
768 }
769 }
770
// Seek inside the collection using the offset relative to its first event.
771 if (m_headerIterator->seek(evtNum - m_firstEvt[m_curCollection]) == 0) {
772 m_headerIterator = nullptr;
773 ATH_MSG_ERROR("Did not find event, evtNum = " << evtNum);
774 return StatusCode::FAILURE;
775 } else {
776 m_evtCount = evtNum + 1;
777 }
778 return StatusCode::SUCCESS;
779}
780//__________________________________________________________________________
781int EventSelectorAthenaPool::curEvent (const Context& /*ctxt*/) const {
782 return(m_evtCount);
783}
784//__________________________________________________________________________
785// Search for event number evtNum.
786// Return the index of the collection containing it, or -1 if not found.
787// Note: passing -1 for evtNum will always yield failure,
788// but this can be used to force filling in the entire m_numEvt array.
790 for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
791 if (m_numEvt[i] == -1) {
793 m_inputCollectionsProp.value()[i],
795 m_athenaPoolCnvSvc->getPoolSvc());
796 if (!pcc.initialize().isSuccess()) {
797 break;
798 }
799 int collection_size = 0;
800 if (pcc.isValid()) {
802 collection_size = hi->size();
803 }
804 if (i > 0) {
805 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
806 } else {
807 m_firstEvt[i] = 0;
808 }
809 m_numEvt[i] = collection_size;
810 }
811 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
812 return(i);
813 }
814 }
815 return(-1);
816}
817
818//________________________________________________________________________________
820 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
821 if (ds == nullptr) {
822 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
823 return StatusCode::FAILURE;
824 }
825 if (num < 0) {
826 if (ds->makeServer(num - 1).isFailure()) {
827 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
828 }
829 return StatusCode::SUCCESS;
830 }
831 if (ds->makeServer(num + 1).isFailure()) {
832 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
833 return StatusCode::FAILURE;
834 }
835 if (m_eventStreamingTool.empty()) {
836 return StatusCode::SUCCESS;
837 }
838 m_processMetadata = false;
839 ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
840 return(m_eventStreamingTool->makeServer(1, ""));
841}
842
843//________________________________________________________________________________
845 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
846 if (ds == nullptr) {
847 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
848 return StatusCode::FAILURE;
849 }
850 if (ds->makeClient(num + 1).isFailure()) {
851 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
852 return StatusCode::FAILURE;
853 }
854 if (m_eventStreamingTool.empty()) {
855 return StatusCode::SUCCESS;
856 }
857 ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
858 std::string dummyStr;
859 return(m_eventStreamingTool->makeClient(0, dummyStr));
860}
861
862//________________________________________________________________________________
863StatusCode EventSelectorAthenaPool::share(int evtnum) {
864 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
865 if (ds == nullptr) {
866 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
867 return StatusCode::FAILURE;
868 }
869 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
870 StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
871 while (sc.isRecoverable()) {
872 usleep(1000);
873 sc = m_eventStreamingTool->lockEvent(evtnum);
874 }
875// Send stop client and wait for restart
876 if (sc.isFailure()) {
877 if (ds->makeClient(0).isFailure()) {
878 return StatusCode::FAILURE;
879 }
880 sc = m_eventStreamingTool->lockEvent(evtnum);
881 while (sc.isRecoverable() || sc.isFailure()) {
882 usleep(1000);
883 sc = m_eventStreamingTool->lockEvent(evtnum);
884 }
885//FIXME
886 if (ds->makeClient(1).isFailure()) {
887 return StatusCode::FAILURE;
888 }
889 }
890 return(sc);
891 }
892 return StatusCode::FAILURE;
893}
894
895//________________________________________________________________________________
897 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
898 if (ds == nullptr) {
899 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
900 return StatusCode::FAILURE;
901 }
902 if (m_eventStreamingTool.empty()) {
903 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
904 return StatusCode::FAILURE;
905 }
906 ATH_MSG_VERBOSE("Called read Event " << maxevt);
907 IEvtSelector::Context* ctxt = new EventContextAthenaPool(this);
908 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
909 if (!next(*ctxt).isSuccess()) {
910 if (m_evtCount == -1) {
911 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
912 break;
913 }
914 ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
915 delete ctxt; ctxt = nullptr;
916 return StatusCode::FAILURE;
917 } else {
918 ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
919 }
920 }
921 delete ctxt; ctxt = nullptr;
922 // End of file, wait for last event to be taken
923 StatusCode sc;
924 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
925 while (ds->readData().isSuccess()) {
926 ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
927 }
928 usleep(1000);
929 }
930 if (!sc.isSuccess()) {
931 ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
932 return StatusCode::FAILURE;
933 } else {
934 sc = ds->readData();
935 while (sc.isSuccess() || sc.isRecoverable()) {
936 sc = ds->readData();
937 }
938 ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
939 }
940 return StatusCode::SUCCESS;
941}
942
943//__________________________________________________________________________
944int EventSelectorAthenaPool::size(Context& /*ctxt*/) const {
945 // Fetch sizes of all collections.
946 findEvent(-1);
947 return std::accumulate(m_numEvt.begin(), m_numEvt.end(), 0);
948}
949//__________________________________________________________________________
// Return pointer to new PoolCollectionConverter.
// Walks the input collection list starting at m_inputCollectionsIterator and returns a
// converter for the first collection that initializes and reports itself valid.
// Collections that initialize but are not valid are skipped (optionally firing
// BeginInputFile/EndInputFile incidents); a collection that fails to initialize makes
// the function throw a GaudiException. Returns nullptr once the list is exhausted.
950std::unique_ptr<PoolCollectionConverter>
// NOTE(review): listing line 951 (the function header, declared elsewhere as
// getCollectionCnv(bool throwIncidents=false) const) is absent from this listing - confirm
// against the repository.
952 while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
// Bookkeeping for the collection just finished: its event count is the number of
// events counted since its first event.
953 if (m_curCollection != 0) {
954 m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
// NOTE(review): listing line 955 is absent here; presumably m_curCollection is advanced
// before the next assignment - confirm against the repository.
956 m_firstEvt[m_curCollection] = m_evtCount;
957 }
958 ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
// NOTE(review): listing line 961 (one constructor argument) is absent from the call below.
959 auto pCollCnv = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
960 *m_inputCollectionsIterator,
962 m_athenaPoolCnvSvc->getPoolSvc());
963 StatusCode status = pCollCnv->initialize();
964 if (!status.isSuccess()) {
965 // Close previous collection.
966 pCollCnv.reset();
// Both failure flavours throw; only the message differs ("read" for a non-recoverable
// status, "open" for a recoverable one).
967 if (!status.isRecoverable()) {
968 ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
969 throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
970 } else {
971 ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
972 throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
973 }
974 } else {
// Initialized but not valid: treated as a collection without events and skipped.
975 if (!pCollCnv->isValid().isSuccess()) {
976 pCollCnv.reset();
977 ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
// Fire a Begin/End incident pair for the skipped file; the End incident is tagged
// with an "eventless " prefix on the file name.
978 if (throwIncidents && m_processMetadata.value()) {
979 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator);
980 m_incidentSvc->fireIncident(beginInputFileIncident);
981 FileIncident endInputFileIncident(name(), "EndInputFile", "eventless " + *m_inputCollectionsIterator);
982 m_incidentSvc->fireIncident(endInputFileIncident);
983 }
// Disconnect the skipped collection (result deliberately ignored) and move to the next entry.
984 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
985 ++m_inputCollectionsIterator;
986 } else {
987 return(pCollCnv);
988 }
989 }
990 }
// No usable collection left in the list.
991 return(nullptr);
992}
993//__________________________________________________________________________
// Record AttributeList in StoreGate.
// Copies the attribute list of the header iterator's current row into a new
// AthenaAttributeList, extends it via fillAttributeList() (empty suffix, no source copy)
// and records it through the handle `wh`. Returns SUCCESS, or the failing status
// propagated by ATH_CHECK.
// NOTE(review): listing line 994 (the function header, declared elsewhere as
// recordAttributeList() const) is absent from this listing - confirm against the repository.
995 // Get access to AttributeList
996 ATH_MSG_DEBUG("Get AttributeList from the collection");
997 // MN: accessing only attribute list, ignoring token list
998 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
999 ATH_MSG_DEBUG("AttributeList size " << attrList.size());
1000 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList(attrList));
1001 // Fill the new attribute list
1002 ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
1003 // Write the AttributeList
// NOTE(review): listing line 1004, which must declare `wh`, is absent here; presumably a
// StoreGate write handle - confirm against the repository.
// Ownership of the filled list is moved into the record call.
1005 ATH_CHECK(wh.record(std::move(athAttrList)));
1006 return StatusCode::SUCCESS;
1007}
1008//__________________________________________________________________________
1009StatusCode EventSelectorAthenaPool::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
1010{
1011 const auto& row = m_headerIterator->currentRow();
1012 attrList->extend( row.tokenName() + suffix, "string" );
1013 (*attrList)[ row.tokenName() + suffix ].data<std::string>() = row.token().toString();
1014 ATH_MSG_DEBUG("record AthenaAttribute, name = " << row.tokenName() + suffix << " = " << row.token().toString() << ".");
1015
1016 std::string eventRef = "eventRef";
1017 if (m_isSecondary.value()) {
1018 eventRef.append(suffix);
1019 }
1020 attrList->extend(eventRef, "string");
1021 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1022 ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
1023
1024 if (copySource) {
1025 const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
1026 for (const auto &attr : sourceAttrList) {
1027 attrList->extend(attr.specification().name() + suffix, attr.specification().type());
1028 (*attrList)[attr.specification().name() + suffix] = attr;
1029 }
1030 }
1031
1032 return StatusCode::SUCCESS;
1033}
1034//__________________________________________________________________________
1036 ATH_MSG_INFO("I/O reinitialization...");
1037 if (m_poolCollectionConverter) {
1038 m_poolCollectionConverter->disconnectDb().ignore();
1039 m_poolCollectionConverter.reset();
1040 }
1041 m_headerIterator = nullptr;
1042 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1043 if (!iomgr.retrieve().isSuccess()) {
1044 ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1045 return StatusCode::FAILURE;
1046 }
1047 if (!iomgr->io_hasitem(this)) {
1048 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1049 return StatusCode::FAILURE;
1050 }
1051 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
1052 m_guid = Guid::null();
1053 return(this->reinit());
1054 }
1055 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1056 std::set<std::size_t> updatedIndexes;
1057 for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
1058 if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
1059 std::string savedName = inputCollections[i];
1060 std::string &fname = inputCollections[i];
1061 if (!iomgr->io_contains(this, fname)) {
1062 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1063 return StatusCode::FAILURE;
1064 }
1065 if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1066 ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1067 return StatusCode::FAILURE;
1068 }
1069 if (savedName != fname) {
1070 ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
1071 m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
1072 }
1073 updatedIndexes.insert(i);
1074 for (std::size_t j = i + 1; j < imax; j++) {
1075 if (inputCollections[j] == savedName) {
1076 inputCollections[j] = fname;
1077 updatedIndexes.insert(j);
1078 }
1079 }
1080 }
1081 // all good... copy over.
1082 m_inputCollectionsProp = inputCollections;
1083 m_guid = Guid::null();
1084 return reinit();
1085}
1086//__________________________________________________________________________
1088 ATH_MSG_INFO("I/O finalization...");
1089 if (m_poolCollectionConverter) {
1090 m_poolCollectionConverter->disconnectDb().ignore();
1091 m_poolCollectionConverter.reset();
1092 }
1093 return StatusCode::SUCCESS;
1094}
1095
1096//__________________________________________________________________________
1097/* Listen to IncidentType::BeginProcessing and EndProcessing
1098 Maintain counters of how many events from a given file are being processed.
1099 Files are identified by SG::SourceID (string GUID).
1100 When there are no more events from a file, see if it can be closed.
1101*/
1102void EventSelectorAthenaPool::handle(const Incident& inc)
1103{
1104 SG::SourceID fid;
1105 if (inc.type() == IncidentType::BeginProcessing) {
1106 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
1107 fid = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
1108 }
1109 *m_sourceID.get(inc.context()) = fid;
1110 }
1111 else {
1112 fid = *m_sourceID.get(inc.context());
1113 }
1114
1115 if( fid.empty() ) {
1116 ATH_MSG_WARNING("could not read event source ID from incident event context");
1117 return;
1118 }
1119 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1120 ATH_MSG_DEBUG("Incident handler ignoring unknown input FID: " << fid );
1121 return;
1122 }
1123 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid );
1124 if( inc.type() == IncidentType::BeginProcessing ) {
1125 // increment the events-per-file counter for FID
1126 m_activeEventsPerSource[fid]++;
1127 } else if( inc.type() == IncidentType::EndProcessing ) {
1128 m_activeEventsPerSource[fid]--;
1129 disconnectIfFinished( fid );
1130 *m_sourceID.get(inc.context()) = "";
1131 }
1132 if( msgLvl(MSG::DEBUG) ) {
1133 for( auto& source: m_activeEventsPerSource )
1134 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
1135 }
1136}
1137
1138//__________________________________________________________________________
1139/* Disconnect APR Database identified by a SG::SourceID when it is no longer in use:
1140 m_guid is not pointing to it and there are no events from it being processed
1141 (if the EventLoopMgr was not firing Begin/End incidents, this will just close the DB)
1142*/
1144{
1145 if( m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1146 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1147 // Explicitly disconnect file corresponding to old FID to release memory
1148 if( !m_keepInputFilesOpen.value() ) {
1149 // Assume that the end of collection file indicates the end of payload file.
1150 if (m_processMetadata.value()) {
1151 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + fid, fid);
1152 m_incidentSvc->fireIncident(endInputFileIncident);
1153 }
1154 ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
1155 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
1156 m_activeEventsPerSource.erase( fid );
1157 return true;
1158 }
1159 }
1160 return false;
1161}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextAthenaPool class.
This file contains the class definition for the EventSelectorAthenaPool class.
This file contains the class definition for the IPoolSvc interface class.
static Double_t sc
static Double_t rc
This file contains the class definition for the PoolCollectionConverter class.
Handle class for reading from StoreGate.
Handle class for recording to StoreGate.
int imax(int i, int j)
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the context to access an event from POOL persistent store.
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
virtual StatusCode start() override
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual StatusCode makeClient(int num) override
Make this a client.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
virtual StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
virtual StatusCode share(int evtnum) override
Request to share a given event number.
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
StoreGateSvc * eventStore() const
Return pointer to active event SG.
void fireEndFileIncidents(bool isLastFile) const
Fires the EndInputFile incident (if there is an open file) at end of selector.
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
virtual StatusCode stop() override
ServiceHandle< IIncidentSvc > m_incidentSvc
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
virtual ~EventSelectorAthenaPool()
Destructor.
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
std::string m_attrListKey
AttributeList SG key.
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
SG::SlotSpecificObj< SG::SourceID > m_sourceID
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
virtual StatusCode makeServer(int num) override
Make this a server.
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Gaudi::CheckedProperty< uint32_t > m_oldRunNo
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
virtual StatusCode finalize() override
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
EventContextAthenaPool * m_endIter
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Gaudi::CheckedProperty< uint32_t > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
virtual int size(Context &ctxt) const override
Return the size of the collection.
int findEvent(int evtNum) const
Search for event with number evtNum.
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma separated list of events to skip, ranges with '-': <start> - <end>
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
This class provides an encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
Definition Guid.h:25
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
Abstract interface for sharing data.
Definition IDataShare.h:24
@ kInputStream
Definition IPoolSvc.h:40
This class provides an interface to POOL collections.
StatusCode isValid() const
Check whether has valid pool::ICollection*.
pool::ICollectionCursor & selectAll()
StatusCode initialize()
Required by all Gaudi Services.
virtual bool isValid() override final
Can the handle be successfully dereferenced?
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
static const std::string & storeName(const StoreID::type &s)
Definition StoreID.cxx:77
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
An interface used to navigate the result of a query on a collection.
virtual std::size_t size()=0
Returns the size of the collection.
int r
Definition globals.cxx:22
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
static const DbType POOL_StorageType
Definition DbType.h:84
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
MsgStream & msg
Definition testRead.cxx:32