ATLAS Offline Software
PileUpStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 #include <cassert>
8 #include <stdexcept>
9 #include <string>
10 
11 #include "GaudiKernel/Service.h"
12 #include "GaudiKernel/ISvcLocator.h"
13 #include "GaudiKernel/ISvcManager.h"
16 #include "StoreGate/StoreGateSvc.h"
17 
18 #include "SGTools/DataProxy.h"
19 
20 
23 
24 class IOpaqueAddress;
25 
// Default constructor. The signature line is absent from this listing; the
// cross-reference index places PileUpStream() at PileUpStream.cxx:27.
// Builds a placeholder stream: name "INVALID", service locator / selector /
// store / iterator all null, nothing owned, no ring. m_neverLoaded starts
// true, which makes the first nextEventPre*() call force a record read.
28  AthMessaging ("PileUpStream"),
29  m_name("INVALID"), p_svcLoc(0), m_sel(0), m_SG(0), p_iter(0),
30  m_ownEvtIterator(false),
31  m_neverLoaded(true), m_ownStore(false),
32  m_used(false), m_hasRing(false), m_iOriginalRing(0)
33 {
34 }
35 
// Ownership-transferring constructor from another stream `rhs`. The signature
// line is absent from this listing; rhs is modified in the body, so it cannot
// be a const reference (presumably an rvalue reference - TODO confirm against
// PileUpStream.h).
// Every member is copied from rhs and the messaging base is named after
// rhs.m_name.
37  AthMessaging (rhs.m_name),
38  m_name(rhs.m_name), p_svcLoc(rhs.p_svcLoc), m_sel(rhs.m_sel),
39  m_SG(rhs.m_SG), p_iter(rhs.p_iter), m_mergeSvc(rhs.m_mergeSvc), m_ownEvtIterator(rhs.m_ownEvtIterator),
40  m_neverLoaded(rhs.m_neverLoaded), m_ownStore(rhs.m_ownStore),
41  m_used(rhs.m_used), m_hasRing(rhs.m_hasRing), m_iOriginalRing(rhs.m_iOriginalRing)
42 {
43  //transferred ownership
  // rhs keeps its pointer values but loses both ownership flags, so only the
  // new object deletes p_iter and finalizes the store in finalize().
44  rhs.m_ownEvtIterator=false;
45  rhs.m_ownStore=false;
46 }
47 
// Ownership-transferring assignment from `rhs`. The signature line is absent
// from this listing; the index shows the const-reference overload as deleted,
// so this is presumably the rvalue overload - TODO confirm.
// Returns *this. Self-assignment is a no-op.
// NOTE(review): the previous p_iter of *this is overwritten without being
// deleted, and a previously owned store is not finalized here, even when the
// ownership flags of *this were set. Confirm callers only assign into streams
// that own nothing.
50  if (this != &rhs) {
51  m_name=rhs.m_name;
52  p_svcLoc=rhs.p_svcLoc;
53  m_sel=rhs.m_sel;
54  m_SG=rhs.m_SG;
55  p_iter=rhs.p_iter;
56  m_mergeSvc = rhs.m_mergeSvc;
  // take over iterator ownership; rhs must not delete it any more
57  m_ownEvtIterator=rhs.m_ownEvtIterator;
58  rhs.m_ownEvtIterator=false;
59  m_neverLoaded=rhs.m_neverLoaded;
  // take over store ownership; rhs must not finalize it any more
60  m_ownStore=rhs.m_ownStore;
61  rhs.m_ownStore=false;
62  m_used=rhs.m_used;
63  m_hasRing=rhs.m_hasRing;
64  m_iOriginalRing=rhs.m_iOriginalRing;
65  }
66  return *this;
67 }
68 
// Build a stream on an event selector the caller already holds.
//   name   : stream name, returned by name() and used in setupStore() as the
//            prefix of the "<name>_SG" and "<name>_PPS" service names
//   svcLoc : service locator, asserted non-null
//   sel    : event selector, asserted non-null
// Asks the selector for a new context (stored in p_iter) and looks up
// "PileUpMergeSvc". If either step fails, the message is logged at ERROR level
// and std::runtime_error is thrown with the same text. On success the stream
// is marked as owner of the context.
// Listing line 72, between the signature and the member initializers, is
// absent from this listing.
// NOTE(review): the context is created before the merge service is checked.
// If only the merge service is invalid, the exception leaves with the context
// already created and m_ownEvtIterator still false - confirm this is harmless.
69 PileUpStream::PileUpStream(const std::string& name,
70  ISvcLocator* svcLoc,
71  IEvtSelector* sel):
73  m_name(name), p_svcLoc(svcLoc), m_sel(sel), m_SG(0), p_iter(0),
74  m_ownEvtIterator(false),
75  m_neverLoaded(true), m_ownStore(false),
76  m_used(false), m_hasRing(false), m_iOriginalRing(0)
77 {
78  assert(m_sel);
79  assert(p_svcLoc);
80  m_mergeSvc = serviceLocator()->service<PileUpMergeSvc>("PileUpMergeSvc");
81  if( !( m_sel->createContext(p_iter).isSuccess() && m_mergeSvc.isValid() ) ) {
82  const std::string errMsg("PileUpStream:: can not create stream");
83  ATH_MSG_ERROR ( errMsg );
84  throw std::runtime_error(errMsg);
85  } else m_ownEvtIterator=true;
86 }
87 
// Build a stream on an event selector looked up by service name.
//   name      : stream name, returned by name() and used in setupStore() as
//               the prefix of the "<name>_SG" and "<name>_PPS" service names
//   svcLoc    : service locator, asserted non-null
//   selecName : name under which the IEvtSelector service is requested
// Both services are validated before the selector is asked for a context, so
// createContext() is only reached when the selector and "PileUpMergeSvc" are
// both valid. If any step fails, the message is logged at ERROR level and
// std::runtime_error is thrown with the same text. On success the stream is
// marked as owner of the context.
// Listing line 91, between the signature and the member initializers, is
// absent from this listing.
88 PileUpStream::PileUpStream(const std::string& name,
89  ISvcLocator* svcLoc,
90  const std::string& selecName):
92  m_name(name), p_svcLoc(svcLoc), m_sel(0), m_SG(0), p_iter(0),
93  m_ownEvtIterator(false),
94  m_neverLoaded(true), m_ownStore(false),
95  m_used(false), m_hasRing(false), m_iOriginalRing(0)
96 
97 {
98  assert(p_svcLoc);
99  m_sel = serviceLocator()->service<IEvtSelector>(selecName);
100  m_mergeSvc = serviceLocator()->service<PileUpMergeSvc>("PileUpMergeSvc");
101  if ( !(m_sel.isValid() && m_mergeSvc.isValid() &&
102  m_sel->createContext(p_iter).isSuccess()) ) {
103  const std::string errMsg("PileUpStream: can not create stream");
104  ATH_MSG_ERROR ( errMsg );
105  throw std::runtime_error(errMsg);
106  } else m_ownEvtIterator=true;
107 }
108 
// Destructor body. The signature line is absent from this listing; the index
// places ~PileUpStream() at PileUpStream.cxx:109.
// Intentionally empty: the only cleanup visible in this file (deleting p_iter,
// finalizing and releasing the store) is done in finalize().
110 {
111 }
112 
// setupStore(). The signature line is absent from this listing; the index
// gives `bool setupStore()` at PileUpStream.cxx:113.
// Attaches a StoreGateSvc named "<name>_SG" to this stream:
//   1. reuse an existing service of that name if the locator already has one
//      (the stream does not own it), otherwise
//   2. clone the master "StoreGateSvc", initialize the clone, tag it as
//      StoreID::PILEUP_STORE and mark it as owned.
// If the selector also implements IAddressProvider, a "<name>_PPS" proxy
// provider service is wired to the store, with the selector and (when
// available) "AthenaPoolAddressProviderSvc" and "AddressRemappingSvc" as
// address providers.
// Returns false when the store could not be obtained/cloned/initialized.
// NOTE(review): a failure to set up the proxy provider service (the `if` at
// listing lines 148-150 not taken) is silent and still returns true - confirm
// that is intended.
114 {
115  assert( p_iter );
116  assert( m_sel );
117  bool rc(true);
118  std::string storeName(name() + "_SG");
119 
120  //start by looking for the store directly: in overlay jobs it may already be there
121  m_SG = serviceLocator()->service<StoreGateSvc>(storeName, /*createIf*/false);
122  if (m_SG) {
123  m_ownStore = false;
124  } else {
125  //not there, create one cloning the master store
  // `child` is only written by CloneService::clone(); the && chain below
  // never reads it unless clone() reported success.
126  Service *child;
127  //if the parent store is not there barf
128  //remember the clone function also initializes the service if needed
129  SmartIF<StoreGateSvc> pIS(serviceLocator()->service("StoreGateSvc"));
130  rc = (pIS.isValid() &&
131  CloneService::clone(pIS, storeName, child).isSuccess() &&
132  (m_SG = SmartIF<StoreGateSvc>(child)).isValid());
133  if ( rc ) {
134  m_ownStore = true;
135  // further initialization of the cloned service
  // setStoreID() is called even when sysInitialize() failed; rc carries
  // the failure to the caller.
136  rc = (m_SG->sysInitialize()).isSuccess();
137  m_SG->setStoreID(StoreID::PILEUP_STORE); //needed by ProxyProviderSvc
138  } //clones
139  }
140  if (rc) {
141  //if the selector is an address provider like the AthenaPool one,
142  //create a dedicated ProxyProviderSvc and associate it to the store
143  SmartIF<IAddressProvider> pIAP(m_sel);
144  if (pIAP.isValid()) {
145  const std::string PPSName(name() + "_PPS");
  // NOTE(review): the service is requested from the locator here, before
  // declareSvcType() registers PPSName as a "ProxyProviderSvc" below.
  // Whether this lookup can create the service at that point depends on
  // ISvcLocator::service() semantics not visible here - confirm the order.
146  SmartIF<IProxyProviderSvc> pPPSvc(serviceLocator()->service(PPSName));
147  SmartIF<ISvcManager> pISM(serviceLocator());
148  if ( pISM.isValid() &&
149  pISM->declareSvcType(PPSName, "ProxyProviderSvc").isSuccess() &&
150  pPPSvc.isValid() ) {
151 
  // the selector itself is always registered as a provider; the two
  // services below are optional and only produce a warning when missing
152  pPPSvc->addProvider(pIAP);
153  SmartIF<IAddressProvider> pAthPoolAddProv(serviceLocator()->service("AthenaPoolAddressProviderSvc"));
154  if (pAthPoolAddProv.isValid()) {
155  pPPSvc->addProvider(pAthPoolAddProv);
156  } else {
157  ATH_MSG_WARNING ( "could not add AthenaPoolAddressProviderSvc as AddresssProvider for "<< PPSName );
158  }
159  SmartIF<IAddressProvider> pAddrRemap(serviceLocator()->service("AddressRemappingSvc"));
160  if (pAddrRemap.isValid()) {
161  pPPSvc->addProvider(pAddrRemap);
162  } else {
163  ATH_MSG_WARNING ( "could not add AddressRemappingSvc as AddresssProvider for "<< PPSName );
164  }
165  m_SG->setProxyProviderSvc(pPPSvc);
166  }
167  } //valid address provider
168  }
169  return rc;
170 }
171 
// setActiveStore(). The signature line is absent from this listing; the index
// gives `void setActiveStore()` at PileUpStream.cxx:172.
// Calls makeCurrent() on this stream's store.
// NOTE(review): store() is defined in PileUpStream.h (not shown); presumably
// it requires setupStore() to have succeeded first - confirm.
173 {
174  store().makeCurrent();
175 }
176 
// nextRecordPre_Passive(). The signature line is absent from this listing;
// the index gives `StatusCode nextRecordPre_Passive()` at PileUpStream.cxx:177.
// Makes this stream's store the active one, clears it, then advances the
// selector to the next record. Nothing is loaded into the store.
// Returns SUCCESS only if both the clear and the advance succeed. The && chain
// short-circuits, so the selector is not advanced when clearing fails.
178 {
179  this->setActiveStore();
180  // Clear the store, move to next event
181  return (this->store().clearStore().isSuccess() &&
182  this->selector().next(iterator()).isSuccess() ) ?
183  StatusCode::SUCCESS :
184  StatusCode::FAILURE;
185 }
186 
// nextRecordPre(). The signature line is absent from this listing; the index
// gives `StatusCode nextRecordPre()` at PileUpStream.cxx:187.
// Runs nextRecordPre_Passive() and then loadStore().
// Returns SUCCESS only if both succeed; loadStore() is skipped when the
// passive step fails.
188 {
189  // Clear the store, move to next event and load the store
190  return (this->nextRecordPre_Passive().isSuccess() &&
191  this->loadStore()) ?
192  StatusCode::SUCCESS :
193  StatusCode::FAILURE;
194 }
195 
196 // bool PileUpStream::isNotEmpty() const
197 // {
198 // // std::cout << "isNotEmpty " << (0 != iterator() && *(iterator()) != *(selector().end()) ) << std::endl;
199 // return (0 != iterator() && *(iterator()) != *(selector()->end()) );
200 // }
201 
// loadStore(). The signature line is absent from this listing; the index
// gives `bool loadStore()` at PileUpStream.cxx:202.
// Asks the selector for the opaque address of the record the iterator points
// to, records that address in the store when one is returned, then loads the
// store's event proxies. The store is not cleared here.
// Returns true only if every step that was executed succeeded.
203 {
  // cleared up front, so the flag flips even when the load below fails
204  m_neverLoaded=false;
205 
206  IOpaqueAddress* paddr(0);
207  bool rc = (this->selector().createAddress(iterator(), paddr)).isSuccess();
  // a null address is not treated as an error by itself
208  if ( 0 != paddr) rc &= this->store().recordAddress(paddr).isSuccess();
209 
210  // load store proxies
  // `&=` does not short-circuit: proxies are loaded even after a failure above
211  rc &= this->store().loadEventProxies().isSuccess();
212  return rc;
213 }
214 
215 //return next event, load store with next event
// nextEventPre(readRecord). The signature line is absent from this listing;
// the index gives `const xAOD::EventInfo * nextEventPre(bool readRecord=true)`
// at PileUpStream.cxx:216.
//   readRecord : when true, advance to and load the next record before
//                fetching the event info; forced to true while no event has
//                ever been loaded.
// Returns nullptr when a record had to be read and nextRecordPre() failed.
// Otherwise returns whatever m_mergeSvc->getPileUpEvent(m_SG, "") yields,
// which the code below treats as possibly null.
217 {
218  if (m_neverLoaded) readRecord=true;
219  else if (readRecord) {
220  //do not reset these the first time we call nextEventPre
221  this->resetUsed();
222  m_hasRing=false;
223  }
224  // if (isNotEmpty()) {
225  if (readRecord && this->nextRecordPre().isFailure()) {
226  ATH_MSG_INFO ( "nextEventPre(): end of the loop. No more events in the selection" );
227  return nullptr;
228  }
229 
230  const xAOD::EventInfo* xAODEventInfo = m_mergeSvc->getPileUpEvent( m_SG, "" );
  // only log when a new record was actually read and an event info was found
231  if (readRecord and xAODEventInfo) {
232  ATH_MSG_DEBUG ( "nextEventPre(): read new event "
233  << xAODEventInfo->eventNumber()
234  << " run " << xAODEventInfo->runNumber()
235  << " into store " << this->store().name() );
236  }
237 
238  return xAODEventInfo;
239 }
240 
// Same bookkeeping as nextEventPre(), but only advances the selector through
// nextRecordPre_Passive(): no address is recorded, no proxies are loaded and
// no event info is fetched.
//   readRecord : when true, move to the next record; forced to true while no
//                event has ever been loaded.
// Returns false only when a record had to be read and
// nextRecordPre_Passive() failed; true otherwise.
// m_neverLoaded is not cleared on this path (only loadStore() clears it), so
// repeated passive calls keep forcing readRecord to true.
241 bool PileUpStream::nextEventPre_Passive(bool readRecord) {
242  if (m_neverLoaded) readRecord=true;
243  else if (readRecord) {
244  //do not reset these the first time we call nextEventPre_Passive
245  this->resetUsed();
246  m_hasRing=false;
247  }
248  if (readRecord && this->nextRecordPre_Passive().isFailure()) {
249  ATH_MSG_INFO ( "nextEventPre_Passive(): end of the loop. No more events in the selection" );
250  return false;
251  }
252  return true;
253 }
254 
// finalize() body. The signature line is absent from this listing; the index
// gives `StatusCode finalize()` at PileUpStream.cxx:255.
// Deletes the event iterator if this stream owns it, finalizes the store if
// this stream owns it and it is in the INITIALIZED state, then releases the
// store. Returns the status of sysFinalize(), or SUCCESS when it was not
// called.
// NOTE(review): p_iter is not reset and m_ownEvtIterator stays true after the
// delete, so a second finalize() call would delete the same pointer again -
// confirm finalize() is called at most once per stream.
// NOTE(review): store() is used unconditionally for release(); presumably this
// requires setupStore() to have succeeded (store() is defined in
// PileUpStream.h, not shown) - confirm.
256  StatusCode sc(StatusCode::SUCCESS);
257  if (m_ownEvtIterator) delete p_iter;
258  //we own and manage our cloned SG instance
  // two spellings of the same state check, selected by whether the Gaudi
  // state-machine header guard is defined
