ATLAS Offline Software
Loading...
Searching...
No Matches
PileUpStream.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <stdexcept>
8#include <string>
9
10#include "GaudiKernel/Service.h"
11#include "GaudiKernel/ISvcLocator.h"
12#include "GaudiKernel/ISvcManager.h"
16
17#include "SGTools/DataProxy.h"
18
20
21
22class IOpaqueAddress;
23
26 AthMessaging ("PileUpStream"),
27 m_name("INVALID"),
28 m_mergeSvc("PileUpMergeSvc", m_name)
29{
30}
31
34{
35 *this = std::move(rhs);
36}
37
40{
41 if (this != &rhs) {
42 m_name=rhs.m_name;
43 p_svcLoc=rhs.p_svcLoc;
44 m_sel=rhs.m_sel;
45 m_SG=rhs.m_SG;
46 p_iter=rhs.p_iter;
47 m_mergeSvc = rhs.m_mergeSvc;
48 m_ownEvtIterator=rhs.m_ownEvtIterator;
49 m_neverLoaded=rhs.m_neverLoaded;
50 m_ownStore=rhs.m_ownStore;
51 m_used=rhs.m_used;
52 m_hasRing=rhs.m_hasRing;
53 m_iOriginalRing=rhs.m_iOriginalRing;
54 //transferred ownership
55 rhs.m_ownEvtIterator=false;
56 rhs.m_ownStore=false;
57 }
58 return *this;
59}
60
62 ISvcLocator* svcLoc,
63 IEvtSelector* sel):
65 m_name(name),
66 p_svcLoc(svcLoc),
67 m_sel(sel),
68 m_mergeSvc("PileUpMergeSvc", name)
69{
70 if( !( m_sel->createContext(p_iter).isSuccess() && m_mergeSvc.isValid() ) ) {
71 const std::string errMsg("PileUpStream:: can not create stream");
72 ATH_MSG_ERROR ( errMsg );
73 throw std::runtime_error(errMsg);
74 } else m_ownEvtIterator=true;
75}
76
// Constructor taking the event selector by service name.
//   name      - copied into m_name and also passed to the "PileUpMergeSvc" handle
//   svcLoc    - stored in p_svcLoc
//   selecName - name used to retrieve the IEvtSelector service into m_sel
// Throws std::runtime_error, after logging the same message at ERROR level,
// when m_sel or m_mergeSvc is not valid or when the selector fails to create
// a context in p_iter.
// NOTE(review): the listing numbering jumps from 79 to 81 below, so one line of
// the initializer list appears to be missing from this extract - confirm
// against the real source file.
77PileUpStream::PileUpStream(const std::string& name,
78 ISvcLocator* svcLoc,
79 const std::string& selecName):
81 m_name(name),
82 p_svcLoc(svcLoc),
83 m_mergeSvc("PileUpMergeSvc", name)
84{
// retrieve the selector by name through the service locator
85 m_sel = serviceLocator()->service<IEvtSelector>(selecName);
// the && chain short-circuits: createContext(p_iter) is only attempted once
// both the selector and the merge service have been found valid
86 if ( !(m_sel.isValid() && m_mergeSvc.isValid() &&
87 m_sel->createContext(p_iter).isSuccess()) ) {
88 const std::string errMsg("PileUpStream: can not create stream");
89 ATH_MSG_ERROR ( errMsg );
90 throw std::runtime_error(errMsg);
// success: the context was created here, so flag p_iter as owned by this stream
91 } else m_ownEvtIterator=true;
92}
93
97
99{
100 bool rc(true);
101 std::string storeName(name() + "_SG");
102
103 //start by looking for the store directly: in overlay jobs it may already be there
104 m_SG = serviceLocator()->service<StoreGateSvc>(storeName, /*createIf*/false);
105 if (m_SG) {
106 m_ownStore = false;
107 } else {
108 //not there, create one cloning the master store
109 Service *child;
110 //if the parent store is not there barf
111 //remember the clone function also initializes the service if needed
112 SmartIF<StoreGateSvc> pIS(serviceLocator()->service("StoreGateSvc"));
113 rc = (pIS.isValid() &&
114 CloneService::clone(pIS, storeName, child).isSuccess() &&
115 (m_SG = SmartIF<StoreGateSvc>(child)).isValid());
116 if ( rc ) {
117 m_ownStore = true;
118 // further initialization of the cloned service
119 rc = (m_SG->sysInitialize()).isSuccess();
120 m_SG->setStoreID(StoreID::PILEUP_STORE); //needed by ProxyProviderSvc
121 } //clones
122 }
123 if (rc) {
124 //if the selector is an address provider like the AthenaPool one,
125 //create a dedicated ProxyProviderSvc and associate it to the store
126 SmartIF<IAddressProvider> pIAP(m_sel);
127 if (pIAP.isValid()) {
128 const std::string PPSName(name() + "_PPS");
129 SmartIF<IProxyProviderSvc> pPPSvc(serviceLocator()->service(PPSName));
130 SmartIF<ISvcManager> pISM(serviceLocator());
131 if ( pISM.isValid() &&
132 pISM->declareSvcType(PPSName, "ProxyProviderSvc").isSuccess() &&
133 pPPSvc.isValid() ) {
134
135 pPPSvc->addProvider(pIAP);
136 SmartIF<IAddressProvider> pAthPoolAddProv(serviceLocator()->service("AthenaPoolAddressProviderSvc"));
137 if (pAthPoolAddProv.isValid()) {
138 pPPSvc->addProvider(pAthPoolAddProv);
139 } else {
140 ATH_MSG_WARNING ( "could not add AthenaPoolAddressProviderSvc as AddresssProvider for "<< PPSName );
141 }
142 SmartIF<IAddressProvider> pAddrRemap(serviceLocator()->service("AddressRemappingSvc"));
143 if (pAddrRemap.isValid()) {
144 pPPSvc->addProvider(pAddrRemap);
145 } else {
146 ATH_MSG_WARNING ( "could not add AddressRemappingSvc as AddresssProvider for "<< PPSName );
147 }
148 m_SG->setProxyProviderSvc(pPPSvc);
149 }
150 } //valid address provider
151 }
152 return rc;
153}
154
159
161{
162 this->setActiveStore();
163 // Clear the store, move to next event
164 return (this->store().clearStore().isSuccess() &&
165 this->selector().next(iterator()).isSuccess() ) ?
166 StatusCode::SUCCESS :
167 StatusCode::FAILURE;
168}
169
171{
172 // Clear the store, move to next event and load the store
173 return (this->nextRecordPre_Passive().isSuccess() &&
174 this->loadStore()) ?
175 StatusCode::SUCCESS :
176 StatusCode::FAILURE;
177}
178
180{
181 m_neverLoaded=false;
182
183 IOpaqueAddress* paddr(0);
184 bool rc = (this->selector().createAddress(iterator(), paddr)).isSuccess();
185 if ( 0 != paddr) rc &= this->store().recordAddress(paddr).isSuccess();
186
187 // load store proxies
188 rc &= this->store().loadEventProxies().isSuccess();
189 return rc;
190}
191
192//return next event, load store with next event
194{
195 if (m_neverLoaded) readRecord=true;
196 else if (readRecord) {
197 //do not reset these the first time we call nextEventPre
198 this->resetUsed();
199 m_hasRing=false;
200 }
201 // if (isNotEmpty()) {
202 if (readRecord && this->nextRecordPre().isFailure()) {
203 ATH_MSG_INFO ( "nextEventPre(): end of the loop. No more events in the selection" );
204 return nullptr;
205 }
206
207 const xAOD::EventInfo* xAODEventInfo = m_mergeSvc->getPileUpEvent( m_SG, "" );
208 if (readRecord and xAODEventInfo) {
209 ATH_MSG_DEBUG ( "nextEventPre(): read new event "
210 << xAODEventInfo->eventNumber()
211 << " run " << xAODEventInfo->runNumber()
212 << " into store " << this->store().name() );
213 }
214
215 return xAODEventInfo;
216}
217
219 if (m_neverLoaded) readRecord=true;
220 else if (readRecord) {
221 //do not reset these the first time we call nextEventPre_Passive
222 this->resetUsed();
223 m_hasRing=false;
224 }
225 if (readRecord && this->nextRecordPre_Passive().isFailure()) {
226 ATH_MSG_INFO ( "nextEventPre_Passive(): end of the loop. No more events in the selection" );
227 return false;
228 }
229 return true;
230}
231
233 StatusCode sc(StatusCode::SUCCESS);
234 if (m_ownEvtIterator) delete p_iter;
235 //we own and manage our cloned SG instance
236 if (m_ownStore && Gaudi::StateMachine::INITIALIZED == store().FSMState()) {
237 sc = this->store().sysFinalize();
238 }
239 this->store().release();
240 return sc;
241}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
bool isValid(const T &p)
Av: we implement here an ATLAS-specific convention: all particles which are 99xxxxx are fine.
Definition AtlasPID.h:878
static Double_t sc
static Double_t rc
a triple selector/context/store defines a stream
AthMessaging(IMessageSvc *msgSvc, const std::string &name)
Constructor.
a triple selector/context/store defines a stream
StoreGateSvc & store()
PileUpStream & operator=(const PileUpStream &rhs)=delete
bool m_ownEvtIterator
do we own p_iter?
bool m_ownStore
is m_SG a store we cloned from the master one?
bool setupStore()
setup input and overlay selectors and iters
const std::string & name()
bool m_neverLoaded
has an event been loaded into this stream?
ISvcLocator * serviceLocator()
const xAOD::EventInfo * nextEventPre(bool readRecord=true)
return next Event, load store with next Event
StatusCode nextRecordPre_Passive()
increment event iterator
bool m_used
has this stream already been used? (for the current event)
virtual ~PileUpStream()
bool loadStore()
clear store and load new proxies
SmartIF< IEvtSelector > m_sel
Selector.
PileUpStream()
Structors.
StatusCode nextRecordPre()
increment event iterator before loading store
std::string m_name
Stream name.
StatusCode finalize()
finalize and release store. To be called on ... finalize()
unsigned int m_iOriginalRing
original ring in which event was used
EvtIterator * p_iter
Input Iterators.
EvtIterator & iterator()
void resetUsed()
SmartIF< StoreGateSvc > m_SG
StoreGateSvc.
void setActiveStore()
set ActiveStore
bool nextEventPre_Passive(bool readRecord)
like nextEventPre, but doesn't actually load anything
IEvtSelector & selector()
ServiceHandle< PileUpMergeSvc > m_mergeSvc
ISvcLocator * p_svcLoc
ServiceLocator.
The Athena Transient Store API.
StatusCode recordAddress(const std::string &skey, CxxUtils::RefCountedPtr< IOpaqueAddress > pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
StatusCode loadEventProxies()
load proxies at begin event
void makeCurrent()
The current store is becoming the active store.
@ PILEUP_STORE
Definition StoreID.h:31
uint32_t runNumber() const
The current event's run number.
uint64_t eventNumber() const
The current event's event number.
StatusCode clone(const IService *parent, const std::string &childName, Service *&child)
given a reference to a parent svc sets a reference to a cloned child
EventInfo_v1 EventInfo
Definition of the latest event info version.