ATLAS Offline Software
Loading...
Searching...
No Matches
EventSelectorAthenaPool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
13
18#include "PoolSvc/IPoolSvc.h"
22
23// Framework
24#include "GaudiKernel/ClassID.h"
25#include "GaudiKernel/FileIncident.h"
26#include "GaudiKernel/IIncidentSvc.h"
27#include "GaudiKernel/IIoComponentMgr.h"
28#include "GaudiKernel/GaudiException.h"
29#include "GaudiKernel/GenericAddress.h"
30#include "GaudiKernel/StatusCode.h"
32
33// Pool
36#include "StorageSvc/DbType.h"
37
38#include <boost/tokenizer.hpp>
39#include <algorithm>
40#include <format>
41#include <vector>
42
43
44namespace {
46 StatusCode putEvent_ST(const IAthenaIPCTool& tool,
47 long eventNumber, const void* source,
48 size_t nbytes, unsigned int status) {
49 StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
50 return sc;
51 }
52}
53
54
55//________________________________________________________________________________
// Constructor: passes name and pSvcLocator to base_class, sets lower bounds on the
// verifiers of the numeric properties and clears m_inputCollectionsChanged.
56EventSelectorAthenaPool::EventSelectorAthenaPool(const std::string& name, ISvcLocator* pSvcLocator) :
57 base_class(name, pSvcLocator)
58{
59
60 // TODO: validate if those are even used
61 m_runNo.verifier().setLower(0);
62 m_oldRunNo.verifier().setLower(0);
63 m_eventsPerRun.verifier().setLower(0);
// The first event number is the only property here whose lower bound is 1 rather than 0.
64 m_firstEventNo.verifier().setLower(1);
65 m_firstLBNo.verifier().setLower(0);
66 m_eventsPerLB.verifier().setLower(0);
67 m_initTimeStamp.verifier().setLower(0);
68
// NOTE(review): listing line 69 is not present in this copy of the file - check the
// repository version before assuming the constructor body shown here is complete.
70 m_inputCollectionsChanged = false;
71}
72//________________________________________________________________________________
73void EventSelectorAthenaPool::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
74 if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
75 m_inputCollectionsChanged = true;
76 }
77}
78//________________________________________________________________________________
81//________________________________________________________________________________
85//________________________________________________________________________________
87
88 m_autoRetrieveTools = false;
89 m_checkToolDeps = false;
90
91 if (m_isSecondary.value()) {
92 ATH_MSG_DEBUG("Initializing secondary event selector " << name());
93 } else {
94 ATH_MSG_DEBUG("Initializing " << name());
95 }
96
97 ATH_CHECK(::AthService::initialize());
98 // Check for input collection
99 if (m_inputCollectionsProp.value().empty()) {
100 ATH_MSG_FATAL("Use the property: EventSelector.InputCollections = "
101 << "[ \"<collectionName>\" ] (list of collections)");
102 return StatusCode::FAILURE;
103 }
104 boost::char_separator<char> sep_coma(","), sep_hyph("-");
105 boost::tokenizer ranges(m_skipEventRangesProp.value(), sep_coma);
106 for( const std::string& r: ranges ) {
107 boost::tokenizer fromto(r, sep_hyph);
108 auto from_iter = fromto.begin();
109 long from = std::stol(*from_iter);
110 long to = from;
111 if( ++from_iter != fromto.end() ) {
112 to = std::stol(*from_iter);
113 }
114 m_skipEventRanges.emplace_back(from, to);
115 }
116
117 for( auto v : m_skipEventSequenceProp.value() ) {
118 m_skipEventRanges.emplace_back(v, v);
119 }
120 std::sort(m_skipEventRanges.begin(), m_skipEventRanges.end());
121 if( msgLvl(MSG::DEBUG) ) {
122 std::string skip_ranges_str;
123 for( const auto& [first, second] : m_skipEventRanges ) {
124 if( !skip_ranges_str.empty() ) skip_ranges_str += ", ";
125 skip_ranges_str += std::to_string(first);
126 if( first != second) skip_ranges_str += std::format("-{}", second);
127 }
128 if( !skip_ranges_str.empty() )
129 ATH_MSG_DEBUG("Events to skip: " << skip_ranges_str);
130 }
131 // Get IncidentSvc
132 ATH_CHECK(m_incidentSvc.retrieve());
133 // Listen to the Event Processing incidents
134 if (m_eventStreamingTool.empty()) {
135 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 0);
136 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 0);
137 }
138
139 // Get AthenaPoolCnvSvc
140 ATH_CHECK(m_athenaPoolCnvSvc.retrieve());
141 // Get CounterTool (if configured)
142 if (!m_counterTool.empty()) {
143 ATH_CHECK(m_counterTool.retrieve());
144 }
145 // Get HelperTools
146 ATH_CHECK(m_helperTools.retrieve());
147 // Get SharedMemoryTool (if configured)
148 if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
149 ATH_MSG_FATAL("Cannot get " << m_eventStreamingTool.typeAndName() << "");
150 return StatusCode::FAILURE;
151 } else if (m_makeStreamingToolClient.value() == -1) {
152 std::string dummyStr;
153 ATH_CHECK(m_eventStreamingTool->makeClient(m_makeStreamingToolClient.value(), dummyStr));
154 }
155
156 // Ensure the xAODCnvSvc is listed in the EventPersistencySvc
157 ServiceHandle<IProperty> epSvc("EventPersistencySvc", name());
158 std::vector<std::string> propVal;
159 ATH_CHECK(Gaudi::Parsers::parse(propVal , epSvc->getProperty("CnvServices").toString()));
160 bool foundCnvSvc = false;
161 for (const auto& property : propVal) {
162 if (property == m_athenaPoolCnvSvc.type()) { foundCnvSvc = true; }
163 }
164 if (!foundCnvSvc) {
165 propVal.push_back(m_athenaPoolCnvSvc.type());
166 if (!epSvc->setProperty("CnvServices", Gaudi::Utils::toString(propVal)).isSuccess()) {
167 ATH_MSG_FATAL("Cannot set EventPersistencySvc Property for CnvServices");
168 return StatusCode::FAILURE;
169 }
170 }
171
172 // Register this service for 'I/O' events
173 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
174 ATH_CHECK(iomgr.retrieve());
175 ATH_CHECK(iomgr->io_register(this));
176 // Register input file's names with the I/O manager
177 const std::vector<std::string>& incol = m_inputCollectionsProp.value();
178 bool allGood = true;
179 std::string fileName;
180 std::string fileType;
181 for (const auto& inputCollection : incol) {
182 if (inputCollection.starts_with("LFN:") || inputCollection.starts_with("FID:")) {
183 m_athenaPoolCnvSvc->getPoolSvc()->lookupBestPfn(inputCollection, fileName, fileType);
184 } else {
185 fileName = inputCollection;
186 }
187 if (fileName.starts_with("PFN:")) {
188 fileName = fileName.substr(4);
189 }
190 if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, inputCollection, fileName).isSuccess()) {
191 ATH_MSG_FATAL("could not register [" << inputCollection << "] for output !");
192 allGood = false;
193 } else {
194 ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << inputCollection << ") [ok]");
195 }
196 }
197 if (!allGood) {
198 return StatusCode::FAILURE;
199 }
200
201 // Connect to PersistencySvc
202 if (!m_athenaPoolCnvSvc->getPoolSvc()->connect(pool::ITransaction::READ, IPoolSvc::kInputStream).isSuccess()) {
203 ATH_MSG_FATAL("Cannot connect to POOL PersistencySvc.");
204 return StatusCode::FAILURE;
205 }
206 // Jump to reinit() to execute common init/reinit actions
207 m_guid = Guid::null();
208 return reinit();
209}
210//________________________________________________________________________________
212 ATH_MSG_DEBUG("reinitialization...");
213
214 // reset markers
215 m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
216 m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
217
218 // Initialize InputCollectionsIterator
219 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
220 m_curCollection = 0;
221 if (!m_firstEvt.empty()) {
222 m_firstEvt[0] = 0;
223 }
224 m_inputCollectionsChanged = false;
225 m_evtCount = 0;
226 m_headerIterator = 0;
227 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
228 ATH_MSG_INFO("Done reinitialization for shared reader client");
229 return StatusCode::SUCCESS;
230 }
231 bool retError = false;
232 for (auto& tool : m_helperTools) {
233 if (!tool->postInitialize().isSuccess()) {
234 ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
235 retError = true;
236 }
237 }
238 if (retError) {
239 ATH_MSG_FATAL("Failed to postInitialize() helperTools");
240 return StatusCode::FAILURE;
241 }
242
243 // Create an m_poolCollectionConverter to read the objects in
244 m_poolCollectionConverter = getCollectionCnv();
245 if (!m_poolCollectionConverter) {
246 ATH_MSG_INFO("No Events found in any Input Collections");
247 if (m_processMetadata.value()) {
248 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
249 if (!m_inputCollectionsProp.value().empty()) --m_inputCollectionsIterator;
250 //NOTE (wb may 2016): this will make the FirstInputFile incident correspond to last file in the collection ... if want it to be first file then move iterator to begin and then move above two lines below this incident firing
251 if (m_collectionType.value() == "ImplicitCollection" && !m_firedIncident && !m_inputCollectionsProp.value().empty()) {
252 FileIncident firstInputFileIncident(name(), "FirstInputFile", *m_inputCollectionsIterator);
253 m_incidentSvc->fireIncident(firstInputFileIncident);
254 m_firedIncident = true;
255 }
256 }
257 return StatusCode::SUCCESS;
258 }
259 // Get DataHeader iterator
260 try {
261 m_headerIterator = &m_poolCollectionConverter->selectAll();
262 } catch (std::exception &e) {
263 ATH_MSG_FATAL("Cannot open implicit collection - check data/software version.");
264 ATH_MSG_ERROR(e.what());
265 return StatusCode::FAILURE;
266 }
267 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // no selected events
268 if (m_poolCollectionConverter) {
269 m_poolCollectionConverter->disconnectDb().ignore();
270 m_poolCollectionConverter.reset();
271 }
272 ++m_inputCollectionsIterator;
273 m_poolCollectionConverter = getCollectionCnv();
274 if (m_poolCollectionConverter) {
275 m_headerIterator = &m_poolCollectionConverter->selectAll();
276 } else {
277 break;
278 }
279 }
280 if (!m_poolCollectionConverter || m_headerIterator == nullptr) { // no event selected in any collection
281 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
282 m_curCollection = 0;
283 m_poolCollectionConverter = getCollectionCnv();
284 if (!m_poolCollectionConverter) {
285 return StatusCode::SUCCESS;
286 }
287 m_headerIterator = &m_poolCollectionConverter->selectAll();
288 while (m_headerIterator == nullptr || m_headerIterator->next() == 0) { // empty collection
289 if (m_poolCollectionConverter) {
290 m_poolCollectionConverter->disconnectDb().ignore();
291 m_poolCollectionConverter.reset();
292 }
293 ++m_inputCollectionsIterator;
294 m_poolCollectionConverter = getCollectionCnv();
295 if (m_poolCollectionConverter) {
296 m_headerIterator = &m_poolCollectionConverter->selectAll();
297 } else {
298 break;
299 }
300 }
301 }
302 if (!m_poolCollectionConverter || m_headerIterator == nullptr) {
303 return StatusCode::SUCCESS;
304 }
305 const Token& headRef = m_headerIterator->eventRef();
306 const std::string fid = headRef.dbID().toString();
307 const int tech = headRef.technology();
308 ATH_MSG_VERBOSE("reinit(): First DataHeder Token=" << headRef.toString() );
309
310 // Check if File is BS, for which Incident is thrown by SingleEventInputSvc
311 if (tech != 0x00001000 && m_processMetadata.value() && !m_firedIncident) {
312 FileIncident firstInputFileIncident(name(), "FirstInputFile", "FID:" + fid, fid);
313 m_incidentSvc->fireIncident(firstInputFileIncident);
314 m_firedIncident = true;
315 }
316 return StatusCode::SUCCESS;
317}
318//________________________________________________________________________________
320 if (m_poolCollectionConverter) {
321 // Reset iterators and apply new query
322 m_poolCollectionConverter->disconnectDb().ignore();
323 m_poolCollectionConverter.reset();
324 }
325 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
326 m_curCollection = 0;
327 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
328 return StatusCode::SUCCESS;
329 }
330 m_poolCollectionConverter = getCollectionCnv(true);
331 if (!m_poolCollectionConverter) {
332 ATH_MSG_INFO("No Events found in any Input Collections");
333 m_inputCollectionsIterator = m_inputCollectionsProp.value().end();
334 if (!m_inputCollectionsProp.value().empty()) {
335 --m_inputCollectionsIterator; //leave iterator in state of last input file
336 }
337 } else {
338 m_headerIterator = &m_poolCollectionConverter->selectAll();
339 }
340 m_evtCount = 0;
341 delete m_endIter;
342 m_endIter = nullptr;
343 m_endIter = new EventContextAthenaPool(nullptr);
344 return StatusCode::SUCCESS;
345}
346//________________________________________________________________________________
348 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
349 return StatusCode::SUCCESS;
350 }
351 IEvtSelector::Context* ctxt(nullptr);
352 if (!releaseContext(ctxt).isSuccess()) {
353 ATH_MSG_WARNING("Cannot release context");
354 }
355 return StatusCode::SUCCESS;
356}
357
358//________________________________________________________________________________
360 if (m_processMetadata.value()) {
361 if (m_evtCount >= 0) {
362 // Assume that the end of collection file indicates the end of payload file.
363 if (m_guid != Guid::null()) {
364 // Fire EndInputFile incident
365 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
366 m_incidentSvc->fireIncident(endInputFileIncident);
367 }
368 }
369 if (isLastFile && m_firedIncident) {
370 m_firedIncident = false;
371 }
372 }
373}
374
375//________________________________________________________________________________
377 if (m_eventStreamingTool.empty() || !m_eventStreamingTool->isClient()) {
378 if (!m_counterTool.empty() && !m_counterTool->preFinalize().isSuccess()) {
379 ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
380 }
381 for (auto& tool : m_helperTools) {
382 if (!tool->preFinalize().isSuccess()) {
383 ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
384 }
385 }
386 }
387 delete m_endIter; m_endIter = nullptr;
388 m_headerIterator = nullptr;
389 if (m_poolCollectionConverter) {
390 m_poolCollectionConverter.reset();
391 }
392 // Finalize the Service base class.
393 return ::AthService::finalize();
394}
395
396//________________________________________________________________________________
397StatusCode EventSelectorAthenaPool::createContext(IEvtSelector::Context*& ctxt) const {
398 ctxt = new EventContextAthenaPool(this);
399 return StatusCode::SUCCESS;
400}
401//________________________________________________________________________________
// Advance the selector to the next event. m_callLock is held for the whole call.
// Two paths are visible below:
//  - streaming-tool client: obtains a token string from m_eventStreamingTool, stores it as
//    the "eventRef" attribute and fires EndInputFile/BeginInputFile incidents when the
//    database GUID of the token differs from m_guid (only if m_processMetadata is set);
//  - otherwise: loops over nextHandleFileTransition() until an event is accepted, then
//    either puts the token into the streaming tool (server) or records the attribute list.
// Returns FAILURE for real errors and also for end of input (ctxt is then set to *m_endIter).
402StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt) const {
403 std::lock_guard<CallMutex> lockGuard(m_callLock);
// ---- streaming-tool client path ----
404 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
405 if (m_makeStreamingToolClient.value() == -1) {
// Retry lockEvent() every millisecond for as long as it reports RECOVERABLE.
406 StatusCode sc = m_eventStreamingTool->lockEvent(m_evtCount);
407 while (sc.isRecoverable()) {
408 usleep(1000);
409 sc = m_eventStreamingTool->lockEvent(m_evtCount);
410 }
411 }
412 // Increase event count
413 ++m_evtCount;
414 void* tokenStr = nullptr;
415 unsigned int status = 0;
416 StatusCode sc = m_eventStreamingTool->getLockedEvent(&tokenStr, status);
// RECOVERABLE from getLockedEvent() is handled as end of input.
417 if (sc.isRecoverable()) {
418 delete [] (char*)tokenStr; tokenStr = nullptr;
419 // Return end iterator
420 ctxt = *m_endIter;
421 // This is not a real failure but a Gaudi way of handling "end of job"
422 return StatusCode::FAILURE;
423 }
424 if (sc.isFailure()) {
425 ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
426 delete [] (char*)tokenStr; tokenStr = nullptr;
427 return StatusCode::FAILURE;
428 }
429 if (!eventStore()->clearStore().isSuccess()) {
430 ATH_MSG_WARNING("Cannot clear Store");
431 }
// Wrap the received token string in an attribute list under the name "eventRef".
432 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList());
433 athAttrList->extend("eventRef", "string");
434 (*athAttrList)["eventRef"].data<std::string>() = std::string((char*)tokenStr);
// NOTE(review): listing line 435 is not present in this copy; `wh` used on the next line
// is presumably declared there - confirm against the repository version.
436 if (!wh.record(std::move(athAttrList)).isSuccess()) {
437 delete [] (char*)tokenStr; tokenStr = nullptr;
438 ATH_MSG_ERROR("Cannot record AttributeList to StoreGate " << StoreID::storeName(eventStore()->storeID()));
439 return StatusCode::FAILURE;
440 }
// tokenStr is released with delete[] on every exit path of this branch.
441 Token token;
442 token.fromString(std::string((char*)tokenStr));
443 delete [] (char*)tokenStr; tokenStr = nullptr;
444 Guid guid = token.dbID();
// A different database GUID means a new input file: close the old one, announce the new one.
445 if (guid != m_guid && m_processMetadata.value()) {
446 if (m_evtCount >= 0 && m_guid != Guid::null()) {
447 // Fire EndInputFile incident
448 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + m_guid.toString(), m_guid.toString());
449 m_incidentSvc->fireIncident(endInputFileIncident);
450 }
451 m_guid = guid;
452 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
453 m_incidentSvc->fireIncident(beginInputFileIncident);
454 }
455 return StatusCode::SUCCESS;
456 }
// ---- non-client path: read directly from the input collections ----
// A failing preNext() of a helper tool is only reported, not propagated.
457 for (const auto& tool : m_helperTools) {
458 if (!tool->preNext().isSuccess()) {
459 ATH_MSG_WARNING("Failed to preNext() " << tool->name());
460 }
461 }
// Loop until one event is accepted (break below) or an error / end of input is returned.
462 for (;;) {
463 // Handle possible file transition
464 StatusCode sc = nextHandleFileTransition(ctxt);
465 if (sc.isRecoverable()) {
466 continue; // handles empty files
467 }
468 if (sc.isFailure()) {
469 return StatusCode::FAILURE;
470 }
471 // Increase event count
472 ++m_evtCount;
473 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
474 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
475 }
// NOTE(review): listing line 476 is not present in this copy; it presumably opens the
// `if (` whose second operand follows - confirm against the repository version.
477 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
478 {
479 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) {
480 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
481 if (ds == nullptr) {
482 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
483 return StatusCode::FAILURE;
484 }
// Server: publish the current token (including its terminating NUL) under index m_evtCount - 1.
// While putEvent reports RECOVERABLE, readData() is called repeatedly until it stops succeeding.
485 std::string token = m_headerIterator->eventRef().toString();
486 StatusCode sc;
487 while ( (sc = putEvent_ST(*m_eventStreamingTool,
488 m_evtCount - 1, token.c_str(),
489 token.length() + 1, 0)).isRecoverable() ) {
490 while (ds->readData().isSuccess()) {
491 ATH_MSG_VERBOSE("Called last readData, while putting next event in next()");
492 }
493 // Nothing to do right now, trigger alternative (e.g. caching) here? Currently just fast loop.
494 }
495 if (!sc.isSuccess()) {
496 ATH_MSG_ERROR("Cannot put Event " << m_evtCount - 1 << " to AthenaSharedMemoryTool");
497 return StatusCode::FAILURE;
498 }
499 } else {
// A secondary selector neither clears the event store nor records the attribute list.
500 if (!m_isSecondary.value()) {
501 if (!eventStore()->clearStore().isSuccess()) {
502 ATH_MSG_WARNING("Cannot clear Store");
503 }
504 if (!recordAttributeList().isSuccess()) {
505 ATH_MSG_ERROR("Failed to record AttributeList.");
506 return StatusCode::FAILURE;
507 }
508 }
509 }
// Combine the postNext() results of all helper tools: any failure wins over RECOVERABLE,
// RECOVERABLE wins over SUCCESS.
510 StatusCode status = StatusCode::SUCCESS;
511 for (const auto& tool : m_helperTools) {
512 StatusCode toolStatus = tool->postNext();
513 if (toolStatus.isRecoverable()) {
514 ATH_MSG_INFO("Request skipping event from: " << tool->name());
515 if (status.isSuccess()) {
516 status = StatusCode::RECOVERABLE;
517 }
518 } else if (toolStatus.isFailure()) {
519 ATH_MSG_WARNING("Failed to postNext() " << tool->name());
520 status = StatusCode::FAILURE;
521 }
522 }
// Both RECOVERABLE and FAILURE keep the loop running, i.e. the event is not delivered.
523 if (status.isRecoverable()) {
524 ATH_MSG_INFO("skipping event " << m_evtCount);
525 } else if (status.isFailure()) {
526 ATH_MSG_WARNING("Failed to postNext() HelperTool.");
527 } else {
528 if (!m_counterTool.empty() && !m_counterTool->postNext().isSuccess()) {
529 ATH_MSG_WARNING("Failed to postNext() CounterTool.");
530 }
531 break;
532 }
533 } else {
// Event is skipped: drop every leading skip range whose upper bound has been reached.
534 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
535 m_skipEventRanges.erase(m_skipEventRanges.begin());
536 }
537 ATH_MSG_INFO("skipping event " << m_evtCount);
538 }
539 }
540 return StatusCode::SUCCESS;
541}
542//________________________________________________________________________________
543StatusCode EventSelectorAthenaPool::next(IEvtSelector::Context& ctxt, int jump) const {
544 if (jump > 0) {
545 for (int i = 0; i < jump; i++) {
546 ATH_CHECK(next(ctxt));
547 }
548 return StatusCode::SUCCESS;
549 }
550 return StatusCode::FAILURE;
551}
552//________________________________________________________________________________
553StatusCode EventSelectorAthenaPool::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
554{
555 if( m_inputCollectionsChanged ) {
556 StatusCode rc = reinit();
557 if( rc != StatusCode::SUCCESS ) return rc;
558 }
559 else { // advance to the next (not needed after reinit)
560 // Check if we're at the end of file
561 if (m_headerIterator == nullptr || m_headerIterator->next() == 0) {
562 m_headerIterator = nullptr;
563 // Close previous collection.
564 m_poolCollectionConverter.reset();
565
566 // zero the current DB ID (m_guid) before disconnect() to indicate it is no longer in use
567 const SG::SourceID old_guid = m_guid.toString();
568 m_guid = Guid::null();
569 disconnectIfFinished( old_guid );
570
571 // check if somebody updated Inputs in the EOF incident (like VP1 does)
572 if( m_inputCollectionsChanged ) {
573 StatusCode rc = reinit();
574 if( rc != StatusCode::SUCCESS ) return rc;
575 } else {
576 // Open next file from inputCollections list.
577 ++m_inputCollectionsIterator;
578 // Create PoolCollectionConverter for input file
579 m_poolCollectionConverter = getCollectionCnv(true);
580 if (!m_poolCollectionConverter) {
581 // Return end iterator
582 ctxt = *m_endIter;
583 // This is not a real failure but a Gaudi way of handling "end of job"
584 return StatusCode::FAILURE;
585 }
586 // Get DataHeader iterator
587 m_headerIterator = &m_poolCollectionConverter->selectAll();
588
589 // Return RECOVERABLE to mark we should still continue
590 return StatusCode::RECOVERABLE;
591 }
592 }
593 }
594 const Token& headRef = m_headerIterator->eventRef();
595 const Guid guid = headRef.dbID();
596 const int tech = headRef.technology();
597 ATH_MSG_VERBOSE("next(): DataHeder Token=" << headRef.toString() );
598
599 if (guid != m_guid) {
600 // we are starting reading from a new DB. Check if the old one needs to be retired
601 if (m_guid != Guid::null()) {
602 // zero the current DB ID (m_guid) before trying disconnect() to indicate it is no longer in use
603 const SG::SourceID old_guid = m_guid.toString();
604 m_guid = Guid::null();
605 disconnectIfFinished( old_guid );
606 }
607 m_guid = guid;
608 m_activeEventsPerSource[guid.toString()] = 0;
609 // Fire BeginInputFile incident if current InputCollection is a payload file;
610 // otherwise, ascertain whether the pointed-to file is reachable before firing any incidents and/or proceeding
611 if (m_collectionType.value() == "ImplicitCollection") {
612 // For now, we can only deal with input metadata from POOL files, but we know we have a POOL file here
613 if (!m_athenaPoolCnvSvc->setInputAttributes(*m_inputCollectionsIterator).isSuccess()) {
614 ATH_MSG_ERROR("Failed to set input attributes.");
615 return StatusCode::FAILURE;
616 }
617 if (m_processMetadata.value()) {
618 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator, m_guid.toString());
619 m_incidentSvc->fireIncident(beginInputFileIncident);
620 }
621 } else {
622 // Check if File is BS
623 if (tech != 0x00001000 && m_processMetadata.value()) {
624 FileIncident beginInputFileIncident(name(), "BeginInputFile", "FID:" + m_guid.toString(), m_guid.toString());
625 m_incidentSvc->fireIncident(beginInputFileIncident);
626 }
627 }
628 } // end if (guid != m_guid)
629 return StatusCode::SUCCESS;
630}
631//________________________________________________________________________________
// Position-only variant of next(): handles file transitions, increments m_evtCount, calls
// the counter tool's preNext() and applies the skip ranges. In the lines visible here it
// does not touch the event store, the streaming tool or the helper tools.
// Returns SUCCESS once a non-skipped event is reached and FAILURE when
// nextHandleFileTransition() fails (which includes end of input).
632StatusCode EventSelectorAthenaPool::nextWithSkip(IEvtSelector::Context& ctxt) const {
633 ATH_MSG_DEBUG("EventSelectorAthenaPool::nextWithSkip");
634
635 for (;;) {
636 // Check if we're at the end of file
637 StatusCode sc = nextHandleFileTransition(ctxt);
638 if (sc.isRecoverable()) {
639 continue; // handles empty files
640 }
641 if (sc.isFailure()) {
642 return StatusCode::FAILURE;
643 }
644
645 // Increase event count
646 ++m_evtCount;
647
// A failing preNext() of the counter tool is only reported, not propagated.
648 if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
649 ATH_MSG_WARNING("Failed to preNext() CounterTool.");
650 }
// NOTE(review): listing line 651 is not present in this copy; it presumably opens the
// `if (` whose second operand follows - confirm against the repository version.
652 && (m_skipEventRanges.empty() || m_evtCount < m_skipEventRanges.front().first))
653 {
654 return StatusCode::SUCCESS;
655 } else {
// Event is skipped: drop every leading skip range whose upper bound has been reached.
656 while( !m_skipEventRanges.empty() && m_evtCount >= m_skipEventRanges.front().second ) {
657 m_skipEventRanges.erase(m_skipEventRanges.begin());
658 }
659 if (m_isSecondary.value()) {
660 ATH_MSG_INFO("skipping secondary event " << m_evtCount);
661 } else {
662 ATH_MSG_INFO("skipping event " << m_evtCount);
663 }
664 }
665 }
666
// Not reached: the loop above only exits through its return statements.
667 return StatusCode::SUCCESS;
668}
669//________________________________________________________________________________
670StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& /*ctxt*/) const {
671 ATH_MSG_ERROR("previous() not implemented");
672 return StatusCode::FAILURE;
673}
674//________________________________________________________________________________
675StatusCode EventSelectorAthenaPool::previous(IEvtSelector::Context& ctxt, int jump) const {
676 if (jump > 0) {
677 for (int i = 0; i < jump; i++) {
678 ATH_CHECK(previous(ctxt));
679 }
680 return StatusCode::SUCCESS;
681 }
682 return StatusCode::FAILURE;
683}
684//________________________________________________________________________________
685StatusCode EventSelectorAthenaPool::last(IEvtSelector::Context& ctxt) const {
686 if (ctxt.identifier() == m_endIter->identifier()) {
687 ATH_MSG_DEBUG("last(): Last event in InputStream.");
688 return StatusCode::SUCCESS;
689 }
690 return StatusCode::FAILURE;
691}
692//________________________________________________________________________________
693StatusCode EventSelectorAthenaPool::rewind(IEvtSelector::Context& ctxt) const {
694 ATH_CHECK(reinit());
695 ctxt = EventContextAthenaPool(this);
696 return StatusCode::SUCCESS;
697}
698//________________________________________________________________________________
// Build an opaque address (class ID of DataHeader) from an event token string.
// The token string is read from the "eventRef" attribute of attrList when that handle is
// valid; otherwise a warning is printed and the token of m_headerIterator is used instead.
// On success iop points to a newly allocated TokenAddress; FAILURE is returned only when
// reading the attribute throws.
699StatusCode EventSelectorAthenaPool::createAddress(const IEvtSelector::Context& /*ctxt*/,
700 IOpaqueAddress*& iop) const {
701 std::string tokenStr;
// NOTE(review): listing line 702 is not present in this copy; `attrList` used below is
// presumably declared there - confirm against the repository version.
703 if (attrList.isValid()) {
704 try {
705 tokenStr = (*attrList)["eventRef"].data<std::string>();
706 ATH_MSG_DEBUG("found AthenaAttribute, name = eventRef = " << tokenStr);
707 } catch (std::exception &e) {
708 ATH_MSG_ERROR(e.what());
709 return StatusCode::FAILURE;
710 }
711 } else {
712 ATH_MSG_WARNING("Cannot find AthenaAttribute, key = " << m_attrListKey);
// NOTE(review): m_headerIterator is dereferenced without a null check here.
713 tokenStr = m_headerIterator->eventRef().toString();
714 }
// The Token is moved into the TokenAddress constructor.
715 auto token = std::make_unique<Token>();
716 token->fromString(tokenStr);
717 iop = new TokenAddress(pool::POOL_StorageType.type(), ClassID_traits<DataHeader>::ID(), "", "EventSelector", IPoolSvc::kInputStream, std::move(token));
718 return StatusCode::SUCCESS;
719}
720//________________________________________________________________________________
721StatusCode EventSelectorAthenaPool::releaseContext(IEvtSelector::Context*& /*ctxt*/) const {
722 return StatusCode::SUCCESS;
723}
724//________________________________________________________________________________
725StatusCode EventSelectorAthenaPool::resetCriteria(const std::string& /*criteria*/,
726 IEvtSelector::Context& /*ctxt*/) const {
727 return StatusCode::SUCCESS;
728}
729//__________________________________________________________________________
// Position the selector on event number evtNum, counted across all input collections
// (the per-collection offset m_firstEvt[...] is subtracted before seeking in the file).
// Returns:
//   RECOVERABLE  evtNum is not found in any collection ("Reached end of Input");
//   FAILURE      the collection converter cannot be initialized, an exception is thrown,
//                or the header iterator cannot seek to the event;
//   SUCCESS      otherwise, with m_evtCount set to evtNum + 1.
// A non-SUCCESS status of reinit() is returned as is.
// NOTE(review): listing lines 744, 755 and 761-762 are not present in this copy of the
// file - confirm against the repository version before editing.
730StatusCode EventSelectorAthenaPool::seek(Context& /*ctxt*/, int evtNum) const {
731
732 if( m_inputCollectionsChanged ) {
733 StatusCode rc = reinit();
734 if( rc != StatusCode::SUCCESS ) return rc;
735 }
736
737 long newColl = findEvent(evtNum);
// Fallback when findEvent() finds nothing: stay in the current collection if evtNum lies
// between its first event and the current position.
738 if (newColl == -1 && evtNum >= m_firstEvt[m_curCollection] && evtNum < m_evtCount - 1) {
739 newColl = m_curCollection;
740 }
741 if (newColl == -1) {
742 m_headerIterator = nullptr;
743 ATH_MSG_INFO("seek: Reached end of Input.");
745 return StatusCode::RECOVERABLE;
746 }
// The target lives in another collection: close the current one and open the new one.
747 if (newColl != m_curCollection) {
748 if (!m_keepInputFilesOpen.value() && m_poolCollectionConverter) {
749 m_poolCollectionConverter->disconnectDb().ignore();
750 }
751 m_poolCollectionConverter.reset();
752 m_curCollection = newColl;
753 try {
754 ATH_MSG_DEBUG("Seek to item: \""
756 << "\" from the collection list.");
757 // Reset input collection iterator to the right place
758 m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
759 m_inputCollectionsIterator += m_curCollection;
760 m_poolCollectionConverter = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
763 m_athenaPoolCnvSvc->getPoolSvc());
764 if (!m_poolCollectionConverter->initialize().isSuccess()) {
765 m_headerIterator = nullptr;
766 ATH_MSG_ERROR("seek: Unable to initialize PoolCollectionConverter.");
767 return StatusCode::FAILURE;
768 }
769 // Create DataHeader iterators
770 m_headerIterator = &m_poolCollectionConverter->selectAll();
// NOTE(review): beginIter is allocated with new and not deleted in the lines visible here.
771 EventContextAthenaPool* beginIter = new EventContextAthenaPool(this);
772 m_evtCount = m_firstEvt[m_curCollection];
// The status of this next() call is deliberately ignored.
773 next(*beginIter).ignore();
774 ATH_MSG_DEBUG("Token " << m_headerIterator->eventRef().toString());
775 } catch (std::exception &e) {
776 m_headerIterator = nullptr;
777 ATH_MSG_ERROR(e.what());
778 return StatusCode::FAILURE;
779 }
780 }
781
// Seek inside the collection using the index relative to the collection's first event.
782 if (m_headerIterator->seek(evtNum - m_firstEvt[m_curCollection]) == 0) {
783 m_headerIterator = nullptr;
784 ATH_MSG_ERROR("Did not find event, evtNum = " << evtNum);
785 return StatusCode::FAILURE;
786 } else {
787 m_evtCount = evtNum + 1;
788 }
789 return StatusCode::SUCCESS;
790}
791//__________________________________________________________________________
792int EventSelectorAthenaPool::curEvent (const Context& /*ctxt*/) const {
793 return(m_evtCount);
794}
795//__________________________________________________________________________
796// Search for event number evtNum.
797// Return the index of the collection containing it, or -1 if not found.
798// Note: passing -1 for evtNum will always yield failure,
799// but this can be used to force filling in the entire m_numEvt array.
801 for (std::size_t i = 0, imax = m_numEvt.size(); i < imax; i++) {
802 if (m_numEvt[i] == -1) {
804 m_inputCollectionsProp.value()[i],
806 m_athenaPoolCnvSvc->getPoolSvc());
807 if (!pcc.initialize().isSuccess()) {
808 break;
809 }
810 int collection_size = 0;
811 if (pcc.isValid()) {
813 collection_size = hi->size();
814 }
815 if (i > 0) {
816 m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
817 } else {
818 m_firstEvt[i] = 0;
819 }
820 m_numEvt[i] = collection_size;
821 }
822 if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
823 return(i);
824 }
825 }
826 return(-1);
827}
828
829//________________________________________________________________________________
831 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
832 if (ds == nullptr) {
833 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
834 return StatusCode::FAILURE;
835 }
836 if (num < 0) {
837 if (ds->makeServer(num - 1).isFailure()) {
838 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to output DataStreaming server");
839 }
840 return StatusCode::SUCCESS;
841 }
842 if (ds->makeServer(num + 1).isFailure()) {
843 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to input DataStreaming server");
844 return StatusCode::FAILURE;
845 }
846 if (m_eventStreamingTool.empty()) {
847 return StatusCode::SUCCESS;
848 }
849 m_processMetadata = false;
850 ATH_MSG_DEBUG("makeServer: " << m_eventStreamingTool << " = " << num);
851 return(m_eventStreamingTool->makeServer(1, ""));
852}
853
854//________________________________________________________________________________
856 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
857 if (ds == nullptr) {
858 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
859 return StatusCode::FAILURE;
860 }
861 if (ds->makeClient(num + 1).isFailure()) {
862 ATH_MSG_ERROR("Failed to switch AthenaPoolCnvSvc to DataStreaming client");
863 return StatusCode::FAILURE;
864 }
865 if (m_eventStreamingTool.empty()) {
866 return StatusCode::SUCCESS;
867 }
868 ATH_MSG_DEBUG("makeClient: " << m_eventStreamingTool << " = " << num);
869 std::string dummyStr;
870 return(m_eventStreamingTool->makeClient(0, dummyStr));
871}
872
873//________________________________________________________________________________
874StatusCode EventSelectorAthenaPool::share(int evtnum) {
875 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
876 if (ds == nullptr) {
877 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
878 return StatusCode::FAILURE;
879 }
880 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
881 StatusCode sc = m_eventStreamingTool->lockEvent(evtnum);
882 while (sc.isRecoverable()) {
883 usleep(1000);
884 sc = m_eventStreamingTool->lockEvent(evtnum);
885 }
886// Send stop client and wait for restart
887 if (sc.isFailure()) {
888 if (ds->makeClient(0).isFailure()) {
889 return StatusCode::FAILURE;
890 }
891 sc = m_eventStreamingTool->lockEvent(evtnum);
892 while (sc.isRecoverable() || sc.isFailure()) {
893 usleep(1000);
894 sc = m_eventStreamingTool->lockEvent(evtnum);
895 }
896//FIXME
897 if (ds->makeClient(1).isFailure()) {
898 return StatusCode::FAILURE;
899 }
900 }
901 return(sc);
902 }
903 return StatusCode::FAILURE;
904}
905
906//________________________________________________________________________________
908 IDataShare* ds = dynamic_cast<IDataShare*>(m_athenaPoolCnvSvc.get());
909 if (ds == nullptr) {
910 ATH_MSG_ERROR("Cannot cast AthenaPoolCnvSvc to DataShare");
911 return StatusCode::FAILURE;
912 }
913 if (m_eventStreamingTool.empty()) {
914 ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
915 return StatusCode::FAILURE;
916 }
917 ATH_MSG_VERBOSE("Called read Event " << maxevt);
918 IEvtSelector::Context* ctxt = new EventContextAthenaPool(this);
919 for (int i = 0; i < maxevt || maxevt == -1; ++i) {
920 if (!next(*ctxt).isSuccess()) {
921 if (m_evtCount == -1) {
922 ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
923 break;
924 }
925 ATH_MSG_ERROR("Cannot read Event " << m_evtCount - 1 << " into AthenaSharedMemoryTool");
926 delete ctxt; ctxt = nullptr;
927 return StatusCode::FAILURE;
928 } else {
929 ATH_MSG_VERBOSE("Called next, read Event " << m_evtCount - 1);
930 }
931 }
932 delete ctxt; ctxt = nullptr;
933 // End of file, wait for last event to be taken
934 StatusCode sc;
935 while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
936 while (ds->readData().isSuccess()) {
937 ATH_MSG_VERBOSE("Called last readData, while marking last event in readEvent()");
938 }
939 usleep(1000);
940 }
941 if (!sc.isSuccess()) {
942 ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
943 return StatusCode::FAILURE;
944 } else {
945 sc = ds->readData();
946 while (sc.isSuccess() || sc.isRecoverable()) {
947 sc = ds->readData();
948 }
949 ATH_MSG_DEBUG("Failed last readData -> Clients are stopped, after marking last event in readEvent()");
950 }
951 return StatusCode::SUCCESS;
952}
953
954//__________________________________________________________________________
955int EventSelectorAthenaPool::size(Context& /*ctxt*/) const {
956 // Fetch sizes of all collections.
957 findEvent(-1);
958 return std::accumulate(m_numEvt.begin(), m_numEvt.end(), 0);
959}
960//__________________________________________________________________________
961std::unique_ptr<PoolCollectionConverter>
963 while (m_inputCollectionsIterator != m_inputCollectionsProp.value().end()) {
964 if (m_curCollection != 0) {
965 m_numEvt[m_curCollection] = m_evtCount - m_firstEvt[m_curCollection];
967 m_firstEvt[m_curCollection] = m_evtCount;
968 }
969 ATH_MSG_DEBUG("Try item: \"" << *m_inputCollectionsIterator << "\" from the collection list.");
970 auto pCollCnv = std::make_unique<PoolCollectionConverter>(m_collectionType.value(),
971 *m_inputCollectionsIterator,
973 m_athenaPoolCnvSvc->getPoolSvc());
974 StatusCode status = pCollCnv->initialize();
975 if (!status.isSuccess()) {
976 // Close previous collection.
977 pCollCnv.reset();
978 if (!status.isRecoverable()) {
979 ATH_MSG_ERROR("Unable to initialize PoolCollectionConverter.");
980 throw GaudiException("Unable to read: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
981 } else {
982 ATH_MSG_ERROR("Unable to open: " << *m_inputCollectionsIterator);
983 throw GaudiException("Unable to open: " + *m_inputCollectionsIterator, name(), StatusCode::FAILURE);
984 }
985 } else {
986 if (!pCollCnv->isValid().isSuccess()) {
987 pCollCnv.reset();
988 ATH_MSG_DEBUG("No events found in: " << *m_inputCollectionsIterator << " skipped!!!");
989 if (throwIncidents && m_processMetadata.value()) {
990 FileIncident beginInputFileIncident(name(), "BeginInputFile", *m_inputCollectionsIterator);
991 m_incidentSvc->fireIncident(beginInputFileIncident);
992 FileIncident endInputFileIncident(name(), "EndInputFile", "eventless " + *m_inputCollectionsIterator);
993 m_incidentSvc->fireIncident(endInputFileIncident);
994 }
995 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb(*m_inputCollectionsIterator).ignore();
996 ++m_inputCollectionsIterator;
997 } else {
998 return(pCollCnv);
999 }
1000 }
1001 }
1002 return(nullptr);
1003}
1004//__________________________________________________________________________
1006 // Get access to AttributeList
1007 ATH_MSG_DEBUG("Get AttributeList from the collection");
1008 // MN: accessing only attribute list, ignoring token list
1009 const coral::AttributeList& attrList = m_headerIterator->currentRow().attributeList();
1010 ATH_MSG_DEBUG("AttributeList size " << attrList.size());
1011 std::unique_ptr<AthenaAttributeList> athAttrList(new AthenaAttributeList(attrList));
1012 // Fill the new attribute list
1013 ATH_CHECK(fillAttributeList(athAttrList.get(), "", false));
1014 // Write the AttributeList
1016 ATH_CHECK(wh.record(std::move(athAttrList)));
1017 return StatusCode::SUCCESS;
1018}
1019//__________________________________________________________________________
1020StatusCode EventSelectorAthenaPool::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
1021{
1022 const auto& row = m_headerIterator->currentRow();
1023 attrList->extend( row.tokenName() + suffix, "string" );
1024 (*attrList)[ row.tokenName() + suffix ].data<std::string>() = row.token().toString();
1025 ATH_MSG_DEBUG("record AthenaAttribute, name = " << row.tokenName() + suffix << " = " << row.token().toString() << ".");
1026
1027 std::string eventRef = "eventRef";
1028 if (m_isSecondary.value()) {
1029 eventRef.append(suffix);
1030 }
1031 attrList->extend(eventRef, "string");
1032 (*attrList)[eventRef].data<std::string>() = m_headerIterator->eventRef().toString();
1033 ATH_MSG_DEBUG("record AthenaAttribute, name = " + eventRef + " = " << m_headerIterator->eventRef().toString() << ".");
1034
1035 if (copySource) {
1036 const coral::AttributeList& sourceAttrList = m_headerIterator->currentRow().attributeList();
1037 for (const auto &attr : sourceAttrList) {
1038 attrList->extend(attr.specification().name() + suffix, attr.specification().type());
1039 (*attrList)[attr.specification().name() + suffix] = attr;
1040 }
1041 }
1042
1043 return StatusCode::SUCCESS;
1044}
1045//__________________________________________________________________________
1047 ATH_MSG_INFO("I/O reinitialization...");
1048 if (m_poolCollectionConverter) {
1049 m_poolCollectionConverter->disconnectDb().ignore();
1050 m_poolCollectionConverter.reset();
1051 }
1052 m_headerIterator = nullptr;
1053 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1054 if (!iomgr.retrieve().isSuccess()) {
1055 ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1056 return StatusCode::FAILURE;
1057 }
1058 if (!iomgr->io_hasitem(this)) {
1059 ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1060 return StatusCode::FAILURE;
1061 }
1062 if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
1063 m_guid = Guid::null();
1064 return(this->reinit());
1065 }
1066 std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1067 std::set<std::size_t> updatedIndexes;
1068 for (std::size_t i = 0, imax = m_inputCollectionsProp.value().size(); i < imax; i++) {
1069 if (updatedIndexes.find(i) != updatedIndexes.end()) continue;
1070 std::string savedName = inputCollections[i];
1071 std::string &fname = inputCollections[i];
1072 if (!iomgr->io_contains(this, fname)) {
1073 ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1074 return StatusCode::FAILURE;
1075 }
1076 if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1077 ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1078 return StatusCode::FAILURE;
1079 }
1080 if (savedName != fname) {
1081 ATH_MSG_DEBUG("Mapping value for [" << savedName << "] to [" << fname << "]");
1082 m_athenaPoolCnvSvc->getPoolSvc()->renamePfn(savedName, fname);
1083 }
1084 updatedIndexes.insert(i);
1085 for (std::size_t j = i + 1; j < imax; j++) {
1086 if (inputCollections[j] == savedName) {
1087 inputCollections[j] = fname;
1088 updatedIndexes.insert(j);
1089 }
1090 }
1091 }
1092 // all good... copy over.
1093 m_inputCollectionsProp = inputCollections;
1094 m_guid = Guid::null();
1095 return reinit();
1096}
1097//__________________________________________________________________________
1099 ATH_MSG_INFO("I/O finalization...");
1100 if (m_poolCollectionConverter) {
1101 m_poolCollectionConverter->disconnectDb().ignore();
1102 m_poolCollectionConverter.reset();
1103 }
1104 return StatusCode::SUCCESS;
1105}
1106
1107//__________________________________________________________________________
1108/* Listen to IncidentType::BeginProcessing and EndProcessing
1109 Maintain counters of how many events from a given file are being processed.
1110 Files are identified by SG::SourceID (string GUID).
1111 When there are no more events from a file, see if it can be closed.
1112*/
1113void EventSelectorAthenaPool::handle(const Incident& inc)
1114{
1115 SG::SourceID fid;
1116 if (inc.type() == IncidentType::BeginProcessing) {
1117 if ( Atlas::hasExtendedEventContext(inc.context()) ) {
1118 fid = Atlas::getExtendedEventContext(inc.context()).proxy()->sourceID();
1119 }
1120 *m_sourceID.get(inc.context()) = fid;
1121 }
1122 else {
1123 fid = *m_sourceID.get(inc.context());
1124 }
1125
1126 if( fid.empty() ) {
1127 ATH_MSG_WARNING("could not read event source ID from incident event context");
1128 return;
1129 }
1130 if( m_activeEventsPerSource.find( fid ) == m_activeEventsPerSource.end()) {
1131 ATH_MSG_DEBUG("Incident handler ignoring unknown input FID: " << fid );
1132 return;
1133 }
1134 ATH_MSG_DEBUG("** MN Incident handler " << inc.type() << " Event source ID=" << fid );
1135 if( inc.type() == IncidentType::BeginProcessing ) {
1136 // increment the events-per-file counter for FID
1137 m_activeEventsPerSource[fid]++;
1138 } else if( inc.type() == IncidentType::EndProcessing ) {
1139 m_activeEventsPerSource[fid]--;
1140 disconnectIfFinished( fid );
1141 *m_sourceID.get(inc.context()) = "";
1142 }
1143 if( msgLvl(MSG::DEBUG) ) {
1144 for( auto& source: m_activeEventsPerSource )
1145 msg(MSG::DEBUG) << "SourceID: " << source.first << " active events: " << source.second << endmsg;
1146 }
1147}
1148
1149//__________________________________________________________________________
1150/* Disconnect APR Database identifieed by a SG::SourceID when it is no longer in use:
1151 m_guid is not pointing to it and there are no events from it being processed
1152 (if the EventLoopMgr was not firing Begin/End incidents, this will just close the DB)
1153*/
1155{
1156 if( m_eventStreamingTool.empty() && m_activeEventsPerSource.find(fid) != m_activeEventsPerSource.end()
1157 && m_activeEventsPerSource[fid] <= 0 && m_guid != fid ) {
1158 // Explicitly disconnect file corresponding to old FID to release memory
1159 if( !m_keepInputFilesOpen.value() ) {
1160 // Assume that the end of collection file indicates the end of payload file.
1161 if (m_processMetadata.value()) {
1162 FileIncident endInputFileIncident(name(), "EndInputFile", "FID:" + fid, fid);
1163 m_incidentSvc->fireIncident(endInputFileIncident);
1164 }
1165 ATH_MSG_INFO("Disconnecting input sourceID: " << fid );
1166 m_athenaPoolCnvSvc->getPoolSvc()->disconnectDb("FID:" + fid, IPoolSvc::kInputStream).ignore();
1167 m_activeEventsPerSource.erase( fid );
1168 return true;
1169 }
1170 }
1171 return false;
1172}
#define endmsg
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the DataHeader and DataHeaderElement classes.
This file contains the class definition for the EventContextAthenaPool class.
This file contains the class definition for the EventSelectorAthenaPool class.
This file contains the class definition for the IPoolSvc interface class.
static Double_t sc
static Double_t rc
This file contains the class definition for the PoolCollectionConverter class.
Handle class for reading from StoreGate.
Handle class for recording to StoreGate.
int imax(int i, int j)
This file contains the class definition for the TokenAddress class.
This file contains the class definition for the Token class (migrated from POOL).
#define ATLAS_THREAD_SAFE
An AttributeList represents a logical row of attributes in a metadata table.
This class provides the context to access an event from POOL persistent store.
EventSelectorAthenaPool(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Gaudi::CheckedProperty< uint64_t > m_firstEventNo
virtual StatusCode start() override
std::unique_ptr< PoolCollectionConverter > getCollectionCnv(bool throwIncidents=false) const
Return pointer to new PoolCollectionConverter.
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
virtual StatusCode makeClient(int num) override
Make this a client.
virtual StatusCode nextWithSkip(IEvtSelector::Context &ctxt) const override
Go to next event and skip if necessary.
virtual StatusCode initialize() override
Required of all Gaudi Services.
Gaudi::Property< int > m_skipEvents
SkipEvents, numbers of events to skip: default = 0.
Gaudi::Property< bool > m_processMetadata
ProcessMetadata, switch on firing of FileIncidents which will trigger processing of metadata: default...
virtual StatusCode share(int evtnum) override
Request to share a given event number.
virtual int curEvent(const Context &ctxt) const override
Return the current event number.
virtual StatusCode createContext(IEvtSelector::Context *&ctxt) const override
create context
virtual StatusCode io_finalize() override
Callback method to finalize the internal state of the component for I/O purposes (e....
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Gaudi::CheckedProperty< uint32_t > m_initTimeStamp
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
virtual StatusCode resetCriteria(const std::string &criteria, IEvtSelector::Context &ctxt) const override
Set a selection criteria.
StatusCode reinit() const
Reinitialize the service when a fork() occurred/was-issued.
StoreGateSvc * eventStore() const
Return pointer to active event SG.
void fireEndFileIncidents(bool isLastFile) const
Fires the EndInputFile incident (if there is an open file) at end of selector.
virtual StatusCode releaseContext(IEvtSelector::Context *&ctxt) const override
virtual StatusCode stop() override
ServiceHandle< IIncidentSvc > m_incidentSvc
virtual StatusCode next(IEvtSelector::Context &ctxt) const override
virtual ~EventSelectorAthenaPool()
Destructor.
ToolHandle< IAthenaSelectorTool > m_counterTool
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
std::string m_attrListKey
AttributeList SG key.
virtual StatusCode seek(Context &ctxt, int evtnum) const override
Seek to a given event number.
SG::SlotSpecificObj< SG::SourceID > m_sourceID
ToolHandle< IAthenaIPCTool > m_eventStreamingTool
virtual StatusCode last(IEvtSelector::Context &ctxt) const override
virtual StatusCode makeServer(int num) override
Make this a server.
Gaudi::CheckedProperty< uint64_t > m_eventsPerRun
Gaudi::CheckedProperty< uint32_t > m_eventsPerLB
virtual StatusCode createAddress(const IEvtSelector::Context &ctxt, IOpaqueAddress *&iop) const override
virtual void handle(const Incident &incident) override
Incident service handle listening for BeginProcessing and EndProcessing.
virtual StatusCode previous(IEvtSelector::Context &ctxt) const override
ServiceHandle< IAthenaPoolCnvSvc > m_athenaPoolCnvSvc
virtual StatusCode rewind(IEvtSelector::Context &ctxt) const override
Gaudi::Property< bool > m_keepInputFilesOpen
KeepInputFilesOpen, boolean flag to keep files open after PoolCollection reaches end: default = false...
Gaudi::CheckedProperty< uint32_t > m_oldRunNo
Gaudi::Property< std::string > m_collectionType
CollectionType, type of the collection: default = "ImplicitCollection".
virtual StatusCode finalize() override
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
InputCollections, vector with names of the input collections.
EventContextAthenaPool * m_endIter
Gaudi::Property< int > m_makeStreamingToolClient
Make this instance a Streaming Client during first iteration automatically.
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
virtual StatusCode nextHandleFileTransition(IEvtSelector::Context &ctxt) const override
Handle file transition at the next iteration.
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Gaudi::CheckedProperty< uint32_t > m_runNo
The following are included for compatibility with McEventSelector and are not really used.
virtual int size(Context &ctxt) const override
Return the size of the collection.
int findEvent(int evtNum) const
Search for event with number evtNum.
Gaudi::Property< std::string > m_skipEventRangesProp
Skip Events - comma separated list of event to skip, ranges with '-': <start> - <end>
Gaudi::CheckedProperty< uint32_t > m_firstLBNo
This class provides a encapsulation of a GUID/UUID/CLSID/IID data structure (128 bit number).
Definition Guid.h:25
static const Guid & null() noexcept
NULL-Guid: static class method.
Definition Guid.cxx:14
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
Abstract interface for sharing data.
Definition IDataShare.h:24
@ kInputStream
Definition IPoolSvc.h:40
This class provides an interface to POOL collections.
StatusCode isValid() const
Check whether has valid pool::ICollection*.
pool::ICollectionCursor & selectAll()
StatusCode initialize()
Required by all Gaudi Services.
virtual bool isValid() override final
Can the handle be successfully dereferenced?
The Athena Transient Store API.
static StoreGateSvc * currentStoreGate()
get current StoreGate
static const std::string & storeName(const StoreID::type &s)
Definition StoreID.cxx:77
This class provides a Generic Transient Address for POOL tokens.
This class provides a token that identifies in a unique way objects on the persistent storage.
Definition Token.h:21
virtual const std::string toString() const
Retrieve the string representation of the token.
Definition Token.cxx:134
int technology() const
Access technology type.
Definition Token.h:77
Token & fromString(const std::string_view from)
Build from the string representation of a token.
Definition Token.cxx:147
const Guid & dbID() const
Access database identifier.
Definition Token.h:64
An interface used to navigate the result of a query on a collection.
virtual std::size_t size()=0
Returns the size of the collection.
int r
Definition globals.cxx:22
const ExtendedEventContext & getExtendedEventContext(const EventContext &ctx)
Retrieve an extended context from a context object.
bool hasExtendedEventContext(const EventContext &ctx)
Test whether a context object has an extended context installed.
StatusCode parse(std::tuple< Tup... > &tup, const Gaudi::Parsers::InputData &input)
static const DbType POOL_StorageType
Definition DbType.h:98
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.
MsgStream & msg
Definition testRead.cxx:32