ATLAS Offline Software
Loading...
Searching...
No Matches
BkgStreamsCache.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5
6#include "BkgStreamsCache.h"
7#include "StoreGate/StoreGateSvc.h" /*to print name() */
8#include "GaudiKernel/IEvtSelector.h"
9
10#include "CLHEP/Random/RandFlat.h"
11#include "CLHEP/Random/RandPoisson.h"
12
14
19
20
21#include <cassert>
22#include <cmath> /*ceil,sqrt*/
23#include <stdexcept> /*runtime_error*/
24
25
27 const std::string& name,
28 const IInterface* parent)
29 : base_class( type, name, parent )
30{
32}
33
40
41void
46
47StatusCode
49 unsigned int nXings,
50 unsigned int firstStore,
51 IBeamIntensity* iBM)
52{
53 assert (0 < nXings);
54 m_nXings = nXings;
55 m_zeroXing = -firstXing;
56 if (m_zeroXing >= (int)nXings) m_zeroXing = -99;
57 m_beamInt = (m_ignoreBM.value() ? 0 : iBM);
58 float largestElement( ( m_ignoreBM.value() || m_collDistrName.value() == "Fixed" ) ? 1.0 : m_beamInt->largestElementInPattern() );
59 //ceil (cmath) rounds up to the nearest integer ceil(1.5)= 2
60 float occupiedCrossings = ceil(static_cast<float>(m_nXings) * m_occupationFraction);
61 m_nStores = static_cast<unsigned int>(ceil( m_collXing * occupiedCrossings * largestElement));
62 if (m_collDistrName.value() == "Poisson")
63 {
64 //allow for fluctuations in # of overlaid events
65 m_nStores += static_cast<unsigned int>(6.0 * sqrt(static_cast<float>(m_nStores)));
66 }
67
68 //get bkg selectors, stores and iterators
69 ATH_MSG_DEBUG ( "Set up " << m_nStores << " stores" );
70
71 m_nEvtsXing.reserve(m_nStores);
72 m_streams.reserve(m_nStores);
73 m_usedStreams.reserve(m_nStores);
74 m_usedStreams.assign(m_nStores, false);
75
76 const std::string& selecName(m_selecName.name());
77 for (unsigned int i=0; i < m_nStores; ++i)
78 {
79 std::stringstream bufName;
80 bufName << "BkgEvent_" << i + firstStore;
81 const std::string& streamName(bufName.str());
82 try
83 {
84 m_streams.emplace_back(streamName,serviceLocator(),selecName);
85 }
86 catch (const std::runtime_error& e)
87 {
88 ATH_MSG_ERROR ( "Exception thrown while creating PileUpStream "
89 << streamName << " : " << e.what() );
90 return StatusCode::FAILURE;
91 }
92
93 if(!(m_streams.back().setupStore()))
94 {
95 ATH_MSG_ERROR ( "Can not setup bkg evt store for stream "
96 << streamName );
97 return StatusCode::FAILURE;
98 }
99 }
100 m_cursor = m_streams.begin();
101
102 CLHEP::HepRandomEngine* collEng(m_atRndmSvc->GetEngine(m_randomStreamName.value()));
103 if( !collEng )
104 {
105 ATH_MSG_ERROR ( "can not get random stream " << m_randomStreamName.value() );
106 return StatusCode::FAILURE;
107 }
108 //setup generator to pickup an event store at random from the cache
109 //notice how we pass collEng by reference. If ! CLHEP will take ownership...
110 m_chooseEventRand = new CLHEP::RandFlat(*(collEng),
111 0.0, double(m_nStores));
112
113 return StatusCode::SUCCESS;
114}
115
120
122{
123 if (!m_collXingPoisson) { return collXing(); }
125}
126
128{
129 ATH_MSG_DEBUG ( "newEvent called resetting used event set" );
130 unsigned int totEvts(0);
131 do {
132 totEvts = 0;
133 for (unsigned int iXing=0; iXing<m_nXings; ++iXing) {
134 totEvts += setNEvtsXing(iXing);
135 if (totEvts > m_nStores) {
136 ATH_MSG_WARNING ( "newEvent: number of required evts (" << totEvts << ") exceeds number of available stores "
137 << m_nStores << ". Regenerating bkg sequence for this event \n"
138 << " the total number of bkg events may not exceed average by more than 6 sigmas"
139 );
140 break;
141 }
142 }
143 } while(totEvts > m_nStores);
144 m_usedStreams.assign(m_streams.size(), false);
145}
146
147const xAOD::EventInfo* BkgStreamsCache::nextEvent(bool isCentralBunchCrossing)
148{
149 const xAOD::EventInfo* pNextEvt(nullptr);
150 StreamVector::size_type iS(0);
151 do {
152 iS = (StreamVector::size_type)m_chooseEventRand->fire();
153 } while (alreadyInUse(iS));
154
155 //set current store to iS
156 m_cursor = m_streams.begin();
157 std::advance(m_cursor, iS);
158
159 ATH_MSG_DEBUG ( "using store " << iS );
160 PileUpStream* pCurrStream(current());
161 if (0 != pCurrStream) {
162 pCurrStream->store().makeCurrent();
163 //read a new event every downscaleFactor accesses
164 //force reading of new event if the event is being used for in-time pile-up
165 //FIXME a more careful strategy would have the PileUpStreams knowing if
166 //they have been used for the central bunch-crossing already.
167 bool readEvent(isCentralBunchCrossing || (m_readEventRand->fire()<1.0));
168 pNextEvt = pCurrStream->nextEventPre( readEvent );
169 }
170 return pNextEvt;
171}
172
173//TODO update this method!!
174StatusCode BkgStreamsCache::nextEvent_passive(bool isCentralBunchCrossing)
175{
176 StreamVector::size_type iS(0);
177 do {
178 iS = (StreamVector::size_type)m_chooseEventRand->fire();
179 } while (alreadyInUse(iS));
180
181 //set current store to iS
182 m_cursor = m_streams.begin();
183 std::advance(m_cursor, iS);
184
185 ATH_MSG_DEBUG ( "using store " << iS );
186 PileUpStream* pCurrStream(current());
187 if (0 != pCurrStream) {
188 pCurrStream->store().makeCurrent();
189 //read a new event every downscaleFactor accesses
190 //force reading of new event if the event is being used for in-time pile-up
191 //FIXME a more careful strategy would have the PileUpStreams knowing if
192 //they have been used for the central bunch-crossing already.
193 bool readEvent(isCentralBunchCrossing || (m_readEventRand->fire()<1.0));
194 if(pCurrStream->nextEventPre_Passive(readEvent)) return StatusCode::SUCCESS;
195 }
196 return StatusCode::FAILURE;
197}
198
199bool BkgStreamsCache::alreadyInUse(StreamVector::size_type iS)
200{
201 assert(iS<m_usedStreams.size());
202 bool inUse(m_usedStreams[iS]);
203 if (!inUse) m_usedStreams[iS] = true;
204#ifndef NDEBUG
205 ATH_MSG_VERBOSE ( "alreadyInUse: store " << iS << ' '
206 << (inUse ? "already" : "not yet") << " in use" );
207#endif
208 return inUse;
209}
210
212{
213 if (m_cursor != m_streams.end()) return &*m_cursor;
214 // m_cursor->isNotEmpty()) return &*m_cursor;
215 else return 0; //FIXME should reomve empty stream and keep going
216}
217
219{
220 ATH_MSG_DEBUG ( "Initializing " << name()
221 << " - cache for events of type "
223
225
226 //create random number generators
227 ATH_CHECK(m_atRndmSvc.retrieve());
228 CLHEP::HepRandomEngine* collEng(m_atRndmSvc->GetEngine(m_randomStreamName.value()));
229 if( 0 == collEng )
230 {
231 ATH_MSG_ERROR ( "can not get random stream " << m_randomStreamName.value() );
232 return StatusCode::FAILURE;
233 }
234
235 //setup distribution to read a new event every downscaleFactor accesses
236 //notice how we pass collEng by reference. If ! CLHEP will take ownership...
237 m_readEventRand = new CLHEP::RandFlat(*(collEng),
238 0.0, double(m_readDownscale));
239
240 // select collision distribution functions
241 if (m_collDistrName.value() == "Fixed")
242 {
243 m_f_collDistr = std::bind(&BkgStreamsCache::collXing, this);
244 using namespace std::placeholders;
245 if(m_ignoreBM.value())
246 {
248 }
249 else
250 {
252 }
253 return StatusCode::SUCCESS;
254 }
255 if (m_collDistrName.value() == "Poisson")
256 {
257 //pass collEng by reference. If Not CLHEP will take ownership...
258 m_collXingPoisson = new CLHEP::RandPoisson(*(collEng), m_collXing);
259 // m_f_collDistr will call m_collXingPoisson->fire(m_collXing) USED TO BE boost::bind(&CLHEP::RandPoisson::fire, m_collXingPoisson);
261 using namespace std::placeholders;
262 if(m_ignoreBM.value())
263 {
265 }
266 else
267 {
269 }
270 return StatusCode::SUCCESS;
271 }
273 << " is not a know collision distribution function" );
274 return StatusCode::FAILURE;
275}
276
277unsigned int BkgStreamsCache::nEvtsXing(unsigned int iXing) const
278{
279 return (iXing + 1 > m_nEvtsXing.size()) ? 0 : m_nEvtsXing[iXing];
280}
281
283{
284 return m_f_collDistr();
285}
287{
288 return static_cast<unsigned int>(m_beamInt->normFactor(iXing-m_zeroXing)*static_cast<float>(m_f_collDistr()));
289}
290unsigned int BkgStreamsCache::numberOfCavernBkgForBunchCrossing(unsigned int iXing) const
291{
292 return static_cast<unsigned int>(m_beamInt->normFactor(iXing-m_zeroXing)>0.0)*m_f_collDistr();
293}
294
295unsigned int BkgStreamsCache::setNEvtsXing(unsigned int iXing)
296{
297 if (iXing + 1 > m_nEvtsXing.size()) m_nEvtsXing.resize(2 * iXing + 1);
298 unsigned int nEvts(m_f_numberOfBackgroundForBunchCrossing(iXing));
299 //this is done mainly to handle the case in which original and backround
300 //events belong to the same stream. Of course we do not want m_nEvtsXing[m_zeroXing]<0
301 if ((int)iXing==m_zeroXing)
302 {
303 unsigned int subValue(m_subtractBC0.value());
304 while (nEvts<subValue) nEvts = m_f_numberOfBackgroundForBunchCrossing(iXing);
305 nEvts -= subValue;
306#ifndef NDEBUG
307 ATH_MSG_VERBOSE ( "Subtracted " << m_subtractBC0.value()
308 << " events from BC=0 Xing " << iXing << " - Events at BC=0 "
309 << nEvts
310 );
311#endif
312 }
313 ATH_MSG_VERBOSE ( "Will pile-up " << nEvts
314 << " events for BCID " << iXing );
315 m_nEvtsXing[iXing] = nEvts;
316 return nEvts;
317}
318
319StatusCode BkgStreamsCache::addSubEvts(unsigned int iXing,
320 xAOD::EventInfo* overEvent,
321 int t0BinCenter)
322{
323 return this->addSubEvts(iXing, overEvent, t0BinCenter, true, 0);
324}
325
326
327StatusCode BkgStreamsCache::addSubEvts(unsigned int iXing,
328 xAOD::EventInfo* overEvent,
329 int t0BinCenter, bool loadEventProxies, unsigned int BCID)
330{
331 for (unsigned int iEvt=0; iEvt<nEvtsXing(iXing); ++iEvt)
332 {
333 // check if we're picking events for the central bunch-crossing,
334 // so we can choose to only use fresh events here.
335 bool isCentralBunchCrossing((0==t0BinCenter)&&m_forceReadForBC0);
336 // increment event iterators
337 if(!loadEventProxies) {
338 return this->nextEvent_passive(isCentralBunchCrossing);
339 }
340 const xAOD::EventInfo* pBkgEvent = nextEvent( isCentralBunchCrossing ); //FIXME update this for the case where loadProxies=False
341
342 //check input selector is not empty
343 PileUpStream* currStream(this->current());
344 if (0 == pBkgEvent || 0 == currStream)
345 {
346 // This is the end of the loop. No more events in the selection
347 ATH_MSG_INFO ( "end of loop: background cache has no more events" );
348 return StatusCode::FAILURE;
349 }
350 StoreGateSvc* pBkgStore = &(currStream->store());
351 ATH_MSG_DEBUG ( "added event " << pBkgEvent->eventNumber()
352 << " run " << pBkgEvent->runNumber()
353 << " from store "
354 << pBkgStore->name()
355 << " @ Xing " << iXing );
356
357 // register as sub event of the overlaid
358
359 // get the SG container for subevents infos
360 xAOD::EventInfoContainer *subEvCnt (nullptr);
361 ATH_CHECK( overEvent->evtStore()->retrieve(subEvCnt, c_pileUpEventInfoContName) );
362 // add subevent
363 addSubEvent( overEvent, pBkgEvent, t0BinCenter, m_pileUpEventType, subEvCnt, c_pileUpEventInfoContName );
364 subEvCnt->back()->setBCID( BCID );
365 subEvCnt->back()->setEvtStore( pBkgStore );
366
367#ifdef DEBUG_PILEUP
368 const xAOD::EventInfo* pStoreInfo(nullptr);
369 if (pBkgStore->retrieve(pStoreInfo).isSuccess() && pStoreInfo &&
370 pBkgEvent->eventNumber() != pStoreInfo->eventNumber())
371 {
372 ATH_MSG_ERROR ( "added event " << pBkgEvent->eventNumber()
373 << " run " << pBkgEvent->runNumber()
374 << " differ from current store "
375 << pBkgStore->name()
376 << " event " << pStoreInfo->eventNumber()
377 << " run " << pStoreInfo->runNumber()
378 );
379 assert(1);
380 }
381#endif
382 } //loop over evts in xing
383 return StatusCode::SUCCESS;
384}
385
387{
388 StatusCode sc(StatusCode::SUCCESS);
389 ATH_MSG_DEBUG ( "Finalizing " << name()
390 << " - cache for events of type "
392 while (sc.isSuccess() && m_streams.size()>0)
393 {
394 sc=m_streams.back().finalize();
395 m_streams.pop_back();
396 }
397 return sc;
398}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
In memory cache for pileup events.
Helpers for checking error return status codes and reporting errors.
provides the relative beam intensity as a function of the bunch xing.
static Double_t sc
xAOD::EventInfo * addSubEvent(xAOD::EventInfo *targetEv, const xAOD::EventInfo::SubEvent &subev, xAOD::EventInfoContainer *eiContainer, const std::string &eiContKey, StoreGateSvc *subev_store=nullptr)
const std::string c_pileUpEventInfoContName
default value for the EventInfoContainer storing subevents for PileUp
Definition PileUpMisc.h:15
Gaudi::Property< unsigned short > m_subtractBC0
unsigned int numberOfBkgForBunchCrossingIgnoringBeamIntensity(unsigned int iXing) const
meant to be used via m_f_numberOfBackgroundForBunchCrossing
ServiceHandle< IAtRndmGenSvc > m_atRndmSvc
unsigned int m_nXings
Gaudi::Property< float > m_occupationFraction
Gaudi::Property< std::string > m_collDistrName
Gaudi::Property< float > m_readDownscale
ServiceHandle< IEvtSelector > m_selecName
virtual StatusCode setup(int firstXing, unsigned int nXings, unsigned int firstStore, IBeamIntensity *) override final
virtual void newEvent() override final
inform cache that we start overlaying a new event
unsigned int m_nStores
Gaudi::Property< std::string > m_randomStreamName
int m_zeroXing
offset of BC=0 xing
CLHEP::RandPoisson * m_collXingPoisson
set number of collisions per bunch crossing (if Poisson distribution chosen)
float m_collXingSF
float scaling number of collisions per bunch crossing
const xAOD::EventInfo * nextEvent(bool isCentralBunchCrossing)
get next bkg event from cache
virtual StatusCode addSubEvts(unsigned int iXing, xAOD::EventInfo *overlaidEvent, int t0BinCenter) override final
Read input events in bkg stores and link them to overlay store.
Gaudi::Property< bool > m_ignoreBM
StreamVector::iterator m_cursor
xAOD::EventInfo::PileUpType m_pileUpEventType
the type of events in this cache
CLHEP::RandFlat * m_readEventRand
read a new event every downscaleFactor accesses
PileUpStream * current()
get current (last asked) stream
unsigned int nEvtsXing(unsigned int iXing) const
std::function< long() > m_f_collDistr
function returning the number of collisions per bunch crossing before bunch structure modulation
virtual ~BkgStreamsCache()
unsigned int setNEvtsXing(unsigned int iXing)
long collXing()
meant to be used (mainly) via m_f_collDistr
unsigned int numberOfBkgForBunchCrossingDefaultImpl(unsigned int iXing) const
Gaudi::Property< float > m_collXing
virtual StatusCode finalize() override final
BkgStreamsCache(const std::string &, const std::string &, const IInterface *)
Gaudi::Property< bool > m_ignoreSF
CLHEP::RandFlat * m_chooseEventRand
pickup an event store at random from the cache
StreamVector m_streams
virtual void resetEvtsPerXingScaleFactor(float sf) override final
reset scale factor at new run/lumiblk
bool alreadyInUse(StreamVector::size_type iStream)
unsigned int numberOfCavernBkgForBunchCrossing(unsigned int iXing) const
void PileUpEventTypeHandler(Gaudi::Details::PropertyBase &)
std::vector< bool > m_usedStreams
std::function< unsigned int(unsigned int) > m_f_numberOfBackgroundForBunchCrossing
function returning the number of bkg events per bunch crossing after bunch structure modulation
IBeamIntensity * m_beamInt
pointer to the IBeamIntensity distribution tool
Gaudi::CheckedProperty< unsigned short > m_pileUpEventTypeProp
virtual StatusCode initialize() override final
std::vector< unsigned int > m_nEvtsXing
Gaudi::Property< bool > m_forceReadForBC0
StatusCode nextEvent_passive(bool isCentralBunchCrossing)
as nextEvent except don't actually load anything
const T * back() const
Access the last element in the collection as an rvalue.
a triple selector/context/store defines a stream
StoreGateSvc & store()
const xAOD::EventInfo * nextEventPre(bool readRecord=true)
return next Event, load store with next Event
bool nextEventPre_Passive(bool readRecord)
like nextEventPre, but doesn't actually load anything
The Athena Transient Store API.
StatusCode retrieve(const T *&ptr) const
Retrieve the default object into a const T*.
void makeCurrent()
The current store is becoming the active store.
static const std::string & PileUpType2Name(PileUpType typ)
Convert PileUpType enum value to string.
uint32_t runNumber() const
The current event's run number.
uint64_t eventNumber() const
The current event's event number.
static PileUpType PileUpInt2Type(unsigned short typ)
Convert int to PileUpType enum value.
EventInfoContainer_v1 EventInfoContainer
Define the latest version of the container.
EventInfo_v1 EventInfo
Definition of the latest event info version.