ATLAS Offline Software
Loading...
Searching...
No Matches
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <GaudiKernel/ConcurrencyFlags.h>
8
9#include <algorithm>
10#include <boost/core/demangle.hpp>
11#include <chrono>
12#include <format>
13#include <random>
14#include <range/v3/algorithm.hpp>
15#include <range/v3/numeric/accumulate.hpp>
16#include <range/v3/to_container.hpp>
17#include <range/v3/view.hpp>
18#include <thread>
19
25
26
27inline std::string CLIDToString(const CLID& clid) {
28 return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
29}
30
32 ISvcLocator* svc)
33 : base_class(name, svc),
34 m_bkg_evt_sel_ctx(nullptr),
35 m_proxyProviderSvc("ProxyProviderSvc/BkgPPSvc_"+name, name),
37{}
38
40
42 m_stores.clear();
43 ATH_CHECK(m_bkgEventSelector.retrieve());
44 ATH_CHECK(m_activeStoreSvc.retrieve());
45 ATH_CHECK(m_skipEventIdxSvc.retrieve());
46 if (m_useBeamInt) {
47 ATH_CHECK(m_beamInt.retrieve());
48 }
49 if (m_useBeamLumi) {
50 ATH_CHECK(m_beamLumi.retrieve());
51 }
52 // Setup context
53 if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
54 ATH_MSG_ERROR("Failed to create background event selector context");
55 return StatusCode::FAILURE;
56 }
57 ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());
58
59 // Setup proxy provider
60 ATH_CHECK(m_proxyProviderSvc.retrieve());
61
62 // Setup Address Providers
63 SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
64 if (!addressProvider) {
66 "Could not cast background event selector to IAddressProvider");
67 } else {
68 m_proxyProviderSvc->addProvider(addressProvider);
69 }
70 // AthenaPoolAddressProviderSvc
71 SmartIF<IAddressProvider> athPoolAP{
72 serviceLocator()->service(std::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
73 };
74 if (!athPoolAP) {
76 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
77 } else {
78 m_proxyProviderSvc->addProvider(athPoolAP);
79 }
80 // AddressRemappingSvc
81 SmartIF<IAddressProvider> addRemapAP{
82 serviceLocator()->service(std::format("AddressRemappingSvc/BkgARSvc_{}", name()))
83 };
84 if (!addRemapAP) {
85 ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
86 } else {
87 m_proxyProviderSvc->addProvider(addRemapAP);
88 }
89
90 const std::size_t n_concurrent =
91 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
92 m_idx_lists.clear();
93 m_idx_lists.resize(n_concurrent);
94
95 m_num_mb_by_bunch.clear();
96 m_num_mb_by_bunch.resize(n_concurrent);
97
98 m_stores.clear();
99 m_stores.resize(n_concurrent);
100
101 const int n_stores = 50; // Start with 50 stores per event
102 // setup n_concurrent vectors of n_stores StoreGates in m_stores
103 for (std::size_t i = 0; i < n_concurrent; ++i) {
104 auto& sgs = m_stores[i];
105 sgs.reserve(n_stores);
106 for (int j = 0; j < n_stores; ++j) {
107 // creates / retrieves a different StoreGateSvc for each slot
108 auto& sg = sgs.emplace_back(
109 std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
110 ATH_CHECK(sg.retrieve());
111 sg->setStoreID(StoreID::PILEUP_STORE);
112 sg->setProxyProviderSvc(m_proxyProviderSvc.get());
113 }
114 }
115
116 // setup spare store for event skipping
117 ATH_CHECK(m_spare_store.retrieve());
119 m_spare_store->setProxyProviderSvc(m_proxyProviderSvc.get());
120 auto skipEvent_callback = [this](
122 ISkipEventIdxSvc::EvtIter end) -> StatusCode {
123 using namespace std::chrono_literals;
124 auto* const old_store = m_activeStoreSvc->activeStore();
125 m_activeStoreSvc->setStore(m_spare_store.get());
126 ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
127 for (auto iter = begin; iter < end; ++iter) {
128 const auto& evt = *iter;
129 const std::size_t n_to_skip = calcMBRequired(
130 evt.evtIdx, s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
131 ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
132 << n_to_skip << " pileup events");
133 for (std::size_t i = 0; i < n_to_skip; ++i) {
134 if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
135 ATH_MSG_ERROR("Ran out of background events");
136 return StatusCode::FAILURE;
137 }
138 }
139 }
140 m_last_loaded_hs.store((end - 1)->evtIdx);
141 m_activeStoreSvc->setStore(old_store);
142 return StatusCode::SUCCESS;
143 };
144 ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
145 ATH_MSG_INFO("Initializing ODMBSvc");
146 return StatusCode::SUCCESS;
147}
148
149std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
150 std::size_t slot,
151 unsigned int run,
152 unsigned int lumi,
153 std::uint64_t event) {
154 ATH_MSG_DEBUG("Run " << run << ", lumi " << lumi << ", event " << event
155 << "| hs_id " << hs_id);
156 const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
157 // vector on stack for use if slot == s_NoSlot
158 std::vector<std::uint64_t> stack_num_mb_by_bunch{};
159 std::vector<std::uint64_t>& num_mb_by_bunch =
160 slot == s_NoSlot ? stack_num_mb_by_bunch : m_num_mb_by_bunch[slot];
161 num_mb_by_bunch.clear();
162 num_mb_by_bunch.resize(n_bunches);
163 FastReseededPRNG prng{m_seed.value(), hs_id};
164
165 // First apply the beam luminosity SF
166 bool sf_updated_throwaway;
167 const float beam_lumi_sf =
168 m_useBeamLumi ? m_beamLumi->scaleFactor(run, lumi, sf_updated_throwaway)
169 : 1.F;
170 const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
171 std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
172 // Now update using beam intensities
173 if (m_useBeamInt) {
174 // Supposed to be once per event, but ends up running once per minbias type
175 // per event now
176 m_beamInt->selectT0(run, event);
177 for (int bunch = m_earliestDeltaBC.value();
178 bunch <= m_latestDeltaBC.value(); ++bunch) {
179 std::size_t idx = bunch - m_earliestDeltaBC.value();
180 avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
181 }
182 }
183
184 if (m_usePoisson) {
185 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
186 num_mb_by_bunch.begin(), [&prng](float avg) {
187 return std::poisson_distribution<std::uint64_t>(avg)(prng);
188 });
189 } else {
190 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
191 num_mb_by_bunch.begin(), [](float f) {
192 return static_cast<std::uint64_t>(std::round(f));
193 });
194 }
195
196 std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
197 if (slot == s_NoSlot) {
198 return num_mb;
199 }
200 // Won't go past here if slot == s_NoSlot
201
202 std::vector<std::uint64_t>& index_array = m_idx_lists[slot];
203 index_array.clear();
204 index_array.resize(num_mb);
205 std::iota(index_array.begin(), index_array.end(), 0);
206 // Don't need to shuffle, since these events aren't reused
207 // std::shuffle(index_array.begin(), index_array.end(), prng);
208
209 // Commented out until we can use C++ 23 range formatting
210 // ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
211 // << fmt::format("\t\tBy bunch: [{}]\n",
212 // fmt::join(num_mb_by_bunch, ", ")));
213 return num_mb;
214}
215
216StatusCode OnDemandMinbiasSvc::beginHardScatter(const EventContext& ctx) {
217 using namespace std::chrono_literals;
218 std::chrono::steady_clock::time_point order_wait_start{};
219
220 const std::int64_t hs_id = get_hs_id(ctx);
221 const std::size_t slot = ctx.slot();
222 const std::size_t num_to_load =
223 calcMBRequired(hs_id, slot, ctx.eventID().run_number(),
224 ctx.eventID().lumi_block(), ctx.eventID().event_number());
225 auto& stores = m_stores[slot];
226 // If we don't have enough stores, make more
227 if (stores.size() < num_to_load) {
228 ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
229 stores.reserve(num_to_load);
230 for (std::size_t i = stores.size(); i < num_to_load; ++i) {
231 auto& sg = stores.emplace_back(
232 std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
233 name());
234 ATH_CHECK(sg.retrieve());
235 sg->setStoreID(StoreID::PILEUP_STORE);
236 sg->setProxyProviderSvc(m_proxyProviderSvc.get());
237 }
238 }
239 // Ensure loading is done in order
240 if (m_last_loaded_hs < hs_id - 1) {
241 ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
242 << m_last_loaded_hs << " and we are " << hs_id);
243 order_wait_start = std::chrono::steady_clock::now();
244 while (m_last_loaded_hs < hs_id - 1) {
245 std::this_thread::sleep_for(50ms);
246 }
247 auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
248 ATH_MSG_INFO(std::format("Waited {:%M:%S} to prevent out-of-order loading",
249 wait_time));
250 }
251 // Lock reading mutex
252 std::unique_lock lck(m_reading_batch_mtx);
253 auto start = std::chrono::steady_clock::now();
254 // Remember old store to reset later
255 auto* old_store = m_activeStoreSvc->activeStore();
256 for (std::size_t i = 0; i < num_to_load; ++i) {
257 auto& sg = stores[i];
258 // Change active store
259 m_activeStoreSvc->setStore(sg.get());
260 SG::CurrentEventStore::Push reader_sg_ces(sg.get());
261 // Read next event
262 ATH_CHECK(sg->clearStore(true));
263 if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
264 ATH_MSG_FATAL("Ran out of minbias events");
265 return StatusCode::FAILURE;
266 }
267 IOpaqueAddress* addr = nullptr;
268 if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
269 .isSuccess()) {
270 ATH_MSG_WARNING("Failed to create address. No more events?");
271 return StatusCode::FAILURE;
272 }
273 if (addr == nullptr) {
274 ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
275 return StatusCode::FAILURE;
276 }
277 ATH_CHECK(sg->recordAddress(addr));
278 ATH_CHECK(sg->loadEventProxies());
279 // Read data now if desired
280 if (!m_onDemandMB) {
281 for (const auto* proxy_ptr : sg->proxies()) {
282 // Sort of a const_cast, then ->accessData()
283 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
284 }
285 }
286 }
287 // Reset active store
288 m_activeStoreSvc->setStore(old_store);
289 ATH_MSG_INFO(std::format("Took {:%M:%S} to load events",
290 std::chrono::steady_clock::now() - start));
291 // Update last loaded
292 m_last_loaded_hs.store(hs_id);
293 return StatusCode::SUCCESS;
294}
295
297 std::uint64_t mb_id) {
298 const std::size_t slot = ctx.slot();
299 const std::size_t index = m_idx_lists.at(slot).at(mb_id);
300 return m_stores.at(ctx.slot()).at(index).get();
301}
302
303std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
304 int bunch) const {
305 if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
306 throw std::logic_error(std::format(
307 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
308 m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
309 }
310 return m_num_mb_by_bunch.at(ctx.slot()).at(bunch - m_earliestDeltaBC.value());
311}
312
313StatusCode OnDemandMinbiasSvc::endHardScatter(const EventContext& ctx) {
314 // clear all stores
315 for (auto&& sg : m_stores[ctx.slot()]) {
316 ATH_CHECK(sg->clearStore());
317 }
318 return StatusCode::SUCCESS;
319}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
uint32_t CLID
The Class ID type.
std::string CLIDToString(const CLID &clid)
Hold a pointer to the current event store.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
std::vector< EvtId >::const_iterator EvtIter
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
~OnDemandMinbiasSvc() final
Destructor.
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
StatusCode initialize() final
AthService initialize.
IEvtSelector::Context * m_bkg_evt_sel_ctx
Gaudi::Property< int > m_latestDeltaBC
StatusCode beginHardScatter(const EventContext &ctx) override
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Gaudi::Property< bool > m_useBeamLumi
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
ServiceHandle< IEvtSelector > m_bkgEventSelector
std::vector< std::vector< SGHandle > > m_stores
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Gaudi::Property< bool > m_useBeamInt
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Gaudi::Property< bool > m_usePoisson
Gaudi::Property< std::uint64_t > m_seed
static constexpr std::size_t s_NoSlot
std::atomic_int64_t m_last_loaded_hs
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
ServiceHandle< IBeamIntensity > m_beamInt
StatusCode endHardScatter(const EventContext &ctx) override
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Gaudi::Property< float > m_nPerBunch
std::vector< std::vector< std::uint64_t > > m_idx_lists
Gaudi::Property< bool > m_onDemandMB
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
ServiceHandle< IBeamLuminosity > m_beamLumi
Gaudi::Property< int > m_earliestDeltaBC
The Athena Transient Store API.
@ PILEUP_STORE
Definition StoreID.h:31
Definition index.py:1
int run(int argc, char *argv[])