ATLAS Offline Software
Loading...
Searching...
No Matches
BatchedMinbiasSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include "BatchedMinbiasSvc.h"
6
7#include <GaudiKernel/ConcurrencyFlags.h>
8
9#include <algorithm>
10#include <boost/core/demangle.hpp>
11#include <chrono>
12#include <cmath>
13#include <format>
14#include <random>
15#include <range/v3/algorithm/stable_sort.hpp>
16#include <range/v3/numeric/accumulate.hpp>
17#include <range/v3/to_container.hpp>
18#include <range/v3/view.hpp>
19#include <thread>
20
26
27namespace rv = ranges::views;
28
29inline std::string CLIDToString(const CLID& clid) {
30 return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
31}
32
33BatchedMinbiasSvc::BatchedMinbiasSvc(const std::string& name, ISvcLocator* svc)
34 : base_class(name, svc),
35 m_bkg_evt_sel_ctx(nullptr),
37
39
40int BatchedMinbiasSvc::event_to_batch(std::int64_t hs_id) {
41 return int(hs_id / m_HSBatchSize.value());
42}
43
45 ATH_CHECK(m_skipEventIdxSvc.retrieve());
46 ATH_CHECK(m_beamInt.retrieve());
47 ATH_CHECK(m_beamLumi.retrieve());
48
49 m_cache.clear();
50 m_empty_caches.clear();
51 m_batch_use_count.clear();
52 m_batch_use_count.reserve(m_actualNHSEventsPerBatch.value().size());
53 for (std::size_t i = 0; i < m_actualNHSEventsPerBatch.value().size(); ++i) {
54 m_batch_use_count.emplace_back(std::make_unique<std::atomic_int>(0));
55 }
56 ATH_CHECK(m_bkgEventSelector.retrieve());
57 ATH_CHECK(m_activeStoreSvc.retrieve());
58 // Setup context
59 if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
60 ATH_MSG_ERROR("Failed to create background event selector context");
61 return StatusCode::FAILURE;
62 }
63 ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());
64
65 // Setup proxy provider
66 SmartIF<IProxyProviderSvc> proxyProviderSvc{
67 serviceLocator()->service(std::format("ProxyProviderSvc/BkgPPSvc_{}", name()))
68 };
69 ATH_CHECK(proxyProviderSvc.isValid());
70
71 // Setup Address Providers
72 SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
73 if (!addressProvider) {
75 "Could not cast background event selector to IAddressProvider");
76 } else {
77 proxyProviderSvc->addProvider(addressProvider);
78 }
79 // AthenaPoolAddressProviderSvc
80 SmartIF<IAddressProvider> athPoolAP{
81 serviceLocator()->service(std::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
82 };
83 if (!athPoolAP) {
85 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
86 } else {
87 proxyProviderSvc->addProvider(athPoolAP);
88 }
89 // AddressRemappingSvc
90 SmartIF<IAddressProvider> addRemapAP{
91 serviceLocator()->service(std::format("AddressRemappingSvc/BkgARSvc_{}", name()))
92 };
93 if (!addRemapAP) {
94 ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
95 } else {
96 proxyProviderSvc->addProvider(addRemapAP);
97 }
98
99 int mbBatchSize = m_MBBatchSize.value();
100 // setup NSimultaneousBatches vectors of MBBatchSize StoreGates in
101 // m_empty_caches
102 for (int i = 0; i < m_NSimultaneousBatches.value(); ++i) {
103 auto& sgs = m_empty_caches.emplace_back(std::make_unique<SGHandleArray>());
104 sgs->reserve(mbBatchSize);
105 for (int j = 0; j < mbBatchSize; ++j) {
106 // creates / retrieves a different StoreGateSvc for each slot
107 auto& sg = sgs->emplace_back(
108 std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
109 ATH_CHECK(sg.retrieve());
110 sg->setStoreID(StoreID::PILEUP_STORE);
111 sg->setProxyProviderSvc(proxyProviderSvc);
112 }
113 }
114
115 // Setup the spare store for event skipping
116 ATH_CHECK(m_spare_store.retrieve());
118 m_spare_store->setProxyProviderSvc(proxyProviderSvc);
119
120 // Setup the callback for event skipping
121 auto skipEvent_callback = [this, mbBatchSize](
123 ISkipEventIdxSvc::EvtIter end) -> StatusCode {
124 using namespace std::chrono_literals;
125 auto evts = ranges::make_subrange(begin, end);
126 ATH_MSG_INFO("Skipping " << end - begin << " HS events.");
127 auto batches_all =
128 evts | rv::transform([this](const ISkipEventIdxSvc::EvtId& evt) {
129 return event_to_batch(evt.evtIdx);
130 });
131 std::vector<std::tuple<int, int>> batches_with_counts{};
132 // Produce a list of batches, and how many times they appear
133 for (int batch : batches_all) {
134 // First entry
135 if (batches_with_counts.empty()) {
136 batches_with_counts.emplace_back(batch, 1);
137 continue;
138 }
139 // Subsequent entries
140 auto& last_entry = batches_with_counts.back();
141 if (batch == std::get<0>(last_entry)) {
142 std::get<1>(last_entry) += 1;
143 continue;
144 }
145 batches_with_counts.emplace_back(batch, 1);
146 }
147
148 // Discard batches
149 const int hs_batch_size = m_HSBatchSize.value();
150 auto* const old_store = m_activeStoreSvc->activeStore();
151 m_activeStoreSvc->setStore(m_spare_store.get());
152 ATH_CHECK(m_spare_store->clearStore());
153 for (const auto& [batch, count] : batches_with_counts) {
154 if (m_cache.count(batch) != 0) {
155 // batch is currently loaded, just update the use count
156 m_batch_use_count[batch]->fetch_add(count);
157 continue;
158 }
159 // force ordering in background stream
160 while (m_last_loaded_batch < batch - 1) {
161 std::this_thread::sleep_for(50ms);
162 }
163 // if we aren't skipping all the hardscatters in the batch, do nothing
164 if ((m_batch_use_count[batch]->fetch_add(count) + count) <
165 hs_batch_size) {
166 continue;
167 }
168 // otherwise discard the batch
169 ATH_MSG_INFO("Discarding batch " << batch);
170 std::unique_lock lck{m_reading_batch_mtx};
171 if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx, mbBatchSize)
172 .isSuccess()) {
173 ATH_MSG_INFO("Ran out of background events");
174 return StatusCode::FAILURE;
175 }
176 // increment counters
177 m_last_loaded_batch.fetch_add(1);
178 }
179 ATH_CHECK(m_spare_store->clearStore());
180 m_activeStoreSvc->setStore(old_store);
181 return StatusCode::SUCCESS;
182 };
183
184 // register callback
185 ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
186 return StatusCode::SUCCESS;
187}
188
189std::size_t BatchedMinbiasSvc::calcMBRequired(std::int64_t hs_id,
190 const EventContext& ctx) {
191 const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
192 FastReseededPRNG prng{m_seed.value(), hs_id};
193
194 // First apply the beam luminosity SF
195 bool sf_updated_throwaway;
196 const float beam_lumi_sf =
197 m_useBeamLumi ? m_beamLumi->scaleFactor(ctx.eventID().run_number(),
198 ctx.eventID().lumi_block(),
199 sf_updated_throwaway)
200 : 1.f;
201 std::vector<float> avg_num_mb_by_bunch(n_bunches,
202 beam_lumi_sf * m_nPerBunch.value());
203 // Now update using beam intensities
204 if (m_useBeamInt) {
205 // Supposed to be once per event, but ends up running once per minbias type
206 // per event now
207 m_beamInt->selectT0(ctx);
208 for (int bunch = m_earliestDeltaBC.value();
209 bunch <= m_latestDeltaBC.value(); ++bunch) {
210 std::size_t idx = bunch - m_earliestDeltaBC.value();
211 avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
212 }
213 }
214
215 std::vector<std::uint64_t>& num_mb_by_bunch = *m_num_mb_by_bunch.get(ctx);
216 num_mb_by_bunch.clear();
217 num_mb_by_bunch.resize(n_bunches);
218
219 if (m_usePoisson) {
220 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
221 num_mb_by_bunch.begin(), [&prng](float avg) {
222 return std::poisson_distribution<std::uint64_t>(avg)(prng);
223 });
224 } else {
225 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
226 num_mb_by_bunch.begin(), [](float f) {
227 return static_cast<std::uint64_t>(std::round(f));
228 });
229 }
230
231 std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
232 std::vector<std::uint64_t>& index_array = *m_idx_lists.get(ctx);
233 const std::uint64_t mbBatchSize = m_MBBatchSize.value();
234 // Prevent running out of events
235 if (num_mb > mbBatchSize) {
236 const int center_bunch = -m_earliestDeltaBC.value();
237 auto indices =
238 rv::iota(0ULL, num_mb_by_bunch.size()) |
239 rv::filter([center_bunch, &num_mb_by_bunch](int idx) {
240 bool good = idx != center_bunch; // filter out the central bunch
241 good =
242 good && num_mb_by_bunch[idx] > 0; // filter out unfilled bunches
243 return good;
244 }) |
245 ranges::to<std::vector>;
246 // sort by distance from central bunch
247 ranges::stable_sort(indices, std::greater{},
248 [center_bunch](std::size_t idx) {
249 return std::size_t(std::abs(int(idx) - center_bunch));
250 });
251 // subtract from bunches until we aren't using too many events
252 for (auto idx : indices) {
253 const std::uint64_t max_to_subtract = num_mb - mbBatchSize;
254 const std::uint64_t num_subtracted =
255 std::min(max_to_subtract, num_mb_by_bunch[idx]);
256 num_mb_by_bunch[idx] -= num_subtracted;
257 num_mb -= num_subtracted;
258 if (num_mb <= mbBatchSize) {
259 break;
260 }
261 }
262 // Print an error anyway so we can fix the job
263 ATH_MSG_ERROR("We need " << num_mb << " events but the batch size is "
264 << mbBatchSize << ". Restricting to "
265 << mbBatchSize << " events!");
266 }
267 index_array = rv::ints(0, int(mbBatchSize)) | rv::sample(num_mb, prng) |
268 ranges::to<std::vector<std::uint64_t>>;
269 ranges::shuffle(index_array, prng);
270 ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events");
271 // Disabled until C++ 23 range formatting can be used
272 // if (m_HSBatchSize <= 1) {
273 // ATH_MSG_DEBUG(fmt::format("\t\tBy bunch: [{}]\n", fmt::join(num_mb_by_bunch, ", "))
274 // << fmt::format("\t\tOrder: [{}]", fmt::join(index_array, ", ")));
275 // }
276 return num_mb;
277}
278
279StatusCode BatchedMinbiasSvc::beginHardScatter(const EventContext& ctx) {
280 using namespace std::chrono_literals;
281 bool first_wait = true;
282 std::chrono::steady_clock::time_point cache_wait_start{};
283 std::chrono::steady_clock::time_point order_wait_start{};
284 const std::int64_t hs_id = get_hs_id(ctx);
285 const int batch = event_to_batch(hs_id);
286 calcMBRequired(hs_id, ctx); // don't need the total, only need to populate the arrays
287
288 while (true) {
289 if (m_cache.count(batch) != 0) {
290 // batch already loaded
291 // mutex prevents returning when batch is partially loaded
292 m_cache_mtxs[batch].lock();
293 m_cache_mtxs[batch].unlock();
294 return StatusCode::SUCCESS;
295 }
296 // prevent batches loading out-of-order
297 if (m_last_loaded_batch < (batch - 1)) {
298 ATH_MSG_INFO("Waiting to prevent out-of-order loading of batches");
299 order_wait_start = std::chrono::steady_clock::now();
300 while (m_last_loaded_batch < (batch - 1)) {
301 std::this_thread::sleep_for(50ms);
302 }
303 auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
304 ATH_MSG_INFO(std::format(
305 "Waited {:%M:%S} to prevent out-of-order loading", wait_time));
306 }
307 // See if there are any free caches
308 // Using try_lock here to avoid reading same batch twice
309 std::unique_lock<std::mutex> empty_caches_lock (m_empty_caches_mtx,
310 std::try_to_lock);
311 if (empty_caches_lock.owns_lock()) {
312 if (m_empty_caches.empty()) {
313 // Unlock mutex if we got the lock but there were no free caches
314 empty_caches_lock.unlock();
315 if (first_wait) {
316 ATH_MSG_INFO("Waiting for a free cache");
317 first_wait = false;
318 cache_wait_start = std::chrono::steady_clock::now();
319 }
320 // Wait 100ms then try again
321 std::this_thread::sleep_for(100ms);
322 continue;
323 }
324 if (!first_wait) {
325 auto wait_time = std::chrono::steady_clock::now() - cache_wait_start;
327 std::format("Waited {:%M:%S} for a free cache", wait_time));
328 }
329 std::scoped_lock reading{m_cache_mtxs[batch], m_reading_batch_mtx};
330 if (m_HSBatchSize != 0) {
331 ATH_MSG_INFO("Reading next batch in event " << ctx.evt() << ", slot "
332 << ctx.slot() << " (hs_id "
333 << hs_id << ")");
334 }
335 auto start_time = std::chrono::system_clock::now();
336 m_cache[batch] = std::move(m_empty_caches.front());
337 m_empty_caches.pop_front();
338 // Remember old store to reset later
339 auto* old_store = m_activeStoreSvc->activeStore();
340 for (auto&& sg : *m_cache[batch]) {
341 // Change active store
342 m_activeStoreSvc->setStore(sg.get());
343 SG::CurrentEventStore::Push reader_sg_ces(sg.get());
344 // Read next event
345 ATH_CHECK(sg->clearStore(true));
346 if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
347 ATH_MSG_FATAL("Ran out of minbias events");
348 return StatusCode::FAILURE;
349 }
350 IOpaqueAddress* addr = nullptr;
351 if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
352 .isSuccess()) {
353 ATH_MSG_WARNING("Failed to create address. No more events?");
354 return StatusCode::FAILURE;
355 }
356 if (addr == nullptr) {
357 ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
358 return StatusCode::FAILURE;
359 }
360 ATH_CHECK(sg->recordAddress(addr));
361 ATH_CHECK(sg->loadEventProxies());
362 // Read data now if desired
363 if (!m_onDemandMB) {
364 for (const auto* proxy_ptr : sg->proxies()) {
365 if (!proxy_ptr->isValid()) {
366 continue;
367 }
368
369 // Sort of a const_cast, then ->accessData()
370 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
371 }
372 }
373 }
374 // Reset active store
375 m_activeStoreSvc->setStore(old_store);
376 if (m_HSBatchSize != 0) {
377 ATH_MSG_INFO(std::format(
378 "Reading {} events took {:%OMm %OSs}", m_cache[batch]->size(),
379 std::chrono::system_clock::now() - start_time));
380 }
381 m_last_loaded_batch.exchange(batch);
382 return StatusCode::SUCCESS;
383 }
384 }
385 return StatusCode::SUCCESS;
386}
387
389 std::uint64_t mb_id) {
390 const std::int64_t hs_id = get_hs_id(ctx);
391 const std::size_t index = m_idx_lists.get(ctx)->at(mb_id);
392 const int batch = event_to_batch(hs_id);
393 return m_cache[batch]->at(index).get();
394}
395
396std::size_t BatchedMinbiasSvc::getNumForBunch(const EventContext& ctx,
397 int bunch) const {
398 if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
399 throw std::logic_error(std::format(
400 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
401 m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
402 }
403 return m_num_mb_by_bunch.get(ctx)->at(bunch - m_earliestDeltaBC.value());
404}
405
406StatusCode BatchedMinbiasSvc::endHardScatter(const EventContext& ctx) {
407 using namespace std::chrono_literals;
408 const std::int64_t hs_id = get_hs_id(ctx);
409 const int batch = event_to_batch(hs_id);
410 const int uses = m_batch_use_count[batch]->fetch_add(1) + 1;
411
412 // If we're done with every event in the batch, clear the stores and return
413 // them
414 if (uses == m_HSBatchSize.value()) {
415 std::unique_ptr temp = std::move(m_cache[batch]);
416 m_cache.erase(batch);
417 for (auto&& sg : *temp) {
418 ATH_CHECK(sg->clearStore());
419 }
420 std::lock_guard lg{m_empty_caches_mtx};
421 m_empty_caches.emplace_back(std::move(temp));
422 } else {
423 ATH_MSG_DEBUG("BATCH " << batch << ": " << uses << " uses out of "
424 << m_HSBatchSize << " "
426 }
427 return StatusCode::SUCCESS;
428}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
std::string CLIDToString(const CLID &clid)
uint32_t CLID
The Class ID type.
Hold a pointer to the current event store.
size_t size() const
Number of registered mappings.
StatusCode initialize() override
AthService initialize.
ServiceHandle< IEvtSelector > m_bkgEventSelector
Gaudi::Property< std::uint64_t > m_seed
Gaudi::Property< float > m_nPerBunch
Gaudi::Property< bool > m_useBeamLumi
std::map< int, std::unique_ptr< SGHandleArray > > m_cache
ServiceHandle< IBeamIntensity > m_beamInt
~BatchedMinbiasSvc()
Destructor.
Gaudi::Property< std::vector< int > > m_actualNHSEventsPerBatch
Gaudi::Property< bool > m_onDemandMB
int event_to_batch(std::int64_t hs_id)
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Gaudi::Property< int > m_earliestDeltaBC
std::vector< std::unique_ptr< std::atomic_int > > m_batch_use_count
Gaudi::Property< bool > m_usePoisson
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
Gaudi::Property< int > m_latestDeltaBC
std::deque< std::unique_ptr< SGHandleArray > > m_empty_caches
std::mutex m_reading_batch_mtx
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
StatusCode endHardScatter(const EventContext &ctx) override
SG::SlotSpecificObj< std::vector< std::uint64_t > > m_idx_lists
Gaudi::Property< bool > m_useBeamInt
std::mutex m_empty_caches_mtx
Gaudi::Property< int > m_MBBatchSize
BatchedMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
IEvtSelector::Context * m_bkg_evt_sel_ctx
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
std::size_t calcMBRequired(std::int64_t hs_id, const EventContext &ctx)
SG::SlotSpecificObj< std::vector< std::uint64_t > > m_num_mb_by_bunch
Gaudi::Property< int > m_HSBatchSize
std::map< int, std::mutex > m_cache_mtxs
StatusCode beginHardScatter(const EventContext &ctx) override
std::atomic_int m_last_loaded_batch
ServiceHandle< IBeamLuminosity > m_beamLumi
Gaudi::Property< int > m_NSimultaneousBatches
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
std::vector< EvtId >::const_iterator EvtIter
The Athena Transient Store API.
@ PILEUP_STORE
Definition StoreID.h:31
int count(std::string s, const std::string &regx)
count how many occurances of a regx are in a string
Definition hcg.cxx:148
Definition index.py:1