ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorAthenaPool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
13
18#include "PoolSvc/IPoolSvc.h"
22
23// Framework
24#include "GaudiKernel/ClassID.h"
25#include "GaudiKernel/FileIncident.h"
26#include "GaudiKernel/IIncidentSvc.h"
27#include "GaudiKernel/IIoComponentMgr.h"
28#include "GaudiKernel/GaudiException.h"
29#include "GaudiKernel/GenericAddress.h"
30#include "GaudiKernel/StatusCode.h"
32
33// Pool
37#include "StorageSvc/DbType.h"
38
39#include <boost/tokenizer.hpp>
40#include <algorithm>
41#include <format>
42#include <vector>
43
44
45namespace {
47 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
48 long eventNumber, const void* source,
49 size_t nbytes, unsigned int status) {
50 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
51 return sc;
52 }
53}
54
55
56//________________________________________________________________________________
57EventSelectorAthenaPool::EventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator) :
58 base_class(name, pSvcLocator)
59{
60
61 // TODO: validate if those are even used
62 m_runNo.verifier().setLower(0);
63 m_oldRunNo.verifier().setLower(0);
64 m_eventsPerRun.verifier().setLower(0);
65 m_firstEventNo.verifier().setLower(1);
66 m_firstLBNo.verifier().setLower(0);
67 m_eventsPerLB.verifier().setLower(0);
68 m_initTimeStamp.verifier().setLower(0);
69
71 m_inputCollectionsChanged = false;
72}
73//________________________________________________________________________________
74void EventSelectorAthenaPool::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
75 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
76 m_inputCollectionsChanged = true;
77 }
78}
79//________________________________________________________________________________
82//________________________________________________________________________________
86//________________________________________________________________________________
88
89 m_autoRetrieveTools = false;
90 m_checkToolDeps = false;
91
92 if (m_isSecondary.value()) {
93 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
94 } else {
95 ATH_MSG_DEBUG("Initializing " << name());
96 }
97
98 ATH_CHECK(::AthService::initialize());
99 // Check for input collection
100 if (m_inputCollectionsProp.value().empty()) {
101 ATH_MSG_FATAL("Use the property: EventSelector.InputCollections = "
102 << "[ \"<collectionName>\" ] (list of collections)");
103 return StatusCode::FAILURE;
104 }
105 boost::char_separator<char> sep_coma(","), sep_hyph("-");
106 boost::tokenizer ranges(m_skipEventRangesProp.value(), sep_coma);
107 for( const std::string& r: ranges ) {
108 boost::tokenizer fromto(r, sep_hyph);
109 auto from_iter = fromto.begin();
110 long from = std::stol(*from_iter);
111 long to = from;
112 if( ++from_iter != fromto.end() ) {
113 to = std::stol(*from_iter);
114 }
115 m_skipEventRanges.emplace_back(from, to);
116 }
117
118 for( auto v : m_skipEventSequenceProp.value() ) {
119 m_skipEventRanges.emplace_back(v, v);
120 }
121 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
122 if( msgLvl(MSG::DEBUG) ) {
123 std::string skip_ranges_str;
124 for( const auto& [first, second] : m_skipEventRanges ) {
125 if( !skip_ranges_str.empty() ) skip_ranges_str += ", ";
126 skip_ranges_str += std::to_string(first);
127 if( first != second) skip_ranges_str += std::format("-{}", second);
128 }
129 if( !skip_ranges_str.empty() )
130 ATH_MSG_DEBUG("Events to skip: " << skip_ranges_str);
131 }
132 // Get IncidentSvc
133 ATH_CHECK(m_incidentSvc.retrieve());
134 // Listen to the Event Processing incidents
135 if (m_eventStreamingTool.empty()) {
136 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 0);
137 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 0);
138 }
139
140 // Get AthenaPoolCnvSvc
141 ATH_CHECK(m_athenaPoolCnvSvc.retrieve());
142 // Get CounterTool (if configured)
143 if (!m_counterTool.empty()) {
144 ATH_CHECK(m_counterTool.retrieve());
145 }
146 // Get HelperTools
147 ATH_CHECK(m_helperTools.retrieve());
148 // Get SharedMemoryTool (if configured)
149 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
150 ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
151 return StatusCode::FAILURE;
152 } else if (m_makeStreamingToolClient.value() == -1) {
153 std::string dummyStr;
154 ATH_CHECK(m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr));
155 }
156
157 // Ensure the xAODCnvSvc is listed in the EventPersistencySvc
158 ServiceHandle<IProperty> epSvc("EventPersistencySvc", name());
159 std::vector<std::string> propVal;
160 ATH_CHECK(Gaudi::Parsers::parse(propVal , epSvc->getProperty("CnvServices").toString()));
161 bool foundCnvSvc = false;
162 for (const auto& property : propVal) {
163 if (property == m_athenaPoolCnvSvc.type()) { foundCnvSvc = true; }
164 }
165 if (!foundCnvSvc) {
166 propVal.push_back(m_athenaPoolCnvSvc.type());
167 if (!epSvc->setProperty("CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
168 ATH_MSG_FATAL("Cannot set EventPersistencySvc Property for CnvServices");
169 return StatusCode::FAILURE;
170 }
171 }
172
173 // Register this service for 'I/O' events
174 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
175 ATH_CHECK(iomgr.retrieve());
176 ATH_CHECK(iomgr->io_register(this));
177 // Register input file's names with the I/O manager
178 const std::vector<std::string>& incol = m_inputCollectionsProp.value();
179 bool allGood = true;
180 std::string fileName;
181 std::string fileType;
182 for (const auto& inputCollection : incol) {
183 if (inputCollection.starts_with("LFN:") || inputCollection.starts_with("FID:")) {
184 m_athenaPoolCnvSvc->getPoolSvc()->lookupBestPfn(inputCollection, fileName, fileType);
185 } else {
186 fileName = inputCollection;
187 }
188 if (fileName.starts_with("PFN:")) {
189 fileName = fileName.substr(4);
190 }
191 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, inputCollection, fileName).isSuccess()) {
192 ATH_MSG_FATAL("could not register [" << inputCollection << "] for output !");
193 allGood = false;
194 } else {
195 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << inputCollection << ") [ok]");
196 }
197 }
198 if (!allGood) {
199 return StatusCode::FAILURE;
200 }
201
202 // Connect to PersistencySvc
203 if (!m_athenaPoolCnvSvc->getPoolSvc()->connect(pool::ITransaction::READ, IPoolSvc::kInputStream).isSuccess()) {
204 ATH_MSG_FATAL("Cannot connect to POOL PersistencySvc.");
205 return StatusCode::FAILURE;
206 }
207 // Jump to reinit() to execute common init/reinit actions
208 m_guid = Guid::null();
209 return reinit();
210}
211//________________________________________________________________________________
213 ATH_MSG_DEBUG("reinitialization...");
214
215 // reset markers
216 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
217 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
218
219 // Initialize InputCollectionsIterator
220 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
221 m_curCollection = 0;
222 if (!m_firstEvt.empty()) {
223 m_firstEvt[0] = 0;
224 }
225 m_inputCollectionsChanged = false;
226 m_evtCount = 0;
227 m_headerIterator = 0;
228 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
229 ATH_MSG_INFO("Done reinitialization for shared reader client");
230 return StatusCode::SUCCESS;
231 }
232 bool retError = false;
233 for (auto& tool : m_helperTools) {
234 if (!tool->postInitialize().isSuccess()) {
235 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
236 retError = true;
237 }
238 }
239 if (retError) {
240 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
241 return StatusCode::FAILURE;
242 }
243
244 // Create an m_poolCollectionConverter to read the objects in
245 m_poolCollectionConverter = getCollectionCnv();
246 if (!m_poolCollectionConverter) {
247 ATH_MSG_INFO("No Events found in any Input Collections");
248 if (m_processMetadata.value()) {
249 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
250 if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
251 //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
252 if (m_collectionType.value() == "ImplicitCollection" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
253 FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
254 m_incidentSvc->fireIncident(firstInputFileIncident);
255 m_firedIncident = true;
256 }
257 }
258 return StatusCode::SUCCESS;
259 }
260 // Get DataHeader iterator
261 try {
262 m_headerIterator = &m_poolCollectionConverter->selectAll();
263 } catch (std::exception &e) {
264 ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
265 ATH_MSG_ERROR(e.what());
266 return StatusCode::FAILURE;
267 }
268 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
269 if (m_poolCollectionConverter) {
270 m_poolCollectionConverter->disconnectDb().ignore();
271 m_poolCollectionConverter.reset();
272 }
273 ++m_inputCollectionsIterator;
274 m_poolCollectionConverter = getCollectionCnv();
275 if (m_poolCollectionConverter) {
276 m_headerIterator = &m_poolCollectionConverter->selectAll();
277 } else {
278 break;
279 }
280 }
281 if (!m_poolCollectionConverter || m_headerIterator == nullptr) { // no event selected in any collection
282 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
283 m_curCollection = 0;
284 m_poolCollectionConverter = getCollectionCnv();
285 if (!m_poolCollectionConverter) {
286 return StatusCode::SUCCESS;
287 }
288 m_headerIterator = &m_poolCollectionConverter->selectAll();
289 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
290 if (m_poolCollectionConverter) {
291 m_poolCollectionConverter->disconnectDb().ignore();
292 m_poolCollectionConverter.reset();
293 }
294 ++m_inputCollectionsIterator;
295 m_poolCollectionConverter = getCollectionCnv();
296 if (m_poolCollectionConverter) {
297 m_headerIterator = &m_poolCollectionConverter->selectAll();
298 } else {
299 break;
300 }
301 }
302 }
303 if (!m_poolCollectionConverter || m_headerIterator == nullptr) {
304 return StatusCode::SUCCESS;
305 }
306 const Token& headRef = m_headerIterator->eventRef();
307 const std::string fid = headRef.dbID().toString();
308 const int tech = headRef.technology();
309 ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
310
311 // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
312 if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
313 FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
314 m_incidentSvc->fireIncident(firstInputFileIncident);
315 m_firedIncident = true;
316 }
317 return StatusCode::SUCCESS;
318}
319//________________________________________________________________________________
321 if (m_poolCollectionConverter) {
322 // Reset iterators and apply new query
323 m_poolCollectionConverter->disconnectDb().ignore();
324 m_poolCollectionConverter.reset();
325 }
326 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
327 m_curCollection = 0;
328 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
329 return StatusCode::SUCCESS;
330 }
331 m_poolCollectionConverter = getCollectionCnv(true);
332 if (!m_poolCollectionConverter) {
333 ATH_MSG_INFO("No Events found in any Input Collections");
334 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
335 if (!m_inputCollectionsProp.value().empty()) {
336 --m_inputCollectionsIterator; //leave iterator in state of last input file
337 }
338 } else {
339 m_headerIterator = &m_poolCollectionConverter->selectAll();
340 }
341 m_evtCount = 0;
342 delete m_endIter;
343 m_endIter = nullptr;
344 m_endIter = new EventContextAthenaPool(nullptr);
345 return StatusCode::SUCCESS;
346}
347//________________________________________________________________________________
349 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
350 return StatusCode::SUCCESS;
351 }
352 IEvtSelector::Context* ctxt(nullptr);
353 if (!releaseContext(ctxt).isSuccess()) {
354 ATH_MSG_WARNING("Cannot release context");
355 }
356 return StatusCode::SUCCESS;
357}
358
359//________________________________________________________________________________
361 if (m_processMetadata.value()) {
362 if (m_evtCount >= 0) {
363 // Assume that the end of collection file indicates the end of payload file.
364 if (m_guid != Guid::null()) {
365 // Fire EndInputFile incident
366 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
367 m_incidentSvc->fireIncident(endInputFileIncident);
368 }
369 }
370 if (isLastFile && m_firedIncident) {
371 m_firedIncident = false;
372 }
373 }
374}
375
376//________________________________________________________________________________
378 if (m_eventStreamingTool.empty() || !m_eventStreamingTool->isClient()) {
379 if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
380 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
381 }
382 for (auto& tool : m_helperTools) {
383 if (!tool->preFinalize().isSuccess()) {
384 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
385 }
386 }
387 }
388 delete m_endIter; m_endIter = nullptr;
389 m_headerIterator = nullptr;
390 if (m_poolCollectionConverter) {
391 m_poolCollectionConverter.reset();
392 }
393 // Finalize the Service base class.
394 return ::AthService::finalize();
395}
396
397//________________________________________________________________________________
398StatusCode EventSelectorAthenaPool::createContext(IEvtSelector::Context*& ctxt) const {
399 ctxt = new EventContextAthenaPool(this);
400 return StatusCode::SUCCESS;
401}
402//________________________________________________________________________________
403StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const {
404 std::lock_guard<CallMutex> lockGuard(m_callLock);
405 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
406 if (m_makeStreamingToolClient.value() == -1) {
407 StatusCode sc = m_eventStreamingTool->lockEvent(m_evtCount);
408 while (sc.isRecoverable()) {
409 usleep(1000);
410 sc = m_eventStreamingTool->lockEvent(m_evtCount);
411 }
412 }
413 // Increase event count
414 ++m_evtCount;
415 void* tokenStr = nullptr;
416 unsigned int status = 0;
417 StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStr, status);
418 if (sc.isRecoverable()) {
419 delete [] (char*)tokenStr; tokenStr = nullptr;
420 // Return end iterator
421 ctxt = *m_endIter;
422 // This is not a real failure but a Gaudi way of handling "end of job"
423 return StatusCode::FAILURE;
424 }
425 if (sc.isFailure()) {
426 ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
427 delete [] (char*)tokenStr; tokenStr = nullptr;
428 return StatusCode::FAILURE;
429 }
430 if (!eventStore()->clearStore().isSuccess()) {
431 ATH_MSG_WARNING("Cannot clear Store");
432 }
433 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList());
434 athAttrList->extend("eventRef", "string");
435 (*athAttrList)["eventRef"].data<std::string>() = std::string((char*)tokenStr);
437 if (!wh.record(std::move(athAttrList)).isSuccess()) {
438 delete [] (char*)tokenStr; tokenStr = nullptr;
439 ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
440 return StatusCode::FAILURE;
441 }
442 Token token;
443 token.fromString(std::string((char*)tokenStr));
444 delete [] (char*)tokenStr; tokenStr = nullptr;
445 Guid guid = token.dbID();
446 if (guid != m_guid && m_processMetadata.value()) {
447 if (m_evtCount >= 0 && m_guid != Guid::null()) {
448 // Fire EndInputFile incident
449 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
450 m_incidentSvc->fireIncident(endInputFileIncident);
451 }
452 m_guid = guid;
453 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
454 m_incidentSvc->fireIncident(beginInputFileIncident);
455 }
456 return StatusCode::SUCCESS;
457 }
458 for (const auto& tool : m_helperTools) {
459 if (!tool->preNext().isSuccess()) {
460 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
461 }
462 }
463 for (;;) {
464 // Handle possible file transition
465 StatusCode sc = nextHandleFileTransition(ctxt);
466 if (sc.isRecoverable()) {
467 continue; // handles empty files
468 }
469 if (sc.isFailure()) {
470 return StatusCode::FAILURE;
471 }
472 // Increase event count
473 ++m_evtCount;
474 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
475 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
476 }
478 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
479 {
480 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
481 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
482 if (ds == nullptr) {
483 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
484 return StatusCode::FAILURE;
485 }
486 std::string token = m_headerIterator->eventRef().toString();
487 StatusCode sc;
488 while ( (sc = putEvent_ST(*m_eventStreamingTool,
489 m_evtCount - 1, token.c_str(),
490 token.length() + 1, 0)).isRecoverable() ) {
491 while (ds->readData().isSuccess()) {
492 ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
493 }
494 // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
495 }
496 if (!sc.isSuccess()) {
497 ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
498 return StatusCode::FAILURE;
499 }
500 } else {
501 if (!m_isSecondary.value()) {
502 if (!eventStore()->clearStore().isSuccess()) {
503 ATH_MSG_WARNING("Cannot clear Store");
504 }
505 if (!recordAttributeList().isSuccess()) {
506 ATH_MSG_ERROR("Failed to record AttributeList.");
507 return StatusCode::FAILURE;
508 }
509 }
510 }
511 StatusCode status = StatusCode::SUCCESS;
512 for (const auto& tool : m_helperTools) {
513 StatusCode toolStatus = tool->postNext();
514 if (toolStatus.isRecoverable()) {
515 ATH_MSG_INFO("Request skipping event from: " << tool->name());
516 if (status.isSuccess()) {
517 status = StatusCode::RECOVERABLE;
518 }
519 } else if (toolStatus.isFailure()) {
520 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
521 status = StatusCode::FAILURE;
522 }
523 }
524 if (status.isRecoverable()) {
525 ATH_MSG_INFO("skipping event " << m_evtCount);
526 } else if (status.isFailure()) {
527 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
528 } else {
529 if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
530 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
531 }
532 break;
533 }
534 } else {
535 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
536 m_skipEventRanges.erase(m_skipEventRanges.begin());
537 }
538 ATH_MSG_INFO("skipping event " << m_evtCount);
539 }
540 }
541 return StatusCode::SUCCESS;
542}
543//________________________________________________________________________________
544StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const {
545 if (jump > 0) {
546 for (int i = 0; i < jump; i++) {
547 ATH_CHECK(next(ctxt));
548 }
549 return StatusCode::SUCCESS;
550 }
551 return StatusCode::FAILURE;
552}
553//________________________________________________________________________________
554StatusCode EventSelectorAthenaPool::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
555{
556 if( m_inputCollectionsChanged ) {
557 StatusCode rc = reinit();
558 if( rc != StatusCode::SUCCESS ) return rc;
559 }
560 else { // advance to the next (not needed after reinit)
561 // Check if we're at the end of file
562 if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
563 m_headerIterator = nullptr;
564 // Close previous collection.
565 m_poolCollectionConverter.reset();
566
567 // zero the current DB ID (m_guid) before disconnect() to indicate it is no longer in use
568 const SG::SourceID old_guid = m_guid.toString();
569 m_guid = Guid::null();
570 disconnectIfFinished( old_guid );
571
572 // check if somebody updated Inputs in the EOF incident (like VP1 does)
573 if( m_inputCollectionsChanged ) {
574 StatusCode rc = reinit();
575 if( rc != StatusCode::SUCCESS ) return rc;
576 } else {
577 // Open next file from inputCollections list.
578 ++m_inputCollectionsIterator;
579 // Create PoolCollectionConverter for input file
580 m_poolCollectionConverter = getCollectionCnv(true);
581 if (!m_poolCollectionConverter) {
582 // Return end iterator
583 ctxt = *m_endIter;
584 // This is not a real failure but a Gaudi way of handling "end of job"
585 return StatusCode::FAILURE;
586 }
587 // Get DataHeader iterator
588 m_headerIterator = &m_poolCollectionConverter->selectAll();
589
590 // Return RECOVERABLE to mark we should still continue
591 return StatusCode::RECOVERABLE;
592 }
593 }
594 }
595 const Token& headRef = m_headerIterator->eventRef();
596 const Guid guid = headRef.dbID();
597 const int tech = headRef.technology();
598 ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
599
600 if (guid != m_guid) {
601 // we are starting reading from a new DB. Check if the old one needs to be retired
602 if (m_guid != Guid::null()) {
603 // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
604 const SG::SourceID old_guid = m_guid.toString();
605 m_guid = Guid::null();
606 disconnectIfFinished( old_guid );
607 }
608 m_guid = guid;
609 m_activeEventsPerSource[guid.toString()] = 0;
610 // Fire BeginInputFile incident if current InputCollection is a payload file;
611 // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
612 if (m_collectionType.value() == "ImplicitCollection") {
613 // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
614 if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
615 ATH_MSG_ERROR("Failed to set input attributes.");
616 return StatusCode::FAILURE;
617 }
618 if (m_processMetadata.value()) {
619 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
620 m_incidentSvc->fireIncident(beginInputFileIncident);
621 }
622 } else {
623 // Check if File is BS
624 if (tech != 0x00001000 && m_processMetadata.value()) {
625 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
626 m_incidentSvc->fireIncident(beginInputFileIncident);
627 }
628 }
629 } // end if (guid != m_guid)
630 return StatusCode::SUCCESS;
631}
632//________________________________________________________________________________
633StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
634 ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
635
636 for (;;) {
637 // Check if we're at the end of file
638 StatusCode sc = nextHandleFileTransition(ctxt);
639 if (sc.isRecoverable()) {
640 continue; // handles empty files
641 }
642 if (sc.isFailure()) {
643 return StatusCode::FAILURE;
644 }
645
646 // Increase event count
647 ++m_evtCount;
648
649 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
650 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
651 }
653 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
654 {
655 return StatusCode::SUCCESS;
656 } else {
657 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
658 m_skipEventRanges.erase(m_skipEventRanges.begin());
659 }
660 if (m_isSecondary.value()) {
661 ATH_MSG_INFO("skipping secondary event " << m_evtCount);
662 } else {
663 ATH_MSG_INFO("skipping event " << m_evtCount);
664 }
665 }
666 }
667
668 return StatusCode::SUCCESS;
669}
670//________________________________________________________________________________
671StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
672 ATH_MSG_ERROR("previous() not implemented");
673 return StatusCode::FAILURE;
674}
675//________________________________________________________________________________
676StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& ctxt, int jump) const {
677 if (jump > 0) {
678 for (int i = 0; i < jump; i++) {
679 ATH_CHECK(previous(ctxt));
680 }
681 return StatusCode::SUCCESS;
682 }
683 return StatusCode::FAILURE;
684}
685//________________________________________________________________________________
686StatusCode EventSelectorAthenaPool::last(IEvtSelector::Context& ctxt) const {
687 if (ctxt.identifier() == m_endIter->identifier()) {
688 ATH_MSG_DEBUG("last(): Last event in InputStream.");
689 return StatusCode::SUCCESS;
690 }
691 return StatusCode::FAILURE;
692}
693//________________________________________________________________________________
694StatusCode EventSelectorAthenaPool::rewind(IEvtSelector::Context& ctxt) const {
695 ATH_CHECK(reinit());
696 ctxt = EventContextAthenaPool(this);
697 return StatusCode::SUCCESS;
698}
699//________________________________________________________________________________
700StatusCode EventSelectorAthenaPool::createAddress(const IEvtSelector::Context& /*ctxt*/,
701 IOpaqueAddress*& iop) const {
702 std::string tokenStr;
704 if (attrList.isValid()) {
705 try {
706 tokenStr = (*attrList)["eventRef"].data<std::string>();
707 ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
708 } catch (std::exception &e) {
709 ATH_MSG_ERROR(e.what());
710 return StatusCode::FAILURE;
711 }
712 } else {
713 ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey);
714 tokenStr = m_headerIterator->eventRef().toString();
715 }
716 auto token = std::make_unique<Token>();
717 token->fromString(tokenStr);
718 iop = new TokenAddress(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, std::move(token));
719 return StatusCode::SUCCESS;
720}
721//________________________________________________________________________________
722StatusCode EventSelectorAthenaPool::releaseContext(IEvtSelector::Context*& /*ctxt*/) const {
723 return StatusCode::SUCCESS;
724}
725//________________________________________________________________________________
726StatusCode EventSelectorAthenaPool::resetCriteria(const std::string& /*criteria*/,
727 IEvtSelector::Context& /*ctxt*/) const {
728 return StatusCode::SUCCESS;
729}
730//__________________________________________________________________________
731StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
732
733 if( m_inputCollectionsChanged ) {
734 StatusCode rc = reinit();
735 if( rc != StatusCode::SUCCESS ) return rc;
736 }
737
738 long newColl = findEvent(evtNum);
739 if (newColl == -1 && evtNum >= m_firstEvt[m_curCollection] && evtNum < m_evtCount - 1) {
740 newColl = m_curCollection;
741 }
742 if (newColl == -1) {
743 m_headerIterator = nullptr;
744 ATH_MSG_INFO("seek: Reached end of Input.");
746 return StatusCode::RECOVERABLE;
747 }
748 if (newColl != m_curCollection) {
749 if (!m_keepInputFilesOpen.value() && m_poolCollectionConverter) {
750 m_poolCollectionConverter->disconnectDb().ignore();
751 }
752 m_poolCollectionConverter.reset();
753 m_curCollection = newColl;
754 try {
755 ATH_MSG_DEBUG("Seek to item: \""
757 << "\" from the collection list.");
758 // Reset input collection iterator to the right place
759 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
760 m_inputCollectionsIterator += m_curCollection;
761 m_poolCollectionConverter = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
764 m_athenaPoolCnvSvc->getPoolSvc());
765 if (!m_poolCollectionConverter->initialize().isSuccess()) {
766 m_headerIterator = nullptr;
767 ATH_MSG_ERROR("seek: Unable to initialize PoolCollectionConverter.");
768 return StatusCode::FAILURE;
769 }
770 // Create DataHeader iterators
771 m_headerIterator = &m_poolCollectionConverter->selectAll();
772 EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
773 m_evtCount = m_firstEvt[m_curCollection];
774 next(*beginIter).ignore();
775 ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
776 } catch (std::exception &e) {
777 m_headerIterator = nullptr;
778 ATH_MSG_ERROR(e.what());
779 return StatusCode::FAILURE;
780 }
781 }
782
783 if (m_headerIterator->seek(evtNum - m_firstEvt[m_curCollection]) == 0) {
784 m_headerIterator = nullptr;
785 ATH_MSG_ERROR("Did not find event, evtNum = " << evtNum);
786 return StatusCode::FAILURE;
787 } else {
788 m_evtCount = evtNum + 1;
789 }
790 return StatusCode::SUCCESS;
791}
792//__________________________________________________________________________
793int EventSelectorAthenaPool::curEvent (const Context& /*ctxt*/) const {
794 return(m_evtCount);
795}
796//__________________________________________________________________________
797// Search for event number evtNum.
798// Return the index of the collection containing it, or -1 if not found.
799// Note: passing -1 for evtNum will always yield failure,
800// but this can be used to force filling in the entire m_numEvt array.
802 for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
803 if (m_numEvt[i] == -1) {
805 m_inputCollectionsProp.value()[i],
807 m_athenaPoolCnvSvc->getPoolSvc());
808 if (!pcc.initialize().isSuccess()) {
809 break;
810 }
811 int collection_size = 0;
812 if (pcc.isValid()) {
814 collection_size = hi->size();
815 }
816 if (i > 0) {
817 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
818 } else {
819 m_firstEvt[i] = 0;
820 }
821 m_numEvt[i] = collection_size;
822 }
823 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
824 return(i);
825 }
826 }
827 return(-1);
828}
829
830//________________________________________________________________________________
832 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
833 if (ds == nullptr) {
834 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
835 return StatusCode::FAILURE;
836 }
837 if (num < 0) {
838 if (ds->makeServer(num - 1).isFailure()) {
839 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
840 }
841 return StatusCode::SUCCESS;
842 }
843 if (ds->makeServer(num + 1).isFailure()) {
844 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
845 return StatusCode::FAILURE;
846 }
847 if (m_eventStreamingTool.empty()) {
848 return StatusCode::SUCCESS;
849 }
850 m_processMetadata = false;
851 ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
852 return(m_eventStreamingTool->makeServer(1, ""));
853}
854
855//________________________________________________________________________________
857 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
858 if (ds == nullptr) {
859 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
860 return StatusCode::FAILURE;
861 }
862 if (ds->makeClient(num + 1).isFailure()) {
863 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
864 return StatusCode::FAILURE;
865 }
866 if (m_eventStreamingTool.empty()) {
867 return StatusCode::SUCCESS;
868 }
869 ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
870 std::string dummyStr;
871 return(m_eventStreamingTool->makeClient(0, dummyStr));
872}
873
874//________________________________________________________________________________
875StatusCode EventSelectorAthenaPool::share(int evtnum) {
876 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
877 if (ds == nullptr) {
878 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
879 return StatusCode::FAILURE;
880 }
881 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
882 StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
883 while (sc.isRecoverable()) {
884 usleep(1000);
885 sc = m_eventStreamingTool->lockEvent(evtnum);
886 }
887// Send stop client and wait for restart
888 if (sc.isFailure()) {
889 if (ds->makeClient(0).isFailure()) {
890 return StatusCode::FAILURE;
891 }
892 sc = m_eventStreamingTool->lockEvent(evtnum);
893 while (sc.isRecoverable() || sc.isFailure()) {
894 usleep(1000);
895 sc = m_eventStreamingTool->lockEvent(evtnum);
896 }
897//FIXME
898 if (ds->makeClient(1).isFailure()) {
899 return StatusCode::FAILURE;
900 }
901 }
902 return(sc);
903 }
904 return StatusCode::FAILURE;
905}
906
907//________________________________________________________________________________
909 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
910 if (ds == nullptr) {
911 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
912 return StatusCode::FAILURE;
913 }
914 if (m_eventStreamingTool.empty()) {
915 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
916 return StatusCode::FAILURE;
917 }
918 ATH_MSG_VERBOSE("Called read Event " << maxevt);
919 IEvtSelector::Context* ctxt = new EventContextAthenaPool(this);
920 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
921 if (!next(*ctxt).isSuccess()) {
922 if (m_evtCount == -1) {
923 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
924 break;
925 }
926 ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
927 delete ctxt; ctxt = nullptr;
928 return StatusCode::FAILURE;
929 } else {
930 ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
931 }
932 }
933 delete ctxt; ctxt = nullptr;
934 // End of file, wait for last event to be taken
935 StatusCode sc;
936 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
937 while (ds->readData().isSuccess()) {
938 ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
939 }
940 usleep(1000);
941 }
942 if (!sc.isSuccess()) {
943 ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
944 return StatusCode::FAILURE;
945 } else {
946 sc = ds->readData();
947 while (sc.isSuccess() || sc.isRecoverable()) {
948 sc = ds->readData();
949 }
950 ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
951 }
952 return StatusCode::SUCCESS;
953}
954
955//__________________________________________________________________________
956int EventSelectorAthenaPool::size(Context& /*ctxt*/) const {
957 // Fetch sizes of all collections.
958 findEvent(-1);
959 return std::accumulate(m_numEvt.begin(), m_numEvt.end(), 0);
960}
961//__________________________________________________________________________
// Walk the input collection list from m_inputCollectionsIterator onwards and
// return a converter for the first collection that initializes and reports
// itself valid. Returns nullptr once the list is exhausted. A collection that
// cannot be initialized ends the search with a GaudiException.
// NOTE(review): the definition line (function name and parameters) is absent
// from this listing; presumably getCollectionCnv(bool throwIncidents) const -
// confirm against the class header.
962std::unique_ptr<PoolCollectionConverter>
964 while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
// Bookkeeping for the collection being left: its event count is the distance
// between the running event counter and its first event.
// NOTE(review): the inner numbering skips one line between the next two
// statements; presumably m_curCollection is advanced there - confirm.
965 if (m_curCollection != 0) {
966 m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
968 m_firstEvt[m_curCollection] = m_evtCount;
969 }
970 ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
// NOTE(review): one constructor argument line is missing from this listing
// (inner numbering skips a line before the getPoolSvc() argument).
971 auto pCollCnv = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
972 *m_inputCollectionsIterator,
974 m_athenaPoolCnvSvc->getPoolSvc());
975 StatusCode status = pCollCnv->initialize();
976 if (!status.isSuccess()) {
977 // Close previous collection.
978 pCollCnv.reset();
// Both branches below throw; only the reported message differs between a
// non-recoverable ("Unable to read") and a recoverable ("Unable to open") status.
979 if (!status.isRecoverable()) {
980 ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
981 throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
982 } else {
983 ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
984 throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
985 }
986 } else {
// Initialized but not valid: treated as a collection without events and skipped.
987 if (!pCollCnv->isValid().isSuccess()) {
988 pCollCnv.reset();
989 ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
// When requested and metadata processing is on, fire a BeginInputFile /
// EndInputFile pair for the skipped collection.
990 if (throwIncidents && m_processMetadata.value()) {
991 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator);
992 m_incidentSvc->fireIncident(beginInputFileIncident);
993 FileIncident endInputFileIncident(name(), "EndInputFile", "eventless " + *m_inputCollectionsIterator);
994 m_incidentSvc->fireIncident(endInputFileIncident);
995 }
// Disconnect the skipped collection (result ignored) and move on to the next one.
996 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
997 ++m_inputCollectionsIterator;
998 } else {
999 return(pCollCnv);
1000 }
1001 }
1002 }
// Input list exhausted without finding a usable collection.
1003 return(nullptr);
1004}
1005//__________________________________________________________________________
// Copy the attribute list of the current header row into a new
// AthenaAttributeList, extend it through fillAttributeList() (empty suffix,
// no copy of the source attributes) and record it through the handle `wh`.
// Returns SUCCESS unless one of the ATH_CHECK'ed calls fails.
// NOTE(review): the definition line is absent from this listing; presumably
// recordAttributeList() const - confirm against the class header.
1007 // Get access to AttributeList
1008 ATH_MSG_DEBUG("Get AttributeList from the collection");
1009 // MN: accessing only attribute list, ignoring token list
1010 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
1011 ATH_MSG_DEBUG("AttributeList size " << attrList.size());
1012 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList(attrList));
1013 // Fill the new attribute list
1014 ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
1015 // Write the AttributeList
// NOTE(review): the line declaring `wh` is missing from this listing;
// presumably a StoreGate write handle built from m_attrListKey - confirm.
// Ownership of the new list is handed over to record().
1017 ATH_CHECK(wh.record(std::move(athAttrList)));
1018 return StatusCode::SUCCESS;
1019}
1020//__________________________________________________________________________
1021StatusCode EventSelectorAthenaPool::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
1022{
1023 const pool::TokenList& tokenList = m_headerIterator->currentRow().tokenList();
1024 for (pool::TokenList::const_iterator iter = tokenList.begin(), last = tokenList.end(); iter != last; ++iter) {
1025 attrList->extend(iter.tokenName() + suffix, "string");
1026 (*attrList)[iter.tokenName() + suffix].data<std::string>() = iter->toString();
1027 ATH_MSG_DEBUG("record AthenaAttribute, name = " << iter.tokenName() + suffix << " = " << iter->toString() << ".");
1028 }
1029
1030 std::string eventRef = "eventRef";
1031 if (m_isSecondary.value()) {
1032 eventRef.append(suffix);
1033 }
1034 attrList->extend(eventRef, "string");
1035 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1036 ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
1037
1038 if (copySource) {
1039 const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
1040 for (const auto &attr : sourceAttrList) {
1041 attrList->extend(attr.specification().name() + suffix, attr.specification().type());
1042 (*attrList)[attr.specification().name() + suffix] = attr;
1043 }
1044 }
1045
1046 return StatusCode::SUCCESS;
1047}
1048//__________________________________________________________________________
1050 ATH_MSG_INFO("I/O reinitialization...");
1051 if (m_poolCollectionConverter) {
1052 m_poolCollectionConverter->disconnectDb().ignore();
1053 m_poolCollectionConverter.reset();
1054 }
1055 m_headerIterator = nullptr;
1056 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1057 if (!iomgr.retrieve().isSuccess()) {
1058 ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1059 return StatusCode::FAILURE;
1060 }
1061 if (!iomgr->io_hasitem(this)) {
1062 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1063 return StatusCode::FAILURE;
1064 }
1065 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
1066 m_guid = Guid::null();
1067 return(this->reinit());
1068 }
1069 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1070 std::set<std::size_t> updatedIndexes;
1071 for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
1072 if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
1073 std::string savedName = inputCollections[i];
1074 std::string &fname = inputCollections[i];
1075 if (!iomgr->io_contains(this, fname)) {
1076 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1077 return StatusCode::FAILURE;
1078 }
1079 if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1080 ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1081 return StatusCode::FAILURE;
1082 }
1083 if (savedName != fname) {
1084 ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
1085 m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
1086 }
1087 updatedIndexes.insert(i);
1088 for (std::size_t j = i + 1; j < imax; j++) {
1089 if (inputCollections[j] == savedName) {
1090 inputCollections[j] = fname;
1091 updatedIndexes.insert(j);
1092 }
1093 }
1094 }
1095 // all good... copy over.
1096 m_inputCollectionsProp = inputCollections;
1097 m_guid = Guid::null();
1098 return reinit();
1099}
1100//__________________________________________________________________________
1102 ATH_MSG_INFO("I/O finalization...");
1103 if (m_poolCollectionConverter) {
1104 m_poolCollectionConverter->disconnectDb().ignore();
1105 m_poolCollectionConverter.reset();
1106 }
1107 return StatusCode::SUCCESS;
1108}
1109
1110//__________________________________________________________________________
1111/* Listen to IncidentType::BeginProcessing and EndProcessing
1112 Maintain counters of how many events from a given file are being processed.
1113 Files are identified by SG::SourceID (string GUID).
1114 When there are no more events from a file, see if it can be closed.
1115*/
1116void EventSelectorAthenaPool::handle(const Incident& inc)
1117{
1118 SG::SourceID fid;
1119 if (inc.type() == IncidentType::BeginProcessing) {
1120 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
1121 fid = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
1122 }
1123 *m_sourceID.get(inc.context()) = fid;
1124 }
1125 else {
1126 fid = *m_sourceID.get(inc.context());
1127 }
1128
1129 if( fid.empty() ) {
1130 ATH_MSG_WARNING("could not read event source ID from incident event context");
1131 return;
1132 }
1133 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1134 ATH_MSG_DEBUG("Incident handler ignoring unknown input FID: " << fid );
1135 return;
1136 }
1137 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid );
1138 if( inc.type() == IncidentType::BeginProcessing ) {
1139 // increment the events-per-file counter for FID
1140 m_activeEventsPerSource[fid]++;
1141 } else if( inc.type() == IncidentType::EndProcessing ) {
1142 m_activeEventsPerSource[fid]--;
1143 disconnectIfFinished( fid );
1144 *m_sourceID.get(inc.context()) = "";
1145 }
1146 if( msgLvl(MSG::DEBUG) ) {
1147 for( auto& source: m_activeEventsPerSource )
1148 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
1149 }
1150}
1151
1152//__________________________________________________________________________
1153/* Disconnect APR Database identifieed by a SG::SourceID when it is no longer in use:
1154 m_guid is not pointing to it and there are no events from it being processed
1155 (if the EventLoopMgr was not firing Begin/End incidents, this will just close the DB)
1156*/
1158{
1159 if( m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1160 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1161 // Explicitly disconnect file corresponding to old FID to release memory
1162 if( !m_keepInputFilesOpen.value() ) {
1163 // Assume that the end of collection file indicates the end of payload file.
1164 if (m_processMetadata.value()) {
1165 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + fid, fid);
1166 m_incidentSvc->fireIncident(endInputFileIncident);
1167 }
1168 ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
1169 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
1170 m_activeEventsPerSource.erase( fid );
1171 return true;
1172 }
1173 }
1174 return false;
1175}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextAthenaPool class.
This file contains the class definition for the EventSelectorAthenaPool class.
This file contains the class definition for the IPoolSvc interface class.
static Double_t sc
static Double_t rc
This file contains the class definition for the PoolCollectionConverter class.
Handle class for reading from StoreGate.
Handle class for recording to StoreGate.
int imax(int i, int j)
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the context to access an event from POOL persistent store.
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
virtual StatusCode start() override
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual StatusCode makeClient(int num) override
Make this a client.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
virtual StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
virtual StatusCode share(int evtnum) override
Request to share a given event number.
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
StoreGateSvc * eventStore() const
Return pointer to active event SG.
void fireEndFileIncidents(bool isLastFile) const
Fires the EndInputFile incident (if there is an open file) at end of selector.
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
virtual StatusCode stop() override
ServiceHandle< IIncidentSvc > m_incidentSvc
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
virtual ~EventSelectorAthenaPool()
Destructor.
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
std::string m_attrListKey
AttributeList SG key.
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
SG::SlotSpecificObj< SG::SourceID > m_sourceID
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
virtual StatusCode makeServer(int num) override
Make this a server.
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Gaudi::CheckedProperty< uint32_t > m_oldRunNo
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
virtual StatusCode finalize() override
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
EventContextAthenaPool * m_endIter
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Gaudi::CheckedProperty< uint32_t > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
virtual int size(Context &ctxt) const override
Return the size of the collection.
int findEvent(int evtNum) const
Search for event with number evtNum.
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma separated list of event to skip, ranges with '-': <start> - <end>
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
This class provides a encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
Definition Guid.h:25
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
Abstract interface for sharing data.
Definition IDataShare.h:24
@ kInputStream
Definition IPoolSvc.h:40
This class provides an interface to POOL collections.
StatusCode isValid() const
Check whether has valid pool::ICollection*.
pool::ICollectionCursor & selectAll()
StatusCode initialize()
Required by all Gaudi Services.
virtual bool isValid() override final
Can the handle be successfully dereferenced?
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
static const std::string & storeName(const StoreID::type &s)
Definition StoreID.cxx:77
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
An interface used to navigate the result of a query on a collection.
virtual std::size_t size()=0
Returns the size of the collection.
Constant forward iterator class for navigation of TokenList objects.
Definition TokenList.h:164
A container class to facilitate the use of Token objects.
Definition TokenList.h:24
iterator begin()
Returns a forward iterator pointing to first element in Token list.
Definition TokenList.h:218
iterator end()
Returns a forward iterator pointing to last element in Token list.
Definition TokenList.h:224
int r
Definition globals.cxx:22
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
static const DbType POOL_StorageType
Definition DbType.h:98
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
MsgStream & msg
Definition testRead.cxx:32