ATLAS Offline Software
Loading...
Searching...
No Matches
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7#include <GaudiKernel/ConcurrencyFlags.h>
8
9#include <algorithm>
10#include <boost/core/demangle.hpp>
11#include <chrono>
12#include <format>
13#include <random>
14#include <range/v3/algorithm.hpp>
15#include <range/v3/numeric/accumulate.hpp>
16#include <range/v3/to_container.hpp>
17#include <range/v3/view.hpp>
18#include <thread>
19
25
26
27inline std::string CLIDToString(const CLID& clid) {
28 return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
29}
30
32 ISvcLocator* svc)
33 : base_class(name, svc),
34 m_bkg_evt_sel_ctx(nullptr),
35 m_proxyProviderSvc("ProxyProviderSvc/BkgPPSvc_"+name, name),
37{}
38
40
42 ATH_CHECK(m_bkgEventSelector.retrieve());
43 ATH_CHECK(m_activeStoreSvc.retrieve());
44 ATH_CHECK(m_skipEventIdxSvc.retrieve());
45 if (m_useBeamInt) {
46 ATH_CHECK(m_beamInt.retrieve());
47 }
48 if (m_useBeamLumi) {
49 ATH_CHECK(m_beamLumi.retrieve());
50 }
51 // Setup context
52 if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
53 ATH_MSG_ERROR("Failed to create background event selector context");
54 return StatusCode::FAILURE;
55 }
56 ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());
57
58 // Setup proxy provider
59 ATH_CHECK(m_proxyProviderSvc.retrieve());
60
61 // Setup Address Providers
62 SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
63 if (!addressProvider) {
65 "Could not cast background event selector to IAddressProvider");
66 } else {
67 m_proxyProviderSvc->addProvider(addressProvider);
68 }
69 // AthenaPoolAddressProviderSvc
70 SmartIF<IAddressProvider> athPoolAP{
71 serviceLocator()->service(std::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
72 };
73 if (!athPoolAP) {
75 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
76 } else {
77 m_proxyProviderSvc->addProvider(athPoolAP);
78 }
79 // AddressRemappingSvc
80 SmartIF<IAddressProvider> addRemapAP{
81 serviceLocator()->service(std::format("AddressRemappingSvc/BkgARSvc_{}", name()))
82 };
83 if (!addRemapAP) {
84 ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
85 } else {
86 m_proxyProviderSvc->addProvider(addRemapAP);
87 }
88
89 const int n_stores = 50; // Start with 50 stores per event
90 std::size_t i = 0;
91 // setup n_concurrent vectors of n_stores StoreGates in m_stores
92 for (auto& sgs : m_stores) {
93 sgs.reserve(n_stores);
94 for (int j = 0; j < n_stores; ++j) {
95 // creates / retrieves a different StoreGateSvc for each slot
96 auto& sg = sgs.emplace_back(
97 std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
98 ATH_CHECK(sg.retrieve());
99 sg->setStoreID(StoreID::PILEUP_STORE);
100 sg->setProxyProviderSvc(m_proxyProviderSvc.get());
101 }
102 ++i;
103 }
104
105 // setup spare store for event skipping
106 ATH_CHECK(m_spare_store.retrieve());
108 m_spare_store->setProxyProviderSvc(m_proxyProviderSvc.get());
109 auto skipEvent_callback = [this](
111 ISkipEventIdxSvc::EvtIter end) -> StatusCode {
112 using namespace std::chrono_literals;
113 auto* const old_store = m_activeStoreSvc->activeStore();
114 m_activeStoreSvc->setStore(m_spare_store.get());
115 ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
116 for (auto iter = begin; iter < end; ++iter) {
117 const auto& evt = *iter;
118 EventContext ctx;
119 EventIDBase eid;
120 eid.set_run_number(evt.runNum);
121 eid.set_lumi_block(evt.lbNum);
122 eid.set_event_number(evt.evtNum);
123 ctx.setEventID(eid);
124 const std::size_t n_to_skip = calcMBRequired(evt.evtIdx, ctx);
125 ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
126 << n_to_skip << " pileup events");
127 for (std::size_t i = 0; i < n_to_skip; ++i) {
128 if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
129 ATH_MSG_ERROR("Ran out of background events");
130 return StatusCode::FAILURE;
131 }
132 }
133 }
134 m_last_loaded_hs.store((end - 1)->evtIdx);
135 m_activeStoreSvc->setStore(old_store);
136 return StatusCode::SUCCESS;
137 };
138 ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
139 ATH_MSG_INFO("Initializing ODMBSvc");
140 return StatusCode::SUCCESS;
141}
142
143std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
144 const EventContext& ctx) {
145 ATH_MSG_DEBUG("Run " << ctx.eventID().run_number()
146 << ", lumi " << ctx.eventID().lumi_block()
147 << ", event " << ctx.eventID().event_number()
148 << "| hs_id " << hs_id);
149 const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
150
151 std::vector<std::uint64_t>& num_mb_by_bunch = *m_num_mb_by_bunch.get(ctx);
152 num_mb_by_bunch.clear();
153 num_mb_by_bunch.resize(n_bunches);
154 FastReseededPRNG prng{m_seed.value(), hs_id};
155
156 // First apply the beam luminosity SF
157 bool sf_updated_throwaway;
158 const float beam_lumi_sf =
159 m_useBeamLumi ? m_beamLumi->scaleFactor(ctx.eventID().run_number(),
160 ctx.eventID().lumi_block(),
161 sf_updated_throwaway)
162 : 1.F;
163 const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
164 std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
165 // Now update using beam intensities
166 if (m_useBeamInt) {
167 // Supposed to be once per event, but ends up running once per minbias type
168 // per event now
169 m_beamInt->selectT0(ctx);
170 for (int bunch = m_earliestDeltaBC.value();
171 bunch <= m_latestDeltaBC.value(); ++bunch) {
172 std::size_t idx = bunch - m_earliestDeltaBC.value();
173 avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
174 }
175 }
176
177 if (m_usePoisson) {
178 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
179 num_mb_by_bunch.begin(), [&prng](float avg) {
180 return std::poisson_distribution<std::uint64_t>(avg)(prng);
181 });
182 } else {
183 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
184 num_mb_by_bunch.begin(), [](float f) {
185 return static_cast<std::uint64_t>(std::round(f));
186 });
187 }
188
189 std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
190 if (!ctx.valid()) {
191 return num_mb;
192 }
193 // Won't go past here for an invalid slot (during initialize()
194
195 std::vector<std::uint64_t>& index_array = *m_idx_lists.get(ctx);
196 index_array.clear();
197 index_array.resize(num_mb);
198 std::iota(index_array.begin(), index_array.end(), 0);
199 // Don't need to shuffle, since these events aren't reused
200 // std::shuffle(index_array.begin(), index_array.end(), prng);
201
202 // Commented out until we can use C++ 23 range formatting
203 // ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
204 // << fmt::format("\t\tBy bunch: [{}]\n",
205 // fmt::join(num_mb_by_bunch, ", ")));
206 return num_mb;
207}
208
209StatusCode OnDemandMinbiasSvc::beginHardScatter(const EventContext& ctx) {
210 using namespace std::chrono_literals;
211 std::chrono::steady_clock::time_point order_wait_start{};
212
213 const std::int64_t hs_id = get_hs_id(ctx);
214 const std::size_t slot = ctx.slot();
215 const std::size_t num_to_load = calcMBRequired(hs_id, ctx);
216 auto& stores = *m_stores.get(ctx);
217 // If we don't have enough stores, make more
218 if (stores.size() < num_to_load) {
219 ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
220 stores.reserve(num_to_load);
221 for (std::size_t i = stores.size(); i < num_to_load; ++i) {
222 auto& sg = stores.emplace_back(
223 std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
224 name());
225 ATH_CHECK(sg.retrieve());
226 sg->setStoreID(StoreID::PILEUP_STORE);
227 sg->setProxyProviderSvc(m_proxyProviderSvc.get());
228 }
229 }
230 // Ensure loading is done in order
231 if (m_last_loaded_hs < hs_id - 1) {
232 ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
233 << m_last_loaded_hs << " and we are " << hs_id);
234 order_wait_start = std::chrono::steady_clock::now();
235 while (m_last_loaded_hs < hs_id - 1) {
236 std::this_thread::sleep_for(50ms);
237 }
238 auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
239 ATH_MSG_INFO(std::format("Waited {:%M:%S} to prevent out-of-order loading",
240 wait_time));
241 }
242 // Lock reading mutex
243 std::unique_lock lck(m_reading_batch_mtx);
244 auto start = std::chrono::steady_clock::now();
245 // Remember old store to reset later
246 auto* old_store = m_activeStoreSvc->activeStore();
247 for (std::size_t i = 0; i < num_to_load; ++i) {
248 auto& sg = stores[i];
249 // Change active store
250 m_activeStoreSvc->setStore(sg.get());
251 SG::CurrentEventStore::Push reader_sg_ces(sg.get());
252 // Read next event
253 ATH_CHECK(sg->clearStore(true));
254 if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
255 ATH_MSG_FATAL("Ran out of minbias events");
256 return StatusCode::FAILURE;
257 }
258 IOpaqueAddress* addr = nullptr;
259 if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
260 .isSuccess()) {
261 ATH_MSG_WARNING("Failed to create address. No more events?");
262 return StatusCode::FAILURE;
263 }
264 if (addr == nullptr) {
265 ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
266 return StatusCode::FAILURE;
267 }
268 ATH_CHECK(sg->recordAddress(addr));
269 ATH_CHECK(sg->loadEventProxies());
270 // Read data now if desired
271 if (!m_onDemandMB) {
272 for (const auto* proxy_ptr : sg->proxies()) {
273 // Sort of a const_cast, then ->accessData()
274 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
275 }
276 }
277 }
278 // Reset active store
279 m_activeStoreSvc->setStore(old_store);
280 ATH_MSG_INFO(std::format("Took {:%M:%S} to load events",
281 std::chrono::steady_clock::now() - start));
282 // Update last loaded
283 m_last_loaded_hs.store(hs_id);
284 return StatusCode::SUCCESS;
285}
286
288 std::uint64_t mb_id) {
289 const std::size_t index = m_idx_lists.get(ctx)->at(mb_id);
290 return m_stores.get(ctx)->at(index).get();
291}
292
293std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
294 int bunch) const {
295 if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
296 throw std::logic_error(std::format(
297 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
298 m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
299 }
300 return m_num_mb_by_bunch.get(ctx)->at(bunch - m_earliestDeltaBC.value());
301}
302
303StatusCode OnDemandMinbiasSvc::endHardScatter(const EventContext& ctx) {
304 // clear all stores
305 for (SGHandle& sg : *m_stores.get(ctx)) {
306 ATH_CHECK(sg->clearStore());
307 }
308 return StatusCode::SUCCESS;
309}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
uint32_t CLID
The Class ID type.
std::string CLIDToString(const CLID &clid)
Hold a pointer to the current event store.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
std::vector< EvtId >::const_iterator EvtIter
ServiceHandle< StoreGateSvc > SGHandle
~OnDemandMinbiasSvc() final
Destructor.
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
StatusCode initialize() final
AthService initialize.
IEvtSelector::Context * m_bkg_evt_sel_ctx
Gaudi::Property< int > m_latestDeltaBC
StatusCode beginHardScatter(const EventContext &ctx) override
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Gaudi::Property< bool > m_useBeamLumi
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
ServiceHandle< IEvtSelector > m_bkgEventSelector
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
SG::SlotSpecificObj< std::vector< std::uint64_t >, SG::InvalidSlot::Enabled > m_num_mb_by_bunch
SG::SlotSpecificObj< std::vector< std::uint64_t > > m_idx_lists
Gaudi::Property< bool > m_useBeamInt
SG::SlotSpecificObj< std::vector< SGHandle > > m_stores
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Gaudi::Property< bool > m_usePoisson
Gaudi::Property< std::uint64_t > m_seed
std::atomic_int64_t m_last_loaded_hs
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
ServiceHandle< IBeamIntensity > m_beamInt
StatusCode endHardScatter(const EventContext &ctx) override
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Gaudi::Property< float > m_nPerBunch
std::size_t calcMBRequired(std::int64_t hs_id, const EventContext &ctx)
Gaudi::Property< bool > m_onDemandMB
ServiceHandle< IBeamLuminosity > m_beamLumi
Gaudi::Property< int > m_earliestDeltaBC
The Athena Transient Store API.
@ PILEUP_STORE
Definition StoreID.h:31
Definition index.py:1