ATLAS Offline Software
PileUpStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 #include <cassert>
8 #include <stdexcept>
9 #include <string>
10 
11 #include "GaudiKernel/Service.h"
12 #include "GaudiKernel/ISvcLocator.h"
13 #include "GaudiKernel/ISvcManager.h"
16 #include "StoreGate/StoreGateSvc.h"
17 
18 #include "SGTools/DataProxy.h"
19 
20 
23 
24 class IOpaqueAddress;
25 
28  AthMessaging ("PileUpStream"),
29  m_name("INVALID"), p_svcLoc(0), p_sel(0), p_SG(0), p_iter(0),
30  p_mergeSvc(nullptr), m_ownEvtIterator(false),
31  m_neverLoaded(true), m_ownStore(false),
32  m_used(false), m_hasRing(false), m_iOriginalRing(0)
33 {
34 }
35 
37  AthMessaging (rhs.m_name),
38  m_name(rhs.m_name), p_svcLoc(rhs.p_svcLoc), p_sel(rhs.p_sel),
39  p_SG(rhs.p_SG), p_iter(rhs.p_iter), p_mergeSvc(rhs.p_mergeSvc), m_ownEvtIterator(rhs.m_ownEvtIterator),
40  m_neverLoaded(rhs.m_neverLoaded), m_ownStore(rhs.m_ownStore),
41  m_used(rhs.m_used), m_hasRing(rhs.m_hasRing), m_iOriginalRing(rhs.m_iOriginalRing)
42 {
43  //transferred ownership
44  rhs.m_ownEvtIterator=false;
45  rhs.m_ownStore=false;
46 }
47 
50  if (this != &rhs) {
51  m_name=rhs.m_name;
52  p_svcLoc=rhs.p_svcLoc;
53  p_sel=rhs.p_sel;
54  p_SG=rhs.p_SG;
55  p_iter=rhs.p_iter;
56  p_mergeSvc = rhs.p_mergeSvc;
57  m_ownEvtIterator=rhs.m_ownEvtIterator;
58  rhs.m_ownEvtIterator=false;
59  m_neverLoaded=rhs.m_neverLoaded;
60  m_ownStore=rhs.m_ownStore;
61  rhs.m_ownStore=false;
62  m_used=rhs.m_used;
63  m_hasRing=rhs.m_hasRing;
64  m_iOriginalRing=rhs.m_iOriginalRing;
65  }
66  return *this;
67 }
68 
69 PileUpStream::PileUpStream(const std::string& name,
70  ISvcLocator* svcLoc,
71  IEvtSelector* sel):
73  m_name(name), p_svcLoc(svcLoc), p_sel(sel), p_SG(0), p_iter(0),
74  m_ownEvtIterator(false),
75  m_neverLoaded(true), m_ownStore(false),
76  m_used(false), m_hasRing(false), m_iOriginalRing(0)
77 {
78  assert(p_sel);
79  assert(p_svcLoc);
80  if( !( p_sel->createContext(p_iter).isSuccess() &&
81  serviceLocator()->service("PileUpMergeSvc", p_mergeSvc, true).isSuccess() ) ) {
82 
83  const std::string errMsg("PileUpStream:: can not create stream");
84  ATH_MSG_ERROR ( errMsg );
85  throw std::runtime_error(errMsg);
86  } else m_ownEvtIterator=true;
87 }
88 
89 PileUpStream::PileUpStream(const std::string& name,
90  ISvcLocator* svcLoc,
91  const std::string& selecName):
93  m_name(name), p_svcLoc(svcLoc), p_sel(0), p_SG(0), p_iter(0),
94  m_ownEvtIterator(false),
95  m_neverLoaded(true), m_ownStore(false),
96  m_used(false), m_hasRing(false), m_iOriginalRing(0)
97 
98 {
99  assert(p_svcLoc);
100  if (!(serviceLocator()->service(selecName, p_sel).isSuccess() &&
101  serviceLocator()->service("PileUpMergeSvc", p_mergeSvc, true).isSuccess() &&
102  p_sel->createContext(p_iter).isSuccess() )) {
103  const std::string errMsg("PileUpStream: can not create stream");
104  ATH_MSG_ERROR ( errMsg );
105  throw std::runtime_error(errMsg);
106  } else m_ownEvtIterator=true;
107 }
108 
110 {
111 }
112 
114 {
115  assert( p_iter );
116  assert( p_sel );
117  bool rc(true);
118  std::string storeName(name() + "_SG");
119 
120  //start by looking for the store directly: in overlay jobs it may already be there
121  const bool DONOTCREATE(false);
122  rc = (serviceLocator()->service(storeName, p_SG, DONOTCREATE)).isSuccess();
123  if (rc) {
124  m_ownStore = false;
125  } else {
126  //not there, create one cloning the master store
127  Service *child;
128  //if the parent store is not there barf
129  //remember the clone function also initializes the service if needed
130  IService* pIS(0);
131  rc = ((serviceLocator()->getService("StoreGateSvc", pIS)).isSuccess() &&
132  CloneService::clone(pIS, storeName, child).isSuccess() &&
133  0 != (p_SG = dynamic_cast<StoreGateSvc*>(child)));
134  if ( rc ) {
135  m_ownStore = true;
136  // further initialization of the cloned service
137  rc = (p_SG->sysInitialize()).isSuccess();
138  p_SG->setStoreID(StoreID::PILEUP_STORE); //needed by ProxyProviderSvc
139  } //clones
140  }
141  if (rc) {
142  //if the selector is an address provider like the AthenaPool one,
143  //create a dedicated ProxyProviderSvc and associate it to the store
144  IAddressProvider* pIAP(dynamic_cast<IAddressProvider*>(p_sel));
145  if (0 != pIAP) {
146  IProxyProviderSvc* pPPSvc(0);
147  std::string PPSName(name() + "_PPS");
148  ISvcManager* pISM(dynamic_cast<ISvcManager*>(serviceLocator()));
149  if ( 0 != pISM &&
150  (pISM->declareSvcType(PPSName, "ProxyProviderSvc")).isSuccess() &&
151  //check the service is not there then create it
152  (serviceLocator()->service(PPSName,
153  pPPSvc,
154  true)).isSuccess()) {
155  pPPSvc->addProvider(pIAP);
156  IService* pSAthPoolAddProv(0);
157  IAddressProvider* pAthPoolAddProv(0);
158  if (serviceLocator()->service("AthenaPoolAddressProviderSvc", pSAthPoolAddProv).isSuccess() &&
159  0 != (pAthPoolAddProv = dynamic_cast<IAddressProvider*>(pSAthPoolAddProv))) {
160  pPPSvc->addProvider(pAthPoolAddProv);
161  } else {
162  ATH_MSG_WARNING ( "could not add AthenaPoolAddressProviderSvc as AddresssProvider for "<< PPSName );
163  }
164  IService* pSAddrRemap(0);
165  IAddressProvider* pAddrRemap(0);
166  if (serviceLocator()->service("AddressRemappingSvc", pSAddrRemap).isSuccess() &&
167  0 != (pAddrRemap = dynamic_cast<IAddressProvider*>(pSAddrRemap))) {
168  pPPSvc->addProvider(pAddrRemap);
169  } else {
170  ATH_MSG_WARNING ( "could not add AddressRemappingSvc as AddresssProvider for "<< PPSName );
171  }
172  p_SG->setProxyProviderSvc(pPPSvc);
173  }
174  } //valid address provider
175  }
176  return rc;
177 }
178 
180 {
181  store().makeCurrent();
182 }
183 
185 {
186  this->setActiveStore();
187  // Clear the store, move to next event
188  return (this->store().clearStore().isSuccess() &&
189  this->selector().next(iterator()).isSuccess() ) ?
190  StatusCode::SUCCESS :
191  StatusCode::FAILURE;
192 }
193 
195 {
196  // Clear the store, move to next event and load the store
197  return (this->nextRecordPre_Passive().isSuccess() &&
198  this->loadStore()) ?
199  StatusCode::SUCCESS :
200  StatusCode::FAILURE;
201 }
202 
203 // bool PileUpStream::isNotEmpty() const
204 // {
205 // // std::cout << "isNotEmpty " << (0 != iterator() && *(iterator()) != *(selector().end()) ) << std::endl;
206 // return (0 != iterator() && *(iterator()) != *(selector()->end()) );
207 // }
208 
210 {
211  m_neverLoaded=false;
212 
213  IOpaqueAddress* paddr(0);
214  bool rc = (this->selector().createAddress(iterator(), paddr)).isSuccess();
215  if ( 0 != paddr) rc &= this->store().recordAddress(paddr).isSuccess();
216 
217  // load store proxies
218  rc &= this->store().loadEventProxies().isSuccess();
219  return rc;
220 }
221 
222 //return next event, load store with next event
224 {
225  if (m_neverLoaded) readRecord=true;
226  else if (readRecord) {
227  //do not reset these the first time we call nextEventPre
228  this->resetUsed();
229  m_hasRing=false;
230  }
231  // if (isNotEmpty()) {
232  if (readRecord && this->nextRecordPre().isFailure()) {
233  ATH_MSG_INFO ( "nextEventPre(): end of the loop. No more events in the selection" );
234  return nullptr;
235  }
236 
237  const xAOD::EventInfo* xAODEventInfo = p_mergeSvc->getPileUpEvent( p_SG, "" );
238  if (readRecord and xAODEventInfo) {
239  ATH_MSG_DEBUG ( "nextEventPre(): read new event "
240  << xAODEventInfo->eventNumber()
241  << " run " << xAODEventInfo->runNumber()
242  << " into store " << this->store().name() );
243  }
244 
245  return xAODEventInfo;
246 }
247 
248 bool PileUpStream::nextEventPre_Passive(bool readRecord) {
249  if (m_neverLoaded) readRecord=true;
250  else if (readRecord) {
251  //do not reset these the first time we call nextEventPre_Passive
252  this->resetUsed();
253  m_hasRing=false;
254  }
255  if (readRecord && this->nextRecordPre_Passive().isFailure()) {
256  ATH_MSG_INFO ( "nextEventPre_Passive(): end of the loop. No more events in the selection" );
257  return false;
258  }
259  return true;
260 }
261 
263  StatusCode sc(StatusCode::SUCCESS);
264  if (m_ownEvtIterator) delete p_iter;
265  //we own and manage our cloned SG instance
266 #ifdef GAUDIKERNEL_STATEMACHINE_H_
267  if (m_ownStore && Gaudi::StateMachine::INITIALIZED == store().FSMState()) {
268  sc = this->store().sysFinalize();
269  }
270 #else
271  if (m_ownStore && this->store().state() == IService::INITIALIZED) sc = this->store().sysFinalize();
272 #endif
273  this->store().release();
274  return sc;
275 }
PileUpStream::setActiveStore
void setActiveStore()
set ActiveStore
Definition: PileUpStream.cxx:179
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
AthMsgStreamMacros.h
StoreGateSvc::setProxyProviderSvc
void setProxyProviderSvc(IProxyProviderSvc *pPPSvc)
associate ProxyProviderSvc to this store
PileUpMergeSvc::getPileUpEvent
const xAOD::EventInfo * getPileUpEvent(StoreGateSvc *sg, const std::string &einame) const
get EventInfo from SG, by default using p_overStore
Definition: PileUpMergeSvc.cxx:122
StoreGateSvc::makeCurrent
void makeCurrent()
The current store is becoming the active store.
Definition: StoreGateSvc.cxx:507
PileUpStream::nextRecordPre
StatusCode nextRecordPre()
increment event iterator before loading store
Definition: PileUpStream.cxx:194
PileUpStream::p_mergeSvc
PileUpMergeSvc * p_mergeSvc
Definition: PileUpStream.h:125
PileUpStream::m_name
std::string m_name
Stream name.
Definition: PileUpStream.h:115
PileUpStream::p_sel
IEvtSelector * p_sel
Selector.
Definition: PileUpStream.h:119
PileUpStream::iterator
EvtIterator & iterator()
Definition: PileUpStream.h:60
PileUpStream::PileUpStream
PileUpStream()
Structors.
Definition: PileUpStream.cxx:27
PileUpStream::m_hasRing
bool m_hasRing
has this stream already been used? (for the current event)
Definition: PileUpStream.h:134
python.FakeAthena.Service
def Service(name)
Definition: FakeAthena.py:38
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
PileUpStream::selector
IEvtSelector & selector()
Definition: PileUpStream.h:59
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:128
IProxyProviderSvc::addProvider
virtual void addProvider(IAddressProvider *aProvider)=0
IAddressProvider manager functionality add a provider to the set of known ones.
PileUpStream::nextEventPre
const xAOD::EventInfo * nextEventPre(bool readRecord=true)
return next Event, load store with next Event
Definition: PileUpStream.cxx:223
PileUpStream::finalize
StatusCode finalize()
finalize and release store. To be called on ... finalize()
Definition: PileUpStream.cxx:262
PileUpStream::p_svcLoc
ISvcLocator * p_svcLoc
ServiceLocator.
Definition: PileUpStream.h:117
PileUpStream::m_used
bool m_used
is p_SG a store we cloned from the master one?
Definition: PileUpStream.h:133
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
PileUpStream::m_ownStore
bool m_ownStore
has an event been loaded into this stream?
Definition: PileUpStream.h:131
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
PileUpStream::operator=
PileUpStream & operator=(const PileUpStream &rhs)=delete
PileUpStream::setupStore
bool setupStore()
setup input and overlay selectors and iters
Definition: PileUpStream.cxx:113
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
sel
sel
Definition: SUSYToolsTester.cxx:92
IAddressProvider
interface for IOA providers
Definition: IAddressProvider.h:28
StoreGateSvc::loadEventProxies
StatusCode loadEventProxies()
load proxies at begin event
Definition: StoreGateSvc.cxx:453
PileUpStream::m_iOriginalRing
unsigned int m_iOriginalRing
Definition: PileUpStream.h:135
AthMessaging
Class to provide easy MsgStream access and capabilities.
Definition: AthMessaging.h:55
PileUpStream
a triple selector/context/store defines a stream
Definition: PileUpStream.h:33
PileUpStream::resetUsed
void resetUsed()
Definition: PileUpStream.h:84
PileUpStream::name
const std::string & name()
Definition: PileUpStream.h:58
PileUpStream::nextEventPre_Passive
bool nextEventPre_Passive(bool readRecord)
like nextEventPre, but doesn't actually load anything
Definition: PileUpStream.cxx:248
PileUpStream::nextRecordPre_Passive
StatusCode nextRecordPre_Passive()
increment event iterator
Definition: PileUpStream.cxx:184
PileUpStream::loadStore
bool loadStore()
clear store and load new proxies
Definition: PileUpStream.cxx:209
PileUpStream::m_neverLoaded
bool m_neverLoaded
do we own p_iter?
Definition: PileUpStream.h:130
PileUpStream::~PileUpStream
virtual ~PileUpStream()
Definition: PileUpStream.cxx:109
PileUpStream::store
StoreGateSvc & store()
Definition: PileUpStream.h:61
PileUpStream::serviceLocator
ISvcLocator * serviceLocator()
Definition: PileUpStream.h:97
PileUpStream::m_ownEvtIterator
bool m_ownEvtIterator
Definition: PileUpStream.h:128
CloneService.h
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
IProxyProviderSvc
Definition: IProxyProviderSvc.h:34
StoreGateSvc::recordAddress
StatusCode recordAddress(const std::string &skey, IOpaqueAddress *pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
xAOD::EventInfo_v1
Class describing the basic event information.
Definition: EventInfo_v1.h:43
PileUpStream::p_SG
StoreGateSvc * p_SG
StoreGateSvc;.
Definition: PileUpStream.h:121
CloneService::clone
StatusCode clone(const IService *parent, const std::string &childName, Service *&child)
given a reference to a parent svc sets a reference to a cloned child
Definition: CloneService.cxx:19
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
PileUpMergeSvc.h
the preferred mechanism to access information from the different event stores in a pileup job.
StoreGateSvc::setStoreID
void setStoreID(StoreID::type id)
set store ID. request forwarded to DataStore:
Definition: StoreGateSvc.cxx:370
IAddressProvider.h
PileUpStream.h
a triple selector/context/store defines a stream
PileUpStream::p_iter
EvtIterator * p_iter
Input Iterators.
Definition: PileUpStream.h:123
StoreGateSvc.h
DataProxy.h