259 #ifdef GAUDIKERNEL_STATEMACHINE_H_
260  if (m_ownStore && Gaudi::StateMachine::INITIALIZED == store().FSMState()) {
261  sc = this->store().sysFinalize();
262  }
263 #else
264  if (m_ownStore && this->store().state() == IService::INITIALIZED) sc = this->store().sysFinalize();
265 #endif
266  this->store().release();
267  return sc;
268 }
PileUpStream::setActiveStore
void setActiveStore()
set ActiveStore
Definition: PileUpStream.cxx:172
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
AthMsgStreamMacros.h
StoreGateSvc::makeCurrent
void makeCurrent()
The current store is becoming the active store.
Definition: StoreGateSvc.cxx:496
PileUpStream::nextRecordPre
StatusCode nextRecordPre()
increment event iterator before loading store
Definition: PileUpStream.cxx:187
PileUpStream::m_name
std::string m_name
Stream name.
Definition: PileUpStream.h:114
PileUpStream::iterator
EvtIterator & iterator()
Definition: PileUpStream.h:59
isValid
bool isValid(const T &p)
Definition: AtlasPID.h:225
PileUpStream::PileUpStream
PileUpStream()
Structors.
Definition: PileUpStream.cxx:27
PileUpStream::m_hasRing
bool m_hasRing
ring flag; reset to false whenever a new record is read (the "already used" description printed here by the generator belongs to m_used)
Definition: PileUpStream.h:133
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
PileUpMergeSvc
the preferred mechanism to access information from the different event stores in a pileup job.
Definition: PileUpMergeSvc.h:58
PileUpStream::selector
IEvtSelector & selector()
Definition: PileUpStream.h:58
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:125
PileUpStream::nextEventPre
const xAOD::EventInfo * nextEventPre(bool readRecord=true)
return next Event, load store with next Event
Definition: PileUpStream.cxx:216
PileUpStream::finalize
StatusCode finalize()
finalize and release store. To be called on ... finalize()
Definition: PileUpStream.cxx:255
PileUpStream::p_svcLoc
ISvcLocator * p_svcLoc
ServiceLocator.
Definition: PileUpStream.h:116
PileUpStream::m_used
bool m_used
has this stream already been used? (for the current event)
Definition: PileUpStream.h:132
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
PileUpStream::m_ownStore
bool m_ownStore
is m_SG a store we cloned from the master one?
Definition: PileUpStream.h:130
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
PileUpStream::operator=
PileUpStream & operator=(const PileUpStream &rhs)=delete
PileUpStream::setupStore
bool setupStore()
setup input and overlay selectors and iters
Definition: PileUpStream.cxx:113
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
sel
sel
Definition: SUSYToolsTester.cxx:97
StoreGateSvc::loadEventProxies
StatusCode loadEventProxies()
load proxies at begin event
Definition: StoreGateSvc.cxx:442
PileUpStream::m_iOriginalRing
unsigned int m_iOriginalRing
Definition: PileUpStream.h:134
AthMessaging
Class to provide easy MsgStream access and capabilities.
Definition: AthMessaging.h:55
PileUpStream
a triple selector/context/store defines a stream
Definition: PileUpStream.h:32
PileUpStream::resetUsed
void resetUsed()
Definition: PileUpStream.h:83
PileUpStream::name
const std::string & name()
Definition: PileUpStream.h:57
PileUpStream::nextEventPre_Passive
bool nextEventPre_Passive(bool readRecord)
like nextEventPre, but doesn't actually load anything
Definition: PileUpStream.cxx:241
PileUpStream::nextRecordPre_Passive
StatusCode nextRecordPre_Passive()
increment event iterator
Definition: PileUpStream.cxx:177
PileUpStream::loadStore
bool loadStore()
record the current event address and load new proxies (the store is cleared beforehand by nextRecordPre_Passive)
Definition: PileUpStream.cxx:202
PileUpStream::m_neverLoaded
bool m_neverLoaded
true until an event has been loaded into this stream (cleared by loadStore)
Definition: PileUpStream.h:129
PileUpStream::~PileUpStream
virtual ~PileUpStream()
Definition: PileUpStream.cxx:109
PileUpStream::store
StoreGateSvc & store()
Definition: PileUpStream.h:60
PileUpStream::serviceLocator
ISvcLocator * serviceLocator()
Definition: PileUpStream.h:96
PileUpStream::m_ownEvtIterator
bool m_ownEvtIterator
do we own p_iter?
Definition: PileUpStream.h:127
CloneService.h
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
StoreGateSvc::recordAddress
StatusCode recordAddress(const std::string &skey, IOpaqueAddress *pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
xAOD::EventInfo_v1
Class describing the basic event information.
Definition: EventInfo_v1.h:43
CloneService::clone
StatusCode clone(const IService *parent, const std::string &childName, Service *&child)
given a reference to a parent svc sets a reference to a cloned child
Definition: CloneService.cxx:21
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
PileUpMergeSvc.h
the preferred mechanism to access information from the different event stores in a pileup job.
PileUpStream::m_mergeSvc
SmartIF< PileUpMergeSvc > m_mergeSvc
Definition: PileUpStream.h:124
IAddressProvider.h
PileUpStream.h
a triple selector/context/store defines a stream
PileUpStream::m_sel
SmartIF< IEvtSelector > m_sel
Selector.
Definition: PileUpStream.h:118
PileUpStream::p_iter
EvtIterator * p_iter
Input Iterators.
Definition: PileUpStream.h:122
PileUpStream::m_SG
SmartIF< StoreGateSvc > m_SG
StoreGateSvc;.
Definition: PileUpStream.h:120
StoreGateSvc.h
DataProxy.h