Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
PileUpStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
6 
7 #include <stdexcept>
8 #include <string>
9 
10 #include "GaudiKernel/Service.h"
11 #include "GaudiKernel/ISvcLocator.h"
12 #include "GaudiKernel/ISvcManager.h"
15 #include "StoreGate/StoreGateSvc.h"
16 
17 #include "SGTools/DataProxy.h"
18 
20 
21 
22 class IOpaqueAddress;
23 
// Initializer list and body of the default constructor PileUpStream()
// (its signature line is not part of this listing).
// Initializes AthMessaging with the label "PileUpStream", sets the stream name
// to the placeholder "INVALID" and builds the "PileUpMergeSvc" handle with
// m_name as its second argument. The initializers shown here do not set up a
// selector, an iterator or a store.
26  AthMessaging ("PileUpStream"),
27  m_name("INVALID"),
28  m_mergeSvc("PileUpMergeSvc", m_name)
29 {
30 }
31 
// Constructor that takes over the state of `rhs` (signature line not part of
// this listing; presumably the move constructor, since rhs is passed through
// std::move). It first delegates to the default constructor and then hands
// the actual transfer to the assignment operator.
33  PileUpStream()
34 {
35  *this = std::move(rhs);
36 }
37 
// Body of the assignment from `rhs` (signature line not part of this listing;
// presumably the move-assignment operator, since rhs is modified).
// Self-assignment is a no-op. Otherwise every member listed below is copied
// from rhs, and the two ownership flags of rhs are cleared afterwards so that
// only *this remains marked as owner of the event iterator and of the store.
// Returns *this.
// NOTE(review): a p_iter that *this already owns is overwritten without being
// deleted -- confirm assignment is never done onto a stream that owns an iterator.
40 {
41  if (this != &rhs) {
42  m_name=rhs.m_name;
43  p_svcLoc=rhs.p_svcLoc;
44  m_sel=rhs.m_sel;
45  m_SG=rhs.m_SG;
46  p_iter=rhs.p_iter;
47  m_mergeSvc = rhs.m_mergeSvc;
48  m_ownEvtIterator=rhs.m_ownEvtIterator;
49  m_neverLoaded=rhs.m_neverLoaded;
50  m_ownStore=rhs.m_ownStore;
51  m_used=rhs.m_used;
52  m_hasRing=rhs.m_hasRing;
53  m_iOriginalRing=rhs.m_iOriginalRing;
54  //transferred ownership
55  rhs.m_ownEvtIterator=false;
56  rhs.m_ownStore=false;
57  }
58  return *this;
59 }
60 
// Builds a stream called `name` around an event selector that the caller
// already holds.
//   name   - stream name, also passed as second argument to the PileUpMergeSvc handle
//   svcLoc - service locator stored in p_svcLoc
//   sel    - event selector stored in m_sel
// A selector context is created into p_iter and the merge service handle is
// checked; if either step fails the error is logged and std::runtime_error is
// thrown. On success the stream is marked as owning p_iter.
// NOTE(review): unlike the overload taking a selector name, m_sel is
// dereferenced here without an isValid() check -- confirm callers never pass
// a null selector.
61 PileUpStream::PileUpStream(const std::string& name,
62  ISvcLocator* svcLoc,
63  IEvtSelector* sel):
65  m_name(name),
66  p_svcLoc(svcLoc),
67  m_sel(sel),
68  m_mergeSvc("PileUpMergeSvc", name)
69 {
70  if( !( m_sel->createContext(p_iter).isSuccess() && m_mergeSvc.isValid() ) ) {
71  const std::string errMsg("PileUpStream:: can not create stream");
72  ATH_MSG_ERROR ( errMsg );
73  throw std::runtime_error(errMsg);
74  } else m_ownEvtIterator=true;
75 }
76 
// Builds a stream called `name`, retrieving the event selector by service name.
//   name      - stream name, also passed as second argument to the PileUpMergeSvc handle
//   svcLoc    - service locator stored in p_svcLoc
//   selecName - name under which the IEvtSelector is requested from the locator
// The selector and the merge service handle must both be valid and the
// selector context must be created successfully into p_iter; otherwise the
// error is logged and std::runtime_error is thrown. On success the stream is
// marked as owning p_iter.
77 PileUpStream::PileUpStream(const std::string& name,
78  ISvcLocator* svcLoc,
79  const std::string& selecName):
81  m_name(name),
82  p_svcLoc(svcLoc),
83  m_mergeSvc("PileUpMergeSvc", name)
84 {
85  m_sel = serviceLocator()->service<IEvtSelector>(selecName);
86  if ( !(m_sel.isValid() && m_mergeSvc.isValid() &&
87  m_sel->createContext(p_iter).isSuccess()) ) {
88  const std::string errMsg("PileUpStream: can not create stream");
89  ATH_MSG_ERROR ( errMsg );
90  throw std::runtime_error(errMsg);
91  } else m_ownEvtIterator=true;
92 }
93 
// Body of the destructor ~PileUpStream() (signature line not part of this
// listing). It is empty: deleting the event iterator and releasing the store
// are done in the finalize() body further down in this file.
95 {
96 }
97 
// Body of PileUpStream::setupStore() (signature line not part of this listing).
// Step 1: look for a StoreGateSvc called "<stream name>_SG" without creating
//         it; if found, use it and mark it as not owned.
// Step 2: otherwise clone the master "StoreGateSvc" under that name, mark the
//         clone as owned, initialize it and tag it as StoreID::PILEUP_STORE.
// Step 3: if a store is available and the selector also implements
//         IAddressProvider, attach a "<stream name>_PPS" ProxyProviderSvc to
//         the store, fed by the selector plus (when available)
//         AthenaPoolAddressProviderSvc and AddressRemappingSvc.
// Returns true when the store was found, or was cloned and initialized
// successfully. The outcome of step 3 does not change the return value.
// NOTE(review): the PPS service is requested before declareSvcType() is
// called for it -- confirm this order is intended.
99 {
100  bool rc(true);
101  std::string storeName(name() + "_SG");
102 
103  //start by looking for the store directly: in overlay jobs it may already be there
104  m_SG = serviceLocator()->service<StoreGateSvc>(storeName, /*createIf*/false);
105  if (m_SG) {
106  m_ownStore = false;
107  } else {
108  //not there, create one cloning the master store
109  Service *child;
110  //if the parent store is not there, fail (rc becomes false)
111  //remember the clone function also initializes the service if needed
112  SmartIF<StoreGateSvc> pIS(serviceLocator()->service("StoreGateSvc"));
113  rc = (pIS.isValid() &&
114  CloneService::clone(pIS, storeName, child).isSuccess() &&
115  (m_SG = SmartIF<StoreGateSvc>(child)).isValid());
116  if ( rc ) {
117  m_ownStore = true;
118  // further initialization of the cloned service
119  rc = (m_SG->sysInitialize()).isSuccess();
120  m_SG->setStoreID(StoreID::PILEUP_STORE); //needed by ProxyProviderSvc
121  } //clones
122  }
123  if (rc) {
124  //if the selector is an address provider like the AthenaPool one,
125  //create a dedicated ProxyProviderSvc and associate it to the store
126  SmartIF<IAddressProvider> pIAP(m_sel);
127  if (pIAP.isValid()) {
128  const std::string PPSName(name() + "_PPS");
129  SmartIF<IProxyProviderSvc> pPPSvc(serviceLocator()->service(PPSName));
130  SmartIF<ISvcManager> pISM(serviceLocator());
131  if ( pISM.isValid() &&
132  pISM->declareSvcType(PPSName, "ProxyProviderSvc").isSuccess() &&
133  pPPSvc.isValid() ) {
134 
135  pPPSvc->addProvider(pIAP);
    // the two extra providers are optional: a missing one only triggers a warning
136  SmartIF<IAddressProvider> pAthPoolAddProv(serviceLocator()->service("AthenaPoolAddressProviderSvc"));
137  if (pAthPoolAddProv.isValid()) {
138  pPPSvc->addProvider(pAthPoolAddProv);
139  } else {
140  ATH_MSG_WARNING ( "could not add AthenaPoolAddressProviderSvc as AddresssProvider for "<< PPSName );
141  }
142  SmartIF<IAddressProvider> pAddrRemap(serviceLocator()->service("AddressRemappingSvc"));
143  if (pAddrRemap.isValid()) {
144  pPPSvc->addProvider(pAddrRemap);
145  } else {
146  ATH_MSG_WARNING ( "could not add AddressRemappingSvc as AddresssProvider for "<< PPSName );
147  }
148  m_SG->setProxyProviderSvc(pPPSvc);
149  }
150  } //valid address provider
151  }
152  return rc;
153 }
154 
// Body of PileUpStream::setActiveStore() (signature line not part of this
// listing): makes this stream's store the current one via makeCurrent().
156 {
157  store().makeCurrent();
158 }
159 
// Body of PileUpStream::nextRecordPre_Passive() (signature line not part of
// this listing). Makes this stream's store current, clears it and advances
// the selector to the next event, without loading anything into the store.
// Returns SUCCESS only if both the clear and the advance succeed; the
// selector is not advanced when clearing the store fails.
161 {
162  this->setActiveStore();
163  // Clear the store, move to next event
164  return (this->store().clearStore().isSuccess() &&
165  this->selector().next(iterator()).isSuccess() ) ?
166  StatusCode::SUCCESS :
167  StatusCode::FAILURE;
168 }
169 
// Body of PileUpStream::nextRecordPre() (signature line not part of this
// listing). Runs nextRecordPre_Passive() and, only if that succeeded,
// loadStore(). Returns SUCCESS when both steps succeed, FAILURE otherwise.
171 {
172  // Clear the store, move to next event and load the store
173  return (this->nextRecordPre_Passive().isSuccess() &&
174  this->loadStore()) ?
175  StatusCode::SUCCESS :
176  StatusCode::FAILURE;
177 }
178 
// Body of PileUpStream::loadStore() (signature line not part of this listing).
// Marks the stream as having been loaded, asks the selector for the address
// at the current iterator position, records that address in the store when
// one was returned, and finally loads the event proxies.
// Returns true only if every step succeeded. Because the results are combined
// with &=, the later steps still run after an earlier failure.
180 {
181  m_neverLoaded=false;
182 
183  IOpaqueAddress* paddr(0);
184  bool rc = (this->selector().createAddress(iterator(), paddr)).isSuccess();
185  if ( 0 != paddr) rc &= this->store().recordAddress(paddr).isSuccess();
186 
187  // load store proxies
188  rc &= this->store().loadEventProxies().isSuccess();
189  return rc;
190 }
191 
// Body of PileUpStream::nextEventPre(bool readRecord) (signature line not
// part of this listing).
// - If nothing has been loaded into this stream yet (m_neverLoaded), a read is
//   forced whatever the caller passed; otherwise a requested read first
//   resets the "used" flag and m_hasRing.
// - When a read is due and nextRecordPre() fails, an INFO message is logged
//   and nullptr is returned.
// - Otherwise returns the xAOD::EventInfo pointer that the merge service
//   provides for this stream's store. That pointer is only null-checked for
//   the debug printout.
192 //return next event, load store with next event
194 {
195  if (m_neverLoaded) readRecord=true;
196  else if (readRecord) {
197  //do not reset these the first time we call nextEventPre
198  this->resetUsed();
199  m_hasRing=false;
200  }
201  // if (isNotEmpty()) {
202  if (readRecord && this->nextRecordPre().isFailure()) {
203  ATH_MSG_INFO ( "nextEventPre(): end of the loop. No more events in the selection" );
204  return nullptr;
205  }
206 
207  const xAOD::EventInfo* xAODEventInfo = m_mergeSvc->getPileUpEvent( m_SG, "" );
208  if (readRecord and xAODEventInfo) {
209  ATH_MSG_DEBUG ( "nextEventPre(): read new event "
210  << xAODEventInfo->eventNumber()
211  << " run " << xAODEventInfo->runNumber()
212  << " into store " << this->store().name() );
213  }
214 
215  return xAODEventInfo;
216 }
217 
218 bool PileUpStream::nextEventPre_Passive(bool readRecord) {
219  if (m_neverLoaded) readRecord=true;
220  else if (readRecord) {
221  //do not reset these the first time we call nextEventPre_Passive
222  this->resetUsed();
223  m_hasRing=false;
224  }
225  if (readRecord && this->nextRecordPre_Passive().isFailure()) {
226  ATH_MSG_INFO ( "nextEventPre_Passive(): end of the loop. No more events in the selection" );
227  return false;
228  }
229  return true;
230 }
231 
// Body of PileUpStream::finalize() (signature line and opening brace are not
// part of this listing).
// Deletes the event iterator when this stream owns it, finalizes the store
// when this stream owns it and it is still in the INITIALIZED state, then
// releases the store. Returns the status of the store finalization, or
// SUCCESS when no finalization was attempted.
// The preprocessor switch selects between the FSMState() and the state()
// way of querying the service state, depending on whether
// GAUDIKERNEL_STATEMACHINE_H_ is defined.
// NOTE(review): p_iter and m_ownEvtIterator are not reset after the delete,
// and store() is used unconditionally -- confirm finalize() runs only once and
// only on a stream that has a store.
233  StatusCode sc(StatusCode::SUCCESS);
234  if (m_ownEvtIterator) delete p_iter;
235  //we own and manage our cloned SG instance
236 #ifdef GAUDIKERNEL_STATEMACHINE_H_
237  if (m_ownStore && Gaudi::StateMachine::INITIALIZED == store().FSMState()) {
238  sc = this->store().sysFinalize();
239  }
240 #else
241  if (m_ownStore && this->store().state() == IService::INITIALIZED) sc = this->store().sysFinalize();
242 #endif
243  this->store().release();
244  return sc;
245 }
PileUpStream::setActiveStore
void setActiveStore()
set ActiveStore
Definition: PileUpStream.cxx:155
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
AthMsgStreamMacros.h
StoreGateSvc::makeCurrent
void makeCurrent()
The current store is becoming the active store.
Definition: StoreGateSvc.cxx:496
PileUpStream::nextRecordPre
StatusCode nextRecordPre()
increment event iterator before loading store
Definition: PileUpStream.cxx:170
PileUpStream::m_name
std::string m_name
Stream name.
Definition: PileUpStream.h:108
PileUpStream::iterator
EvtIterator & iterator()
Definition: PileUpStream.h:56
isValid
bool isValid(const T &p)
Av: we implement here an ATLAS-specific convention: all particles which are 99xxxxx are fine.
Definition: AtlasPID.h:812
m_name
std::string m_name
Definition: ColumnarPhysliteTest.cxx:53
PileUpStream::PileUpStream
PileUpStream()
Structors.
Definition: PileUpStream.cxx:25
PileUpStream::m_hasRing
bool m_hasRing
Definition: PileUpStream.h:126
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
PileUpStream::selector
IEvtSelector & selector()
Definition: PileUpStream.h:55
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:124
PileUpStream::nextEventPre
const xAOD::EventInfo * nextEventPre(bool readRecord=true)
return next Event, load store with next Event
Definition: PileUpStream.cxx:193
PileUpStream::finalize
StatusCode finalize()
finalize and release store. To be called on ... finalize()
Definition: PileUpStream.cxx:232
PileUpStream::p_svcLoc
ISvcLocator * p_svcLoc
ServiceLocator.
Definition: PileUpStream.h:110
PileUpStream::m_used
bool m_used
has this stream already been used? (for the current event)
Definition: PileUpStream.h:125
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
PileUpStream::m_ownStore
bool m_ownStore
is m_SG a store we cloned from the master one?
Definition: PileUpStream.h:123
PileUpStream::m_mergeSvc
ServiceHandle< PileUpMergeSvc > m_mergeSvc
Definition: PileUpStream.h:118
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
PileUpStream::operator=
PileUpStream & operator=(const PileUpStream &rhs)=delete
PileUpStream::setupStore
bool setupStore()
setup input and overlay selectors and iters
Definition: PileUpStream.cxx:98
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
sel
sel
Definition: SUSYToolsTester.cxx:97
StoreGateSvc::loadEventProxies
StatusCode loadEventProxies()
load proxies at begin event
Definition: StoreGateSvc.cxx:442
PileUpStream::m_iOriginalRing
unsigned int m_iOriginalRing
original ring in which event was used
Definition: PileUpStream.h:127
AthMessaging
Class to provide easy MsgStream access and capabilities.
Definition: AthMessaging.h:55
PileUpStream
a triple selector/context/store defines a stream
Definition: PileUpStream.h:32
PileUpStream::resetUsed
void resetUsed()
Definition: PileUpStream.h:80
PileUpStream::name
const std::string & name()
Definition: PileUpStream.h:54
PileUpStream::nextEventPre_Passive
bool nextEventPre_Passive(bool readRecord)
like nextEventPre, but doesn't actually load anything
Definition: PileUpStream.cxx:218
PileUpStream::nextRecordPre_Passive
StatusCode nextRecordPre_Passive()
increment event iterator
Definition: PileUpStream.cxx:160
PileUpStream::loadStore
bool loadStore()
clear store and load new proxies
Definition: PileUpStream.cxx:179
PileUpStream::m_neverLoaded
bool m_neverLoaded
has an event been loaded into this stream?
Definition: PileUpStream.h:122
PileUpStream::~PileUpStream
virtual ~PileUpStream()
Definition: PileUpStream.cxx:94
PileUpStream::store
StoreGateSvc & store()
Definition: PileUpStream.h:57
PileUpStream::serviceLocator
ISvcLocator * serviceLocator()
Definition: PileUpStream.h:93
PileUpStream::m_ownEvtIterator
bool m_ownEvtIterator
do we own p_iter?
Definition: PileUpStream.h:121
CloneService.h
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
StoreGateSvc::recordAddress
StatusCode recordAddress(const std::string &skey, IOpaqueAddress *pAddress, bool clearAddressFlag=true)
Create a proxy object using an IOpaqueAddress and a transient key.
xAOD::EventInfo_v1
Class describing the basic event information.
Definition: EventInfo_v1.h:43
CloneService::clone
StatusCode clone(const IService *parent, const std::string &childName, Service *&child)
given a reference to a parent svc sets a reference to a cloned child
Definition: CloneService.cxx:21
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
IAddressProvider.h
PileUpStream.h
a triple selector/context/store defines a stream
PileUpStream::m_sel
SmartIF< IEvtSelector > m_sel
Selector.
Definition: PileUpStream.h:112
PileUpStream::p_iter
EvtIterator * p_iter
Input Iterators.
Definition: PileUpStream.h:116
PileUpStream::m_SG
SmartIF< StoreGateSvc > m_SG
StoreGateSvc;.
Definition: PileUpStream.h:114
StoreGateSvc.h
DataProxy.h