ATLAS Offline Software
Loading...
Searching...
No Matches
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <GaudiKernel/ConcurrencyFlags.h>
8
9#include <algorithm>
10#include <boost/core/demangle.hpp>
11#include <chrono>
12#include <format>
13#include <random>
14#include <range/v3/algorithm.hpp>
15#include <range/v3/numeric/accumulate.hpp>
16#include <range/v3/to_container.hpp>
17#include <range/v3/view.hpp>
18#include <thread>
19
24
25
26inline std::string CLIDToString(const CLID& clid) {
27 return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
28}
29
31 ISvcLocator* svc)
32 : base_class(name, svc),
33 m_bkg_evt_sel_ctx(nullptr),
34 m_proxyProviderSvc("ProxyProviderSvc/BkgPPSvc_"+name, name),
36{}
37
39
/**
 * @brief AthService initialize.
 *
 * Retrieves the background event selector and helper services, wires the
 * address/proxy providers, creates one vector of pileup StoreGates per
 * concurrent event slot, and registers a callback that fast-forwards the
 * background stream when hard-scatter events are skipped.
 */
StatusCode OnDemandMinbiasSvc::initialize() {
  m_stores.clear();
  ATH_CHECK(m_bkgEventSelector.retrieve());
  ATH_CHECK(m_activeStoreSvc.retrieve());
  ATH_CHECK(m_skipEventIdxSvc.retrieve());
  // Beam intensity / luminosity tools are only needed when enabled
  if (m_useBeamInt) {
    ATH_CHECK(m_beamInt.retrieve());
  }
  if (m_useBeamLumi) {
    ATH_CHECK(m_beamLumi.retrieve());
  }
  // Setup context for reading from the background event selector
  if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
    ATH_MSG_ERROR("Failed to create background event selector context");
    return StatusCode::FAILURE;
  }
  ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());

  // Setup proxy provider
  ATH_CHECK(m_proxyProviderSvc.retrieve());

  // Setup Address Providers. Failure to cast is not fatal: the provider is
  // simply not registered and a warning is printed.
  SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
  if (!addressProvider) {
    ATH_MSG_WARNING(
        "Could not cast background event selector to IAddressProvider");
  } else {
    m_proxyProviderSvc->addProvider(addressProvider);
  }
  // AthenaPoolAddressProviderSvc
  SmartIF<IAddressProvider> athPoolAP{
      serviceLocator()->service(std::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
  };
  if (!athPoolAP) {
    ATH_MSG_WARNING(
        "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
  } else {
    m_proxyProviderSvc->addProvider(athPoolAP);
  }
  // AddressRemappingSvc
  SmartIF<IAddressProvider> addRemapAP{
      serviceLocator()->service(std::format("AddressRemappingSvc/BkgARSvc_{}", name()))
  };
  if (!addRemapAP) {
    ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
  } else {
    m_proxyProviderSvc->addProvider(addRemapAP);
  }

  // One set of book-keeping structures per concurrent event slot
  const std::size_t n_concurrent =
      Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
  m_idx_lists.clear();
  m_idx_lists.resize(n_concurrent);

  m_num_mb_by_bunch.clear();
  m_num_mb_by_bunch.resize(n_concurrent);

  m_stores.clear();
  m_stores.resize(n_concurrent);

  const int n_stores = 50;  // Start with 50 stores per event
  // setup n_concurrent vectors of n_stores StoreGates in m_stores
  for (std::size_t i = 0; i < n_concurrent; ++i) {
    auto& sgs = m_stores[i];
    sgs.reserve(n_stores);
    for (int j = 0; j < n_stores; ++j) {
      // creates / retrieves a different StoreGateSvc for each slot
      auto& sg = sgs.emplace_back(
          std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
      ATH_CHECK(sg.retrieve());
      sg->setStoreID(StoreID::PILEUP_STORE);
      sg->setProxyProviderSvc(m_proxyProviderSvc.get());
    }
  }

  // setup spare store for event skipping
  ATH_CHECK(m_spare_store.retrieve());
  m_spare_store->setStoreID(StoreID::PILEUP_STORE);
  m_spare_store->setProxyProviderSvc(m_proxyProviderSvc.get());
  // When hard-scatter events are skipped, advance the background selector by
  // the number of pileup events each skipped HS event would have consumed,
  // so the minbias stream stays in sync with the HS stream.
  auto skipEvent_callback =
      [this](ISkipEventIdxSvc::EvtIter begin,
             ISkipEventIdxSvc::EvtIter end) -> StatusCode {
    using namespace std::chrono_literals;
    auto* const old_store = m_activeStoreSvc->activeStore();
    m_activeStoreSvc->setStore(m_spare_store.get());
    ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
    for (auto iter = begin; iter < end; ++iter) {
      const auto& evt = *iter;
      // s_NoSlot: only count the events, no per-slot bookkeeping
      const std::size_t n_to_skip = calcMBRequired(
          evt.evtIdx, s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
      ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
                                      << n_to_skip << " pileup events");
      for (std::size_t i = 0; i < n_to_skip; ++i) {
        if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
          ATH_MSG_ERROR("Ran out of background events");
          return StatusCode::FAILURE;
        }
      }
    }
    // Record the last HS index handled so beginHardScatter can keep ordering
    m_last_loaded_hs.store((end - 1)->evtIdx);
    m_activeStoreSvc->setStore(old_store);
    return StatusCode::SUCCESS;
  };
  ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
  ATH_MSG_INFO("Initializing ODMBSvc");
  return StatusCode::SUCCESS;
}
147
/// Compute how many minbias (pileup) events the given hard scatter needs,
/// summed over the configured bunch-crossing window.
///
/// Fills the per-slot per-bunch counts (m_num_mb_by_bunch[slot]) and the
/// per-slot index list (m_idx_lists[slot]) unless slot == s_NoSlot, in which
/// case only the total is computed (used when skipping events).
/// Deterministic per hs_id: the PRNG is reseeded from (m_seed, hs_id), so the
/// exact order of distribution draws below must not be changed.
std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
                                               std::size_t slot,
                                               unsigned int run,
                                               unsigned int lumi,
                                               std::uint64_t event) {
  ATH_MSG_DEBUG("Run " << run << ", lumi " << lumi << ", event " << event
                << "| hs_id " << hs_id);
  const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
  // vector on stack for use if slot == s_NoSlot
  std::vector<std::uint64_t> stack_num_mb_by_bunch{};
  std::vector<std::uint64_t>& num_mb_by_bunch =
      slot == s_NoSlot ? stack_num_mb_by_bunch : m_num_mb_by_bunch[slot];
  num_mb_by_bunch.clear();
  num_mb_by_bunch.resize(n_bunches);
  // Fresh PRNG per hard scatter so results are reproducible per hs_id
  FastReseededPRNG prng{m_seed.value(), hs_id};

  // First apply the beam luminosity SF
  bool sf_updated_throwaway;
  const float beam_lumi_sf =
      m_useBeamLumi ? m_beamLumi->scaleFactor(run, lumi, sf_updated_throwaway)
                    : 1.F;
  const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
  // Start every bunch at the scaled average, then modulate per bunch
  std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
  // Now update using beam intensities
  if (m_useBeamInt) {
    // Supposed to be once per event, but ends up running once per minbias type
    // per event now
    m_beamInt->selectT0(run, event);
    for (int bunch = m_earliestDeltaBC.value();
         bunch <= m_latestDeltaBC.value(); ++bunch) {
      std::size_t idx = bunch - m_earliestDeltaBC.value();
      avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
    }
  }

  // Convert per-bunch averages to integer counts: Poisson-fluctuated or
  // simple rounding, depending on configuration
  if (m_usePoisson) {
    std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
                   num_mb_by_bunch.begin(), [&prng](float avg) {
                     return std::poisson_distribution<std::uint64_t>(avg)(prng);
                   });
  } else {
    std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
                   num_mb_by_bunch.begin(), [](float f) {
                     return static_cast<std::uint64_t>(std::round(f));
                   });
  }

  std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
  if (slot == s_NoSlot) {
    return num_mb;
  }
  // Won't go past here if slot == s_NoSlot

  // Identity mapping from minbias index to store index for this slot
  std::vector<std::uint64_t>& index_array = m_idx_lists[slot];
  index_array.clear();
  index_array.resize(num_mb);
  std::iota(index_array.begin(), index_array.end(), 0);
  // Don't need to shuffle, since these events aren't reused
  // std::shuffle(index_array.begin(), index_array.end(), prng);

  // Commented out until we can use C++ 23 range formatting
  // ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
  //               << fmt::format("\t\tBy bunch: [{}]\n",
  //                              fmt::join(num_mb_by_bunch, ", ")));
  return num_mb;
}
214
/// Load all minbias events needed for the hard scatter described by @p ctx
/// into this slot's pileup stores, creating extra stores if needed.
/// Loading across slots is serialized (in hs_id order, under a mutex) because
/// all slots read sequentially from the single background event selector.
StatusCode OnDemandMinbiasSvc::beginHardScatter(const EventContext& ctx) {
  using namespace std::chrono_literals;
  std::chrono::steady_clock::time_point order_wait_start{};

  const std::int64_t hs_id = get_hs_id(ctx);
  const std::size_t slot = ctx.slot();
  // Also (re)fills this slot's per-bunch counts and index list
  const std::size_t num_to_load =
      calcMBRequired(hs_id, slot, ctx.eventID().run_number(),
                     ctx.eventID().lumi_block(), ctx.eventID().event_number());
  auto& stores = m_stores[slot];
  // If we don't have enough stores, make more
  if (stores.size() < num_to_load) {
    ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
    stores.reserve(num_to_load);
    for (std::size_t i = stores.size(); i < num_to_load; ++i) {
      auto& sg = stores.emplace_back(
          std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
          name());
      ATH_CHECK(sg.retrieve());
      sg->setStoreID(StoreID::PILEUP_STORE);
      sg->setProxyProviderSvc(m_proxyProviderSvc.get());
    }
  }
  // Ensure loading is done in order: busy-wait (50ms polls) until the
  // previous hard scatter has finished loading its background events
  if (m_last_loaded_hs < hs_id - 1) {
    ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
                 << m_last_loaded_hs << " and we are " << hs_id);
    order_wait_start = std::chrono::steady_clock::now();
    while (m_last_loaded_hs < hs_id - 1) {
      std::this_thread::sleep_for(50ms);
    }
    auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
    ATH_MSG_INFO(std::format("Waited {:%M:%S} to prevent out-of-order loading",
                             wait_time));
  }
  // Lock reading mutex: only one slot reads from the selector at a time
  std::unique_lock lck(m_reading_batch_mtx);
  auto start = std::chrono::steady_clock::now();
  // Remember old store to reset later
  auto* old_store = m_activeStoreSvc->activeStore();
  for (std::size_t i = 0; i < num_to_load; ++i) {
    auto& sg = stores[i];
    // Change active store
    m_activeStoreSvc->setStore(sg.get());
    SG::CurrentEventStore::Push reader_sg_ces(sg.get());
    // Read next event
    ATH_CHECK(sg->clearStore(true));
    if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
      ATH_MSG_FATAL("Ran out of minbias events");
      return StatusCode::FAILURE;
    }
    IOpaqueAddress* addr = nullptr;
    if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
             .isSuccess()) {
      ATH_MSG_WARNING("Failed to create address. No more events?");
      return StatusCode::FAILURE;
    }
    if (addr == nullptr) {
      ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
      return StatusCode::FAILURE;
    }
    ATH_CHECK(sg->recordAddress(addr));
    ATH_CHECK(sg->loadEventProxies());
    // Read data now if desired (otherwise proxies load on demand later)
    if (!m_onDemandMB) {
      for (const auto* proxy_ptr : sg->proxies()) {
        // Sort of a const_cast, then ->accessData()
        sg->proxy_exact(proxy_ptr->sgkey())->accessData();
      }
    }
  }
  // Reset active store
  m_activeStoreSvc->setStore(old_store);
  ATH_MSG_INFO(std::format("Took {:%M:%S} to load events",
                           std::chrono::steady_clock::now() - start));
  // Update last loaded so the next hard scatter may proceed
  m_last_loaded_hs.store(hs_id);
  return StatusCode::SUCCESS;
}
294
                                          std::uint64_t mb_id) {
  // Map the caller's minbias index through this slot's index list, then hand
  // back the corresponding pileup store (non-owning pointer).
  const std::size_t slot = ctx.slot();
  const std::size_t index = m_idx_lists.at(slot).at(mb_id);
  // NOTE(review): ctx.slot() here is the same value as `slot` computed above
  return m_stores.at(ctx.slot()).at(index).get();
}
301
/// Return how many minbias events were scheduled for bunch crossing @p bunch
/// in the event slot of @p ctx.
/// @throws std::logic_error if @p bunch is outside the configured
///         [earliest, latest] delta-BC window.
std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
                                               int bunch) const {
  const int earliest = m_earliestDeltaBC.value();
  const int latest = m_latestDeltaBC.value();
  const bool in_range = (bunch >= earliest) && (bunch <= latest);
  if (!in_range) {
    throw std::logic_error(std::format(
        "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
        earliest, latest));
  }
  const std::size_t offset = static_cast<std::size_t>(bunch - earliest);
  return m_num_mb_by_bunch.at(ctx.slot()).at(offset);
}
311
312StatusCode OnDemandMinbiasSvc::endHardScatter(const EventContext& ctx) {
313 // clear all stores
314 for (auto&& sg : m_stores[ctx.slot()]) {
315 ATH_CHECK(sg->clearStore());
316 }
317 return StatusCode::SUCCESS;
318}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
uint32_t CLID
The Class ID type.
std::string CLIDToString(const CLID &clid)
Hold a pointer to the current event store.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
std::vector< EvtId >::const_iterator EvtIter
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
~OnDemandMinbiasSvc() final
Destructor.
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
StatusCode initialize() final
AthService initialize.
IEvtSelector::Context * m_bkg_evt_sel_ctx
Gaudi::Property< int > m_latestDeltaBC
StatusCode beginHardScatter(const EventContext &ctx) override
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Gaudi::Property< bool > m_useBeamLumi
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
ServiceHandle< IEvtSelector > m_bkgEventSelector
std::vector< std::vector< SGHandle > > m_stores
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Gaudi::Property< bool > m_useBeamInt
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Gaudi::Property< bool > m_usePoisson
Gaudi::Property< std::uint64_t > m_seed
static constexpr std::size_t s_NoSlot
std::atomic_int64_t m_last_loaded_hs
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
ServiceHandle< IBeamIntensity > m_beamInt
StatusCode endHardScatter(const EventContext &ctx) override
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Gaudi::Property< float > m_nPerBunch
std::vector< std::vector< std::uint64_t > > m_idx_lists
Gaudi::Property< bool > m_onDemandMB
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
ServiceHandle< IBeamLuminosity > m_beamLumi
Gaudi::Property< int > m_earliestDeltaBC
Temporarily change the current store.
The Athena Transient Store API.
@ PILEUP_STORE
Definition StoreID.h:31
Definition index.py:1
Definition run.py:1