ATLAS Offline Software
EventSelectorByteStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 #include <vector>
8 #include <algorithm>
9 
10 #include "EventContextByteStream.h"
15 
16 #include "GaudiKernel/ClassID.h"
17 #include "GaudiKernel/FileIncident.h"
18 #include "GaudiKernel/IIncidentSvc.h"
19 #include "GaudiKernel/IIoComponentMgr.h"
20 
23 
24 // EventInfoAttributeList includes
27 #include "eformat/StreamTag.h"
28 
29 
30 namespace {
32  StatusCode putEvent_ST(const IAthenaIPCTool& tool,
33  long eventNumber, const void* source,
34  size_t nbytes, unsigned int status) {
35  StatusCode sc ATLAS_THREAD_SAFE = tool.putEvent(eventNumber, source, nbytes, status);
36  return sc;
37  }
38 }
39 
40 
41 // Constructor.
43  ISvcLocator *svcloc)
44  : base_class(name, svcloc) {
45  declareProperty("HelperTools", m_helperTools);
46 
47  // RunNumber, OldRunNumber and OverrideRunNumberFromInput are used
48  // to override the run number coming in on the input stream
49  m_runNo.verifier().setLower(0);
50  // The following properties are only for compatibility with
51  // McEventSelector and are not really used anywhere
52  // TODO(berghaus): validate if those are even used
53  m_eventsPerRun.verifier().setLower(0);
54  m_firstEventNo.verifier().setLower(1);
55  m_firstLBNo.verifier().setLower(0);
56  m_eventsPerLB.verifier().setLower(0);
57  m_initTimeStamp.verifier().setLower(0);
58 
59  m_inputCollectionsProp.declareUpdateHandler(
61 }
62 
63 /******************************************************************************/
64 void EventSelectorByteStream::inputCollectionsHandler(Gaudi::Details::PropertyBase&) {
65  lock_t lock (m_mutex);
66  if (this->FSMState() != Gaudi::StateMachine::OFFLINE) {
67  this->reinit(lock).ignore();
68  }
69 }
70 
71 
72 /******************************************************************************/
74 }
75 
76 
77 /******************************************************************************/
81 }
82 
83 
84 //________________________________________________________________________________
86 
87  m_autoRetrieveTools = false;
88  m_checkToolDeps = false;
89 
90  if (m_isSecondary.value()) {
91  ATH_MSG_DEBUG("Initializing secondary event selector " << name());
92  } else {
93  ATH_MSG_DEBUG("Initializing " << name());
94  }
95 
96  if (!::AthService::initialize().isSuccess()) {
97  ATH_MSG_FATAL("Cannot initialize AthService base class.");
98  return(StatusCode::FAILURE);
99  }
100 
101  // Check for input setting
102  if (m_filebased && m_inputCollectionsProp.value().empty()) {
103  ATH_MSG_FATAL("Unable to retrieve valid input list");
104  return(StatusCode::FAILURE);
105  }
106  m_skipEventSequence = m_skipEventSequenceProp.value();
107  std::sort(m_skipEventSequence.begin(), m_skipEventSequence.end());
108 
109  // Check ByteStreamCnvSvc
110  IService* svc;
111  if (!serviceLocator()->getService(m_eventSourceName.value(), svc).isSuccess()) {
112  ATH_MSG_FATAL("Cannot get ByteStreamInputSvc");
113  return(StatusCode::FAILURE);
114  }
115  m_eventSource = dynamic_cast<ByteStreamInputSvc*>(svc);
116  if (m_eventSource == 0) {
117  ATH_MSG_FATAL("Cannot cast ByteStreamInputSvc");
118  return(StatusCode::FAILURE);
119  }
120 
121  // Get CounterTool (if configured)
122  if (!m_counterTool.empty()) {
123  if (!m_counterTool.retrieve().isSuccess()) {
124  ATH_MSG_FATAL("Cannot get CounterTool.");
125  return(StatusCode::FAILURE);
126  }
127  }
128  // Get HelperTools
129  if (!m_helperTools.empty()) {
130  if (!m_helperTools.retrieve().isSuccess()) {
131  ATH_MSG_FATAL("Cannot get " << m_helperTools);
132  return(StatusCode::FAILURE);
133  }
134  }
135  // Get SharedMemoryTool (if configured)
136  if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.retrieve().isSuccess()) {
137  ATH_MSG_FATAL("Cannot get AthenaSharedMemoryTool");
138  return(StatusCode::FAILURE);
139  }
140 
141  // Register this service for 'I/O' events
142  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
143  if (!iomgr.retrieve().isSuccess()) {
144  ATH_MSG_FATAL("Cannot retrieve IoComponentMgr.");
145  return(StatusCode::FAILURE);
146  }
147  if (!iomgr->io_register(this).isSuccess()) {
148  ATH_MSG_FATAL("Cannot register myself with the IoComponentMgr.");
149  return(StatusCode::FAILURE);
150  }
151 
152  // Register the input files with the iomgr
153  bool allGood = true;
154  const std::vector<std::string>& incol = m_inputCollectionsProp.value();
155  for (std::size_t icol = 0, imax = incol.size(); icol != imax; ++icol) {
156  if (!iomgr->io_register(this, IIoComponentMgr::IoMode::READ, incol[icol]).isSuccess()) {
157  ATH_MSG_FATAL("could not register [" << incol[icol] << "] for output !");
158  allGood = false;
159  } else {
160  ATH_MSG_VERBOSE("io_register[" << this->name() << "](" << incol[icol] << ") [ok]");
161  }
162  }
163  if (!allGood) {
164  return(StatusCode::FAILURE);
165  }
166 
167  // Make sure MetaDataSvc is initialized before the first file is opened
168  ServiceHandle<IAthMetaDataSvc> metaDataSvc("MetaDataSvc", name());
169  ATH_CHECK(metaDataSvc.retrieve());
170 
171  // Must happen before trying to open a file
172  lock_t lock (m_mutex);
173  StatusCode risc = this->reinit(lock);
174 
175  return risc;
176 }
177 //__________________________________________________________________________
179  ATH_MSG_INFO("reinitialization...");
180  // reset markers
181  if (m_inputCollectionsProp.value().size()>0) {
182  m_numEvt.resize(m_inputCollectionsProp.value().size(), -1);
183  m_firstEvt.resize(m_inputCollectionsProp.value().size(), -1);
184  }
185  else {
186  m_numEvt.resize(1);
187  m_firstEvt.resize(1);
188  }
189 
190  // Initialize InputCollectionsIterator
191  m_inputCollectionsIterator = m_inputCollectionsProp.value().begin();
192  m_NumEvents = 0;
193  bool retError = false;
194  if (!m_helperTools.empty()) {
195  for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
196  if (!tool->postInitialize().isSuccess()) {
197  ATH_MSG_FATAL("Failed to postInitialize() " << tool->name());
198  retError = true;
199  }
200  }
201  }
202  if (retError) {
203  ATH_MSG_FATAL("Failed to postInitialize() helperTools");
204  return(StatusCode::FAILURE);
205  }
206 
207  // If file based input then fire appropriate incidents
208  if (m_filebased) {
209  if (!m_firstFileFired) {
210  FileIncident firstInputFileIncident(name(), "FirstInputFile", "BSF:" + *m_inputCollectionsIterator);
211  m_incidentSvc->fireIncident(firstInputFileIncident);
212  m_firstFileFired = true;
213  }
214 
215  // try to open a file
216  if (this->openNewRun(lock).isFailure()) {
217  ATH_MSG_FATAL("Unable to open any file in initialize");
218  return(StatusCode::FAILURE);
219  }
220  // should be in openNewRun, but see comment there
221  m_beginFileFired = true;
222  }
223 
224  return(StatusCode::SUCCESS);
225 }
226 
227 //________________________________________________________________________________
229  ATH_MSG_DEBUG("Calling EventSelectorByteStream::start()");
230  lock_t lock (m_mutex);
231  // Create the begin and end iterator's for this selector.
233  // Increment to get the new event in.
235 
236  return(StatusCode::SUCCESS);
237 }
238 
239 //________________________________________________________________________________
241  ATH_MSG_DEBUG("Calling EventSelectorByteStream::stop()");
242  // Handle open files
243  if (m_filebased) {
244  // Close the file
245  if (m_eventSource->ready()) {
247  FileIncident endInputFileIncident(name(), "EndInputFile", "stop");
248  m_incidentSvc->fireIncident(endInputFileIncident);
249  }
250  }
251  return(StatusCode::SUCCESS);
252 }
253 
254 //__________________________________________________________________________
256  if (!m_counterTool.empty()) {
257  if (!m_counterTool->preFinalize().isSuccess()) {
258  ATH_MSG_WARNING("Failed to preFinalize() CounterTool");
259  }
260  }
261  for (ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
262  if (!tool->preFinalize().isSuccess()) {
263  ATH_MSG_WARNING("Failed to preFinalize() " << tool->name());
264  }
265  }
266  delete m_beginIter; m_beginIter = 0;
267  delete m_endIter; m_endIter = 0;
268  // Release AthenaSharedMemoryTool
269  if (!m_eventStreamingTool.empty() && !m_eventStreamingTool.release().isSuccess()) {
270  ATH_MSG_WARNING("Cannot release AthenaSharedMemoryTool");
271  }
272  // Release CounterTool
273  if (!m_counterTool.empty()) {
274  if (!m_counterTool.release().isSuccess()) {
275  ATH_MSG_WARNING("Cannot release CounterTool.");
276  }
277  }
278  // Release HelperTools
279  if (!m_helperTools.release().isSuccess()) {
280  ATH_MSG_WARNING("Cannot release " << m_helperTools);
281  }
282  if (m_eventSource) m_eventSource->release();
283  // Finalize the Service base class.
284  return(AthService::finalize());
285 }
286 
288  FileIncident endInputFileIncident(name(), "EndInputFile", "BSF:" + *m_inputCollectionsIterator);
289  m_incidentSvc->fireIncident(endInputFileIncident);
290  ++m_inputCollectionsIterator;
291  ++m_fileCount;
292 }
293 
295  // Should be protected upstream, but this is further protection
296  if (!m_filebased) {
297  ATH_MSG_ERROR("cannot open new run for non-filebased inputs");
298  return(StatusCode::FAILURE);
299  }
300  // Check for end of file list
301  if (m_inputCollectionsIterator == m_inputCollectionsProp.value().end()) {
302  ATH_MSG_INFO("End of input file list reached");
303  return(StatusCode::FAILURE);
304  }
305  std::string blockname = *m_inputCollectionsIterator;
306  // try to open a file, if failure go to next FIXME: PVG: silent failure?
307  //long nev = m_eventSource->getBlockIterator(blockname);
308  auto nevguid = m_eventSource->getBlockIterator(blockname);
309  long nev = nevguid.first;
310  if (nev == -1) {
311  ATH_MSG_FATAL("Unable to access file " << *m_inputCollectionsIterator << ", stopping here");
313  }
314  // Fire the incident
315  if (!m_beginFileFired) {
316  FileIncident beginInputFileIncident(name(), "BeginInputFile", "BSF:" + *m_inputCollectionsIterator,nevguid.second);
317  m_incidentSvc->fireIncident(beginInputFileIncident);
318  //m_beginFileFired = true; // Should go here, but can't because IEvtSelector next is const
319  }
320 
321  // check if file is empty
322  if (nev == 0) {
323  ATH_MSG_WARNING("no events in file " << blockname << " try next");
325  this->nextFile(lock);
326  return openNewRun(lock);
327  // check if skipping all events in that file (minus events already skipped)
328  } else if (m_skipEvents.value() - m_NumEvents > nev) {
329  ATH_MSG_WARNING("skipping more events " << m_skipEvents.value() - m_NumEvents << "(" << nev <<") than in file " << *m_inputCollectionsIterator << ", try next");
330  m_NumEvents += nev;
331  m_numEvt[m_fileCount] = nev;
333  this->nextFile(lock);
334  return openNewRun(lock);
335  }
336 
337  ATH_MSG_DEBUG("Opened block/file " << blockname);
338  m_firstEvt[m_fileCount] = m_NumEvents;
339  m_numEvt[m_fileCount] = nev;
340 
341  return(StatusCode::SUCCESS);
342 }
343 
344 StatusCode EventSelectorByteStream::createContext(IEvtSelector::Context*& it) const {
345  it = new EventContextByteStream(this);
346  return(StatusCode::SUCCESS);
347 }
348 
349 StatusCode EventSelectorByteStream::next(IEvtSelector::Context& it) const {
350  lock_t lock (m_mutex);
351  return nextImpl (it, lock);
352 }
354  lock_t& lock) const
355 {
356  static std::atomic<int> n_bad_events = 0; // cross loop counter of bad events
357  // Check if this is an athenaMP client process
358  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isClient()) {
359  void* source = 0;
360  unsigned int status = 0;
361  if (!m_eventStreamingTool->getLockedEvent(&source, status).isSuccess()) {
362  ATH_MSG_FATAL("Cannot get NextEvent from AthenaSharedMemoryTool");
363  return(StatusCode::FAILURE);
364  }
365  m_eventSource->setEvent(static_cast<char*>(source), status);
366  return(StatusCode::SUCCESS);
367  }
368  // Call all selector tool preNext before starting loop
369  for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
370  if (!tool->preNext().isSuccess()) {
371  ATH_MSG_WARNING("Failed to preNext() " << tool->name());
372  }
373  }
374  if (!m_counterTool.empty()) {
375  if (!m_counterTool->preNext().isSuccess()) {
376  ATH_MSG_WARNING("Failed to preNext() CounterTool.");
377  }
378  }
379  // Find an event to return
380  for (;;) {
381  bool badEvent{};
383  if (sc.isRecoverable()) {
384  badEvent = true;
385  } else if (sc.isFailure()) {
386  return StatusCode::FAILURE;
387  }
388 
389  // increment that an event was found
390  ++m_NumEvents;
391 
392  // check bad event flag and handle as configured
393  if (badEvent) {
394  int nbad = ++n_bad_events;
395  ATH_MSG_INFO("Bad event encountered, current count at " << nbad);
396  bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
397  if (toomany) {ATH_MSG_FATAL("too many bad events ");}
398  if (!m_procBadEvent || toomany) {
399  // End of file
400  it = *m_endIter;
401  return(StatusCode::FAILURE);
402  }
403  ATH_MSG_WARNING("Continue with bad event");
404  }
405 
406  // Check whether properties or tools reject this event
407  if ( m_NumEvents > m_skipEvents.value() &&
408  (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
409 
410  // Build a DH for use by other components
412  if (rec_sg != StatusCode::SUCCESS) {
413  ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
414  }
415 
416  // Build event info attribute list
417  if (recordAttributeListImpl(lock).isFailure()) ATH_MSG_WARNING("Unable to build event info att list");
418 
419  StatusCode status(StatusCode::SUCCESS);
420  for (const ToolHandle<IAthenaSelectorTool>& tool : m_helperTools) {
421  StatusCode toolStatus = tool->postNext();
422  if (toolStatus.isRecoverable()) {
423  ATH_MSG_INFO("Request skipping event from: " << tool->name());
424  status = StatusCode::RECOVERABLE;
425  } else if (toolStatus.isFailure()) {
426  ATH_MSG_WARNING("Failed to postNext() " << tool->name());
427  status = StatusCode::FAILURE;
428  }
429  }
430  if (status.isRecoverable()) {
431  ATH_MSG_INFO("skipping event " << m_NumEvents);
432  } else if (status.isFailure()) {
433  ATH_MSG_WARNING("Failed to postNext() HelperTool.");
434  } else {
435  if (!m_counterTool.empty()) {
436  if (!m_counterTool->postNext().isSuccess()) {
437  ATH_MSG_WARNING("Failed to postNext() CounterTool.");
438  }
439  }
440  break;
441  }
442 
443  // Validate the event
444  try {
446  }
448  ATH_MSG_ERROR("badFragment data encountered");
449 
450  int nbad = ++n_bad_events;
451  ATH_MSG_INFO("Bad event encountered, current count at " << nbad);
452 
453  bool toomany = (m_maxBadEvts >= 0 && nbad > m_maxBadEvts);
454  if (toomany) {ATH_MSG_FATAL("too many bad events ");}
455  if (!m_procBadEvent || toomany) {
456  // End of file
457  it = *m_endIter;
458  return(StatusCode::FAILURE);
459  }
460  ATH_MSG_WARNING("Continue with bad event");
461  }
462  } else {
463  if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
464  m_skipEventSequence.erase(m_skipEventSequence.begin());
465  }
466  if ( m_NumEvents % 1'000 == 0 ) {
467  ATH_MSG_INFO("Skipping event " << m_NumEvents - 1);
468  } else {
469  ATH_MSG_DEBUG("Skipping event " << m_NumEvents - 1);
470  }
471  }
472  } // for loop
473 
474  if (!m_eventStreamingTool.empty() && m_eventStreamingTool->isServer()) { // For SharedReader Server, put event into SHM
475  const RawEvent* pre = m_eventSource->currentEvent();
476  StatusCode sc;
477  while ( (sc = putEvent_ST(*m_eventStreamingTool,
478  m_NumEvents - 1, pre->start(),
479  pre->fragment_size_word() * sizeof(uint32_t),
480  m_eventSource->currentEventStatus())).isRecoverable() ) {
481  usleep(1000);
482  }
483  if (!sc.isSuccess()) {
484  ATH_MSG_ERROR("Cannot put Event " << m_NumEvents - 1 << " to AthenaSharedMemoryTool");
485  return(StatusCode::FAILURE);
486  }
487  }
488  return(StatusCode::SUCCESS);
489 }
490 
491 //________________________________________________________________________________
492 StatusCode EventSelectorByteStream::next(IEvtSelector::Context& ctxt, int jump) const {
493  lock_t lock (m_mutex);
494  return nextImpl (ctxt, jump, lock);
495 }
496 //________________________________________________________________________________
497 StatusCode
498 EventSelectorByteStream::nextImpl(IEvtSelector::Context& ctxt,
499  int jump,
500  lock_t& lock) const
501 {
502  if (jump > 0) {
503  if ( m_NumEvents+jump != m_skipEvents.value()) {
504  // Save initial event count
505  unsigned int cntr = m_NumEvents;
506  // In case NumEvents increments multiple times in a single next call
507  while (m_NumEvents+1 <= cntr + jump) {
508  if (!nextImpl(ctxt, lock).isSuccess()) {
509  return(StatusCode::FAILURE);
510  }
511  }
512  }
513  else ATH_MSG_DEBUG("Jump covered by skip event " << m_skipEvents.value());
514  return(StatusCode::SUCCESS);
515  }
516  else {
517  ATH_MSG_WARNING("Called jump next with non-multiple jump");
518  }
519  return(StatusCode::SUCCESS);
520 }
521 
522 //________________________________________________________________________________
523 StatusCode EventSelectorByteStream::nextHandleFileTransition(IEvtSelector::Context& ctxt) const
524 {
525  lock_t lock (m_mutex);
526  return nextHandleFileTransitionImpl (ctxt, lock);
527 }
528 StatusCode EventSelectorByteStream::nextHandleFileTransitionImpl(IEvtSelector::Context& ctxt,
529  lock_t& lock) const
530 {
531  const RawEvent* pre{};
532  bool badEvent{};
533  // if event source not ready from init, try next file
534  if (m_filebased && !m_eventSource->ready()) {
535  // next file
536  this->nextFile(lock);
537  if (this->openNewRun(lock).isFailure()) {
538  ATH_MSG_DEBUG("Event source found no more valid files left in input list");
539  m_NumEvents = -1;
540  return StatusCode::FAILURE;
541  }
542  }
543  try {
544  pre = m_eventSource->nextEvent();
545  }
546  catch (const ByteStreamExceptions::readError&) {
547  ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
548  return StatusCode::FAILURE;
549  }
550  catch (const ByteStreamExceptions::badFragment&) {
551  ATH_MSG_ERROR("badFragment encountered");
552  badEvent = true;
553  }
554  catch (const ByteStreamExceptions::badFragmentData&) {
555  ATH_MSG_ERROR("badFragment data encountered");
556  badEvent = true;
557  }
558  // Check whether a RawEvent has actually been provided
559  if (pre == nullptr) {
560  ctxt = *m_endIter;
561  return StatusCode::FAILURE;
562  }
563 
564  // If not secondary just return the status code based on if the event is bas
565  if (!m_isSecondary.value()) {
566  // check bad event flag and handle as configured
567  return badEvent ? StatusCode::RECOVERABLE : StatusCode::SUCCESS;
568  }
569 
570  // Build a DH for use by other components
571  StatusCode rec_sg = m_eventSource->generateDataHeader();
572  if (rec_sg != StatusCode::SUCCESS) {
573  ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
574  }
575 
576  return StatusCode::SUCCESS;
577 }
578 //________________________________________________________________________________
579 StatusCode EventSelectorByteStream::nextWithSkip(IEvtSelector::Context& ctxt) const
580 {
581  lock_t lock (m_mutex);
582  return nextWithSkipImpl (ctxt, lock);
583 }
584 StatusCode EventSelectorByteStream::nextWithSkipImpl(IEvtSelector::Context& ctxt,
585  lock_t& lock) const {
586  ATH_MSG_DEBUG("EventSelectorByteStream::nextWithSkip");
587 
588  for (;;) {
589  // Check if we're at the end of file
591  if (sc.isRecoverable()) {
592  continue; // handles empty files
593  }
594  if (sc.isFailure()) {
595  return StatusCode::FAILURE;
596  }
597 
598  // Increase event count
599  ++m_NumEvents;
600 
601  if (!m_counterTool.empty() && !m_counterTool->preNext().isSuccess()) {
602  ATH_MSG_WARNING("Failed to preNext() CounterTool.");
603  }
604  if ( m_NumEvents > m_skipEvents.value() &&
605  (m_skipEventSequence.empty() || m_NumEvents != m_skipEventSequence.front()) ) {
606  return StatusCode::SUCCESS;
607  } else {
608  if (!m_skipEventSequence.empty() && m_NumEvents == m_skipEventSequence.front()) {
609  m_skipEventSequence.erase(m_skipEventSequence.begin());
610  }
611  if (m_isSecondary.value()) {
612  ATH_MSG_INFO("skipping secondary event " << m_NumEvents);
613  } else {
614  ATH_MSG_INFO("skipping event " << m_NumEvents);
615  }
616  }
617  }
618 
619  return StatusCode::SUCCESS;
620 }
621 //________________________________________________________________________________
622 StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt) const
623 {
624  lock_t lock (m_mutex);
625  return previousImpl (ctxt, lock);
626 }
627 StatusCode EventSelectorByteStream::previousImpl(IEvtSelector::Context& /*ctxt*/,
628  lock_t& /*lock*/) const {
629  ATH_MSG_DEBUG(" ... previous");
630  const RawEvent* pre = 0;
631  bool badEvent(false);
632  // if event source not ready from init, try next file
633  if (m_eventSource->ready()) {
634  try {
635  pre = m_eventSource->previousEvent();
636  }
637  catch (const ByteStreamExceptions::readError&) {
638  ATH_MSG_FATAL("Caught ByteStreamExceptions::readError");
639  return StatusCode::FAILURE;
640  }
641  catch (const ByteStreamExceptions::badFragment&) {
642  ATH_MSG_ERROR("badFragment encountered");
643  badEvent = true;
644  }
646  ATH_MSG_ERROR("badFragment data encountered");
647  badEvent = true;
648  }
649  // Check whether a RawEvent has actually been provided
650  if (pre == 0) {
651  ATH_MSG_ERROR("No event built");
652  //it = *m_endIter;
653  return(StatusCode::FAILURE);
654  }
655  }
656  else {
657  ATH_MSG_FATAL("Attempt to read previous data on invalid reader");
658  return(StatusCode::FAILURE);
659  }
660  // increment that an event was found
661  //++m_NumEvents;
662 
663  // check bad event flag and handle as configured
664  if (badEvent) {
665  ATH_MSG_ERROR("Called previous for bad event");
666  if (!m_procBadEvent) {
667  // End of file
668  //it = *m_endIter;
669  return(StatusCode::FAILURE);
670  }
671  ATH_MSG_WARNING("Continue with bad event");
672  }
673 
674  // Build a DH for use by other components
676  if (rec_sg != StatusCode::SUCCESS) {
677  ATH_MSG_ERROR("Fail to record BS DataHeader in StoreGate. Skipping events?! " << rec_sg);
678  }
679 
680  return StatusCode::SUCCESS;
681 }
682 //________________________________________________________________________________
683 StatusCode EventSelectorByteStream::previous(IEvtSelector::Context& ctxt, int jump) const {
684  lock_t lock (m_mutex);
685  return previousImpl (ctxt, jump, lock);
686 }
687 //________________________________________________________________________________
689 EventSelectorByteStream::previousImpl(IEvtSelector::Context& ctxt,
690  int jump,
691  lock_t& lock) const
692 {
693  if (jump > 0) {
694  for (int i = 0; i < jump; i++) {
695  if (!previousImpl(ctxt, lock).isSuccess()) {
696  return(StatusCode::FAILURE);
697  }
698  }
699  return(StatusCode::SUCCESS);
700  }
701  return(StatusCode::FAILURE);
702 }
703 //________________________________________________________________________________
704 StatusCode EventSelectorByteStream::last(IEvtSelector::Context& it)const {
705  if (it.identifier() == m_endIter->identifier()) {
706  ATH_MSG_DEBUG("last(): Last event in InputStream.");
707  return(StatusCode::SUCCESS);
708  }
709  return(StatusCode::FAILURE);
710 }
711 //________________________________________________________________________________
712 StatusCode EventSelectorByteStream::rewind(IEvtSelector::Context& /*it*/) const {
713  ATH_MSG_ERROR("rewind() not implemented");
714  return(StatusCode::FAILURE);
715 }
716 
717 //________________________________________________________________________________
718 StatusCode EventSelectorByteStream::resetCriteria(const std::string& /*criteria*/, IEvtSelector::Context& /*ctxt*/) const {
719  return(StatusCode::SUCCESS);
720 }
721 
722 //__________________________________________________________________________
723 StatusCode EventSelectorByteStream::seek(Context& /* it */, int evtNum) const {
724  lock_t lock (m_mutex);
725  // Check that input is seekable
726  if (!m_filebased) {
727  ATH_MSG_ERROR("Input not seekable, choose different input svc");
728  return StatusCode::FAILURE;
729  }
730  // find the file index with that event
731  long fileNum = findEvent(evtNum, lock);
732  if (fileNum == -1 && evtNum >= m_firstEvt[m_fileCount] && evtNum < m_NumEvents) {
733  fileNum = m_fileCount;
734  }
735  // if unable to locate file, exit
736  if (fileNum == -1) {
737  ATH_MSG_INFO("seek: Reached end of Input.");
738  return(StatusCode::RECOVERABLE);
739  }
740  // check if it is the current file
741  if (fileNum != m_fileCount) { // event in different file
742  // Close input file if open
744  ATH_MSG_DEBUG("Seek to item: \"" << m_inputCollectionsProp.value()[fileNum] << "\" from the explicit file list.");
745  std::string fileName = m_inputCollectionsProp.value()[fileNum];
746  m_fileCount = fileNum;
747  // Open the correct file
748  auto nevguid = m_eventSource->getBlockIterator(fileName);
749  long nev = nevguid.first;
750  if (nev == -1) {
751  ATH_MSG_FATAL("Unable to open file with seeked event " << evtNum << " file " << fileName);
752  return StatusCode::FAILURE;
753  }
754  int delta = evtNum - m_firstEvt[m_fileCount];
755  if (delta > 0) {
757  if (nextImpl(*beginIter,delta, lock).isFailure()) return StatusCode::FAILURE;
758  }
759  }
760  // event in current file
761  {
762  int delta = (evtNum - m_firstEvt[m_fileCount] + 1) - m_eventSource->positionInBlock();
763  ATH_MSG_DEBUG("Seeking event " << evtNum << " in current file with delta " << delta);
764  if ( delta == 0 ) { // current event
765  // nothing to do
766  }
767  else if ( delta > 0 ) { // forward
769  if ( this->nextImpl(*beginIter, delta, lock).isFailure() ) return StatusCode::FAILURE;
770  }
771  else if ( delta < 0 ) { // backward
773  if ( this->previousImpl(*beginIter, -1*delta, lock).isFailure() ) return(StatusCode::FAILURE);
774  }
775  }
776  return StatusCode::SUCCESS;
777 }
778 
780 {
781  lock_t lock (m_mutex);
782  return recordAttributeListImpl (lock);
783 }
785 {
786  std::string listName("EventInfoAtts");
787 
788  if (eventStore()->contains<AthenaAttributeList>(listName)) {
789  const AthenaAttributeList* oldAttrList = nullptr;
790  if (!eventStore()->retrieve(oldAttrList, listName).isSuccess()) {
791  ATH_MSG_ERROR("Cannot retrieve old AttributeList from StoreGate.");
792  return(StatusCode::FAILURE);
793  }
794  if (!eventStore()->removeDataAndProxy(oldAttrList).isSuccess()) {
795  ATH_MSG_ERROR("Cannot remove old AttributeList from StoreGate.");
796  return(StatusCode::FAILURE);
797  }
798  }
799 
800  // build the new attr list
801  auto attrList = std::make_unique<AthenaAttributeList>();
802 
803  // fill the attr list
804  ATH_CHECK(fillAttributeListImpl(attrList.get(), "", false, lock));
805 
806  // put result in event store
807  if (eventStore()->record(std::move(attrList), listName).isFailure()) {
808  return StatusCode::FAILURE;
809  }
810 
811  return StatusCode::SUCCESS;
812 }
813 
814 StatusCode EventSelectorByteStream::fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const
815 {
816  lock_t lock (m_mutex);
817  return fillAttributeListImpl (attrList, suffix, copySource, lock);
818 }
819 StatusCode EventSelectorByteStream::fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool /* copySource */,
820  lock_t& /*lock*/) const
821 {
822  attrList->extend("RunNumber" + suffix, "unsigned int");
823  attrList->extend("EventNumber" + suffix, "unsigned long long");
824  attrList->extend("LumiBlockN" + suffix, "unsigned int");
825  attrList->extend("BunchId" + suffix, "unsigned int");
826  attrList->extend("EventTime" + suffix, "unsigned int");
827  attrList->extend("EventTimeNanoSec" + suffix, "unsigned int");
828 
829  // fill attribute list
830  const RawEvent* event = m_eventSource->currentEvent();
831 
832  (*attrList)["RunNumber" + suffix].data<unsigned int>() = event->run_no();
833  if (event->version() < 0x03010000) {
834  (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->lvl1_id();
835  } else {
836  (*attrList)["EventNumber" + suffix].data<unsigned long long>() = event->global_id();
837  }
838  (*attrList)["LumiBlockN" + suffix].data<unsigned int>() = event->lumi_block();
839  (*attrList)["BunchId" + suffix].data<unsigned int>() = event->bc_id();
840 
841  unsigned int bc_time_sec = event->bc_time_seconds();
842  unsigned int bc_time_ns = event->bc_time_nanoseconds();
843  // bc_time_ns should be lt 1e9.
844  if (bc_time_ns > 1000000000) {
845  // round it off to 1e9
846  ATH_MSG_WARNING(" bc_time nanosecond number larger than 1e9, it is " << bc_time_ns << ", reset it to 1 sec");
847  bc_time_ns = 1000000000;
848  }
849  (*attrList)["EventTime" + suffix].data<unsigned int>() = bc_time_sec;
850  (*attrList)["EventTimeNanoSec" + suffix].data<unsigned int>() = bc_time_ns;
851 
853 
854  event->status(buffer);
855  attrList->extend("TriggerStatus" + suffix, "unsigned int");
856  (*attrList)["TriggerStatus" + suffix].data<unsigned int>() = *buffer;
857 
858  attrList->extend("ExtendedL1ID" + suffix, "unsigned int");
859  attrList->extend("L1TriggerType" + suffix, "unsigned int");
860  (*attrList)["ExtendedL1ID" + suffix].data<unsigned int>() = event->lvl1_id();
861  (*attrList)["L1TriggerType" + suffix].data<unsigned int>() = event->lvl1_trigger_type();
862 
863  // Grab L1 words
864  event->lvl1_trigger_info(buffer);
865  for (uint32_t iT1 = 0; iT1 < event->nlvl1_trigger_info(); ++iT1) {
866  std::stringstream name;
867  name << "L1TriggerInfo_" << iT1;
868  attrList->extend(name.str() + suffix, "unsigned int");
869  (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
870  ++buffer;
871  }
872 
873  // Grab L2 words
874  event->lvl2_trigger_info(buffer);
875  for (uint32_t iT1 = 0; iT1 < event->nlvl2_trigger_info(); ++iT1) {
876  if (*buffer != 0) {
877  std::stringstream name;
878  name << "L2TriggerInfo_" << iT1;
879  attrList->extend(name.str() + suffix, "unsigned int");
880  (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
881  }
882  ++buffer;
883  }
884 
885  // Grab EF words
886  event->event_filter_info(buffer);
887  for (uint32_t iT1 = 0; iT1 < event->nevent_filter_info(); ++iT1) {
888  if (*buffer != 0) {
889  std::stringstream name;
890  name << "EFTriggerInfo_" << iT1;
891  attrList->extend(name.str() + suffix, "unsigned int");
892  (*attrList)[name.str() + suffix].data<unsigned int>() = *buffer;
893  }
894  ++buffer;
895  }
896 
897  // Grab stream tags
898  event->stream_tag(buffer);
899  std::vector<eformat::helper::StreamTag> onl_streamTags;
900  eformat::helper::decode(event->nstream_tag(), buffer, onl_streamTags);
901  for (std::vector<eformat::helper::StreamTag>::const_iterator itS = onl_streamTags.begin(),
902  itSE = onl_streamTags.end(); itS != itSE; ++itS) {
903  attrList->extend(itS->name + suffix, "string");
904  (*attrList)[itS->name + suffix].data<std::string>() = itS->type;
905  }
906 
907  return StatusCode::SUCCESS;
908 }
909 
910 //__________________________________________________________________________
911 int EventSelectorByteStream::findEvent(int evtNum, lock_t& /*lock*/) const {
912  // Loop over file event counts
913  //ATH_MSG_INFO("try to find evnum = " << evtNum << " in " << m_numEvt.size() << " files");
914  for (size_t i = 0; i < m_inputCollectionsProp.value().size(); i++) {
915  if (m_inputCollectionsProp.value().size() != m_numEvt.size()) {
916  ATH_MSG_ERROR("vector size incompatibility");
917  break;
918  }
919  // if file not opened yet, check it
920  if (m_numEvt[i] == -1) {
921  std::string fileName = m_inputCollectionsProp.value()[i];
922  auto nevguid = m_eventSource->getBlockIterator(fileName);
923  long nev = nevguid.first;
924  // if failure on file open, exit
925  if (nev==-1) {
926  break;
927  }
928  // set initial event counter for that file
929  if (i > 0) {
930  m_firstEvt[i] = m_firstEvt[i - 1] + m_numEvt[i - 1];
931  } else {
932  m_firstEvt[i] = 0;
933  }
934  // log number of events in that file
935  m_numEvt[i] = nev;
936  }
937  // if sought event is in this file, then return the index of that file
938  if (evtNum >= m_firstEvt[i] && evtNum < m_firstEvt[i] + m_numEvt[i]) {
939  ATH_MSG_INFO("found " << evtNum << " in file " << i);
940  return(i);
941  }
942  }
943  ATH_MSG_INFO("did not find ev " << evtNum);
944  // return file not found marker
945  return(-1);
946 }
947 
948 //__________________________________________________________________________
949 int EventSelectorByteStream::curEvent (const Context& /*it*/) const {
950  // event counter in IEvtSelectorSeek interface
951  lock_t lock (m_mutex);
952  return int(m_NumEvents);
953 }
954 
955 //__________________________________________________________________________
956 int EventSelectorByteStream::size (Context& /*it*/) const {
957  return -1;
958 }
959 
960 //________________________________________________________________________________
962  lock_t lock (m_mutex);
963  if (m_eventStreamingTool.empty()) {
964  return(StatusCode::FAILURE);
965  }
966  return(m_eventStreamingTool->makeServer(1, ""));
967 }
968 
969 //________________________________________________________________________________
971  lock_t lock (m_mutex);
972  if (m_eventStreamingTool.empty()) {
973  return(StatusCode::FAILURE);
974  }
975  std::string dummyStr;
976  return(m_eventStreamingTool->makeClient(0, dummyStr));
977 }
978 
979 //________________________________________________________________________________
981  lock_t lock (m_mutex);
982  if (m_eventStreamingTool.empty()) {
983  return(StatusCode::FAILURE);
984  }
985  if (m_eventStreamingTool->isClient()) {
986  StatusCode sc = m_eventStreamingTool->lockEvent(evtNum);
987  while (sc.isRecoverable()) {
988  usleep(1000);
989  sc = m_eventStreamingTool->lockEvent(evtNum);
990  }
991  return(sc);
992  }
993  return(StatusCode::FAILURE);
994 }
995 
996 //________________________________________________________________________________
998  lock_t lock (m_mutex);
999  if (m_eventStreamingTool.empty()) {
1000  ATH_MSG_ERROR("No AthenaSharedMemoryTool configured for readEvent()");
1001  return(StatusCode::FAILURE);
1002  }
1003  ATH_MSG_VERBOSE("Called read Event " << maxevt);
1004  for (int i = 0; i < maxevt || maxevt == -1; ++i) {
1005  const RawEvent* pre = 0;
1006  if (this->nextImpl(*m_beginIter, lock).isSuccess()) {
1007  pre = m_eventSource->currentEvent();
1008  } else {
1009  if (m_NumEvents == -1) {
1010  ATH_MSG_VERBOSE("Called read Event and read last event from input: " << i);
1011  break;
1012  }
1013  ATH_MSG_ERROR("Unable to retrieve next event for " << i << "/" << maxevt);
1014  return(StatusCode::FAILURE);
1015  }
1016  if (m_eventStreamingTool->isServer()) {
1017  StatusCode sc;
1018  while ( (sc = putEvent_ST(*m_eventStreamingTool,
1019  m_NumEvents - 1,
1020  pre->start(),
1021  pre->fragment_size_word() * sizeof(uint32_t),
1022  m_eventSource->currentEventStatus())).isRecoverable() ) {
1023  usleep(1000);
1024  }
1025  if (!sc.isSuccess()) {
1026  ATH_MSG_ERROR("Cannot put Event " << m_NumEvents - 1 << " to AthenaSharedMemoryTool");
1027  return(StatusCode::FAILURE);
1028  }
1029  }
1030  }
1031  // End of file, wait for last event to be taken
1032  StatusCode sc;
1033  while ( (sc = putEvent_ST(*m_eventStreamingTool, 0, 0, 0, 0)).isRecoverable() ) {
1034  usleep(1000);
1035  }
1036  if (!sc.isSuccess()) {
1037  ATH_MSG_ERROR("Cannot put last Event marker to AthenaSharedMemoryTool");
1038  return(StatusCode::FAILURE);
1039  }
1040  return(StatusCode::SUCCESS);
1041 }
1042 
1043 //________________________________________________________________________________
1044 StatusCode EventSelectorByteStream::createAddress(const IEvtSelector::Context& /*it*/,
1045  IOpaqueAddress*& iop) const {
1046  SG::DataProxy* proxy = eventStore()->proxy(ClassID_traits<DataHeader>::ID(),"ByteStreamDataHeader");
1047  if (proxy !=0) {
1048  iop = proxy->address();
1049  return(StatusCode::SUCCESS);
1050  } else {
1051  iop = 0;
1052  return(StatusCode::FAILURE);
1053  }
1054 }
1055 
1056 //________________________________________________________________________________
1057 StatusCode
1058 EventSelectorByteStream::releaseContext(IEvtSelector::Context*& /*it*/) const {
1059  return(StatusCode::SUCCESS);
1060 }
1061 
1062 //________________________________________________________________________________
1063 StatusCode EventSelectorByteStream::queryInterface(const InterfaceID& riid, void** ppvInterface) {
1064  if (riid == IEvtSelector::interfaceID()) {
1065  *ppvInterface = dynamic_cast<IEvtSelector*>(this);
1066  } else if (riid == IIoComponent::interfaceID()) {
1067  *ppvInterface = dynamic_cast<IIoComponent*>(this);
1068  } else if (riid == IProperty::interfaceID()) {
1069  *ppvInterface = dynamic_cast<IProperty*>(this);
1070  } else if (riid == IEvtSelectorSeek::interfaceID()) {
1071  *ppvInterface = dynamic_cast<IEvtSelectorSeek*>(this);
1072  } else if (riid == IEventShare::interfaceID()) {
1073  *ppvInterface = dynamic_cast<IEventShare*>(this);
1074  } else if (riid == ISecondaryEventSelector::interfaceID()) {
1075  *ppvInterface = dynamic_cast<ISecondaryEventSelector*>(this);
1076  } else {
1077  return(Service::queryInterface(riid, ppvInterface));
1078  }
1079  addRef();
1080  return(StatusCode::SUCCESS);
1081 }
1082 //________________________________________________________________________________
1084  lock_t lock (m_mutex);
1085  ATH_MSG_INFO("I/O reinitialization...");
1086  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
1087  if (!iomgr.retrieve().isSuccess()) {
1088  ATH_MSG_FATAL("Could not retrieve IoComponentMgr !");
1089  return(StatusCode::FAILURE);
1090  }
1091  if (!iomgr->io_hasitem(this)) {
1092  ATH_MSG_FATAL("IoComponentMgr does not know about myself !");
1093  return(StatusCode::FAILURE);
1094  }
1095  std::vector<std::string> inputCollections = m_inputCollectionsProp.value();
1096  for (std::size_t i = 0, imax = inputCollections.size(); i != imax; ++i) {
1097  ATH_MSG_INFO("I/O reinitialization, file = " << inputCollections[i]);
1098  std::string &fname = inputCollections[i];
1099  if (!iomgr->io_contains(this, fname)) {
1100  ATH_MSG_ERROR("IoComponentMgr does not know about [" << fname << "] !");
1101  return(StatusCode::FAILURE);
1102  }
1103  if (!iomgr->io_retrieve(this, fname).isSuccess()) {
1104  ATH_MSG_FATAL("Could not retrieve new value for [" << fname << "] !");
1105  return(StatusCode::FAILURE);
1106  }
1107  }
1108  // all good... copy over.
1109  m_beginFileFired = false;
1110 
1111  // Set m_inputCollectionsProp. But we _dont_ want to run the update
1112  // handler --- that calls reinit(), which will deadlock since
1113  // we're holding the lock. Instead, we'll call reinit() ourselves.
1114  auto old_cb = m_inputCollectionsProp.updateCallBack();
1115  m_inputCollectionsProp.declareUpdateHandler(
1116  [] (Gaudi::Details::PropertyBase&) {}
1117  );
1119  m_inputCollectionsProp.declareUpdateHandler (old_cb);;
1120 
1121  return(this->reinit(lock));
1122 }
1123 
1124 //__________________________________________________________________________
1126 {
1127  return true;
1128 }
python.PyKernel.retrieve
def retrieve(aClass, aKey=None)
Definition: PyKernel.py:110
ByteStreamInputSvc.h
This file contains the class definition for the ByteStreamInputSvc class.
EventSelectorByteStream.h
EventSelectorByteStream::m_firstFileFired
bool m_firstFileFired
Definition: EventSelectorByteStream.h:199
EventSelectorByteStream::curEvent
virtual int curEvent(const Context &it) const override
Return the current event number.
Definition: EventSelectorByteStream.cxx:949
EventSelectorByteStream::findEvent
int findEvent(int evtNum, lock_t &lock) const
Search for event with number evtNum.
Definition: EventSelectorByteStream.cxx:911
python.tests.PyTestsLib.finalize
def finalize(self)
_info( "content of StoreGate..." ) self.sg.dump()
Definition: PyTestsLib.py:53
EventSelectorByteStream::m_filebased
Gaudi::Property< bool > m_filebased
Definition: EventSelectorByteStream.h:210
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
ByteStreamInputSvc::previousEvent
virtual const RawEvent * previousEvent()=0
hotSpotInTAG.suffix
string suffix
Definition: hotSpotInTAG.py:186
StateLessPT_NewConfig.proxy
proxy
Definition: StateLessPT_NewConfig.py:392
EventSelectorByteStream::m_skipEvents
Gaudi::Property< long > m_skipEvents
Definition: EventSelectorByteStream.h:196
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
EventSelectorByteStream::makeClient
virtual StatusCode makeClient(int num) override
Make this a client.
Definition: EventSelectorByteStream.cxx:970
EventSelectorByteStream::fillAttributeList
virtual StatusCode fillAttributeList(coral::AttributeList *attrList, const std::string &suffix, bool copySource) const override
Fill AttributeList with specific items from the selector and a suffix.
Definition: EventSelectorByteStream.cxx:814
OFFLINE_FRAGMENTS_NAMESPACE::DataType
uint32_t DataType
Definition: RawEvent.h:24
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
ByteStreamInputSvc
This class provides the base class to services to read bytestream data. The concrete class can provid...
Definition: ByteStreamInputSvc.h:23
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
IAthMetaDataSvc.h
This file contains the class definition for the IAthMetaDataSvc class.
ByteStreamInputSvc::closeBlockIterator
virtual void closeBlockIterator(bool)
Definition: ByteStreamInputSvc.h:61
initialize
void initialize()
Definition: run_EoverP.cxx:894
LB_AnalMapSplitter.of
of
Definition: LB_AnalMapSplitter.py:48
EventContextByteStream::identifier
virtual void * identifier() const
Inequality operator.
Definition: EventContextByteStream.cxx:25
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
skel.it
it
Definition: skel.GENtoEVGEN.py:423
EventSelectorByteStream::eventStore
StoreGateSvc * eventStore() const
Definition: EventSelectorByteStream.cxx:79
EventSelectorByteStream::openNewRun
StatusCode openNewRun(lock_t &lock) const
Definition: EventSelectorByteStream.cxx:294
EventSelectorByteStream::fillAttributeListImpl
StatusCode fillAttributeListImpl(coral::AttributeList *attrList, const std::string &suffix, bool copySource, lock_t &lock) const
Definition: EventSelectorByteStream.cxx:819
python.PyKernel.AttributeList
AttributeList
Definition: PyKernel.py:36
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
EventSelectorByteStream::m_runNo
Gaudi::CheckedProperty< int > m_runNo
Definition: EventSelectorByteStream.h:212
EventSelectorByteStream::~EventSelectorByteStream
virtual ~EventSelectorByteStream()
Standard Destructor.
Definition: EventSelectorByteStream.cxx:73
AthenaAttributeList.h
EventSelectorByteStream::nextHandleFileTransitionImpl
StatusCode nextHandleFileTransitionImpl(IEvtSelector::Context &ctxt, lock_t &lock) const
Definition: EventSelectorByteStream.cxx:528
EventSelectorByteStream::m_firstLBNo
Gaudi::CheckedProperty< int > m_firstLBNo
Definition: EventSelectorByteStream.h:215
EventSelectorByteStream::rewind
virtual StatusCode rewind(Context &it) const override
Definition: EventSelectorByteStream.cxx:712
EventSelectorByteStream::m_beginIter
EventContextByteStream * m_beginIter
Definition: EventSelectorByteStream.h:189
EventSelectorByteStream::EventSelectorByteStream
EventSelectorByteStream(const std::string &name, ISvcLocator *svcloc)
Standard Constructor.
Definition: EventSelectorByteStream.cxx:42
EventSelectorByteStream::readEvent
virtual StatusCode readEvent(int maxevt) override
Read the next maxevt events.
Definition: EventSelectorByteStream.cxx:997
atlasStyleMacro.icol
int icol
Definition: atlasStyleMacro.py:13
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
EventSelectorByteStream::createContext
virtual StatusCode createContext(Context *&it) const override
create context
Definition: EventSelectorByteStream.cxx:344
EventSelectorByteStream::m_mutex
mutex_t m_mutex
Definition: EventSelectorByteStream.h:173
EventSelectorByteStream::m_eventSource
ByteStreamInputSvc * m_eventSource
Definition: EventSelectorByteStream.h:191
EventSelectorByteStream::createAddress
virtual StatusCode createAddress(const Context &it, IOpaqueAddress *&iop) const override
Definition: EventSelectorByteStream.cxx:1044
EventSelectorByteStream::m_endIter
EventContextByteStream * m_endIter
Definition: EventSelectorByteStream.h:190
EventSelectorByteStream::size
virtual int size(Context &it) const override
Always returns -1.
Definition: EventSelectorByteStream.cxx:956
EventSelectorByteStream::m_helperTools
ToolHandleArray< IAthenaSelectorTool > m_helperTools
HelperTools, vector of names of AlgTools that are executed by the EventSelector.
Definition: EventSelectorByteStream.h:203
ByteStreamInputSvc::validateEvent
virtual void validateEvent()
Definition: ByteStreamInputSvc.h:65
ByteStreamInputSvc::setEvent
virtual void setEvent(void *, unsigned int)
Definition: ByteStreamInputSvc.h:36
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:128
TileAANtupleConfig.inputCollections
inputCollections
Definition: TileAANtupleConfig.py:133
EventSelectorByteStream::m_initTimeStamp
Gaudi::CheckedProperty< int > m_initTimeStamp
Definition: EventSelectorByteStream.h:217
IEventShare
Abstract interface for sharing within an event stream.
Definition: IEventShare.h:28
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:12
EventSelectorByteStream::reinit
StatusCode reinit(lock_t &lock)
Reinitialize the service when a fork() occurred/was-issued.
Definition: EventSelectorByteStream.cxx:178
FortranAlgorithmOptions.fileName
fileName
Definition: FortranAlgorithmOptions.py:13
EventSelectorByteStream::nextFile
void nextFile(lock_t &lock) const
Definition: EventSelectorByteStream.cxx:287
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
ByteStreamExceptions::readError
Definition: ByteStreamExceptions.h:22
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
EventSelectorByteStream::nextImpl
StatusCode nextImpl(Context &it, lock_t &lock) const
Definition: EventSelectorByteStream.cxx:353
lumiFormat.i
int i
Definition: lumiFormat.py:92
IAthenaIPCTool
Definition: IAthenaIPCTool.h:15
EventSelectorByteStream::m_counterTool
ToolHandle< IAthenaSelectorTool > m_counterTool
Definition: EventSelectorByteStream.h:204
EventSelectorByteStream::m_eventsPerRun
Gaudi::CheckedProperty< int > m_eventsPerRun
Definition: EventSelectorByteStream.h:214
EventSelectorByteStream::m_procBadEvent
Gaudi::Property< bool > m_procBadEvent
process bad events, which fail check_tree().
Definition: EventSelectorByteStream.h:186
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
IEventShare::interfaceID
static const InterfaceID & interfaceID()
Definition: IEventShare.h:35
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
EventSelectorByteStream::inputCollectionsHandler
void inputCollectionsHandler(Gaudi::Details::PropertyBase &)
Definition: EventSelectorByteStream.cxx:64
ByteStreamExceptions.h
ISecondaryEventSelector
Abstract interface for secondary event selectors.
Definition: ISecondaryEventSelector.h:28
StoreGateSvc::currentStoreGate
static StoreGateSvc * currentStoreGate()
get current StoreGate
Definition: StoreGateSvc.cxx:69
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:40
file
TFile * file
Definition: tile_monitor.h:29
EventSelectorByteStream::m_maxBadEvts
Gaudi::Property< int > m_maxBadEvts
number of bad events allowed before quitting.
Definition: EventSelectorByteStream.h:187
EventSelectorByteStream::ATLAS_THREAD_SAFE
int m_fileCount ATLAS_THREAD_SAFE
number of files to process.
Definition: EventSelectorByteStream.h:174
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
EventSelectorByteStream::last
virtual StatusCode last(Context &it) const override
Definition: EventSelectorByteStream.cxx:704
AthenaAttributeList
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
Definition: PersistentDataModel/PersistentDataModel/AthenaAttributeList.h:45
xAOD::eventNumber
eventNumber
Definition: EventInfo_v1.cxx:124
ByteStreamAddress.h
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
EventSelectorByteStream::m_firstEventNo
Gaudi::CheckedProperty< int > m_firstEventNo
Definition: EventSelectorByteStream.h:213
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
imax
int imax(int i, int j)
Definition: TileLaserTimingTool.cxx:33
python.egammaTruthD3PDObject.blockname
blockname
Definition: egammaTruthD3PDObject.py:64
EventSelectorByteStream::m_isSecondary
Gaudi::Property< bool > m_isSecondary
IsSecondary, know if this is an instance of secondary event selector.
Definition: EventSelectorByteStream.h:183
checkRpcDigits.allGood
bool allGood
Loop over the SDOs & Digits.
Definition: checkRpcDigits.py:171
ByteStreamInputSvc::positionInBlock
virtual long positionInBlock()
Definition: ByteStreamInputSvc.h:64
ByteStreamExceptions::badFragmentData
Definition: ByteStreamExceptions.h:34
EventSelectorByteStream::stop
virtual StatusCode stop() override
Definition: EventSelectorByteStream.cxx:240
EventSelectorByteStream::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: EventSelectorByteStream.h:194
EventSelectorByteStream::m_eventSourceName
Gaudi::Property< std::string > m_eventSourceName
Definition: EventSelectorByteStream.h:185
EventSelectorByteStream::m_inputCollectionsProp
Gaudi::Property< std::vector< std::string > > m_inputCollectionsProp
Definition: EventSelectorByteStream.h:192
EventSelectorByteStream::queryInterface
virtual StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface) override
Definition: EventSelectorByteStream.cxx:1063
StoreGateSvc::proxy
virtual SG::DataProxy * proxy(const void *const pTransient) const override final
get proxy for a given data object address in memory
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
python.PerfMonSerializer.decode
def decode(s)
Definition: PerfMonSerializer.py:388
ByteStreamInputSvc::currentEvent
virtual const RawEvent * currentEvent() const =0
virtual method for accessing the current event
pool::READ
@ READ
Definition: Database/APR/StorageSvc/StorageSvc/pool.h:68
EventSelectorByteStream::m_beginFileFired
bool m_beginFileFired
Definition: EventSelectorByteStream.h:200
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:453
ByteStreamInputSvc::getBlockIterator
virtual std::pair< long, std::string > getBlockIterator(const std::string &)
Definition: ByteStreamInputSvc.h:60
EventSelectorByteStream::recordAttributeListImpl
StatusCode recordAttributeListImpl(lock_t &lock) const
Definition: EventSelectorByteStream.cxx:784
EventSelectorByteStream::previous
virtual StatusCode previous(Context &it) const override
Definition: EventSelectorByteStream.cxx:622
python.AthDsoLogger.fname
string fname
Definition: AthDsoLogger.py:67
EventSelectorByteStream::seek
virtual StatusCode seek(Context &, int evtnum) const override
Seek to a given event number.
Definition: EventSelectorByteStream.cxx:723
ByteStreamExceptions::fileAccessError
Definition: ByteStreamExceptions.h:16
EventSelectorByteStream::finalize
virtual StatusCode finalize() override
Definition: EventSelectorByteStream.cxx:255
EventSelectorByteStream::share
virtual StatusCode share(int evtnum) override
Request to share a given event number.
Definition: EventSelectorByteStream.cxx:980
ByteStreamExceptions::badFragment
Definition: ByteStreamExceptions.h:28
EventSelectorByteStream::start
virtual StatusCode start() override
Definition: EventSelectorByteStream.cxx:228
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
SG::SourceID
std::string SourceID
Definition: AthenaKernel/AthenaKernel/SourceID.h:23
re
const boost::regex re(r_e)
EventSelectorByteStream::makeServer
virtual StatusCode makeServer(int num) override
Make this a server.
Definition: EventSelectorByteStream.cxx:961
EventSelectorByteStream::releaseContext
virtual StatusCode releaseContext(Context *&it) const override
Definition: EventSelectorByteStream.cxx:1058
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:569
EventSelectorByteStream::disconnectIfFinished
virtual bool disconnectIfFinished(const SG::SourceID &fid) const override
Definition: EventSelectorByteStream.cxx:1125
EventContextByteStream
This class provides the Context for EventSelectorByteStream.
Definition: EventContextByteStream.h:21
declareProperty
#define declareProperty(n, p, h)
Definition: BaseFakeBkgTool.cxx:15
EventSelectorByteStream::recordAttributeList
virtual StatusCode recordAttributeList() const override
Record AttributeList in StoreGate.
Definition: EventSelectorByteStream.cxx:779
EventSelectorByteStream::m_eventsPerLB
Gaudi::CheckedProperty< int > m_eventsPerLB
Definition: EventSelectorByteStream.h:216
EventSelectorByteStream::m_skipEventSequenceProp
Gaudi::Property< std::vector< long > > m_skipEventSequenceProp
Definition: EventSelectorByteStream.h:197
ByteStreamInputSvc::ready
virtual bool ready()
Definition: ByteStreamInputSvc.h:62
merge.status
status
Definition: merge.py:17
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
EventContextByteStream.h
This file contains the class definition for the EventContextByteStream class.
checker_macros.h
Define macros for attributes used to control the static checker.
SG::DataProxy
Definition: DataProxy.h:44
EventSelectorByteStream::lock_t
std::lock_guard< mutex_t > lock_t
Definition: EventSelectorByteStream.h:130
ByteStreamInputSvc::currentEventStatus
virtual unsigned int currentEventStatus() const
virtual method for accessing the current event status
Definition: ByteStreamInputSvc.h:55
EventSelectorByteStream::previousImpl
StatusCode previousImpl(Context &it, lock_t &lock) const
Definition: EventSelectorByteStream.cxx:627
IAthenaIPCTool.h
IEvtSelectorSeek
Abstract interface for seeking for an event selector.
Definition: IEvtSelectorSeek.h:28
EventSelectorByteStream::initialize
virtual StatusCode initialize() override
Implementation of Service base class methods.
Definition: EventSelectorByteStream.cxx:85
ByteStreamInputSvc::generateDataHeader
virtual StatusCode generateDataHeader()
Definition: ByteStreamInputSvc.h:63
EventSelectorByteStream::next
virtual StatusCode next(Context &it) const override
Definition: EventSelectorByteStream.cxx:349
EventSelectorByteStream::resetCriteria
virtual StatusCode resetCriteria(const std::string &criteria, Context &context) const override
Set a selection criteria.
Definition: EventSelectorByteStream.cxx:718
ServiceHandle< IIoComponentMgr >
EventSelectorByteStream::io_reinit
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
Definition: EventSelectorByteStream.cxx:1083