7#include <GaudiKernel/ConcurrencyFlags.h>
10#include <boost/core/demangle.hpp>
15#include <range/v3/algorithm/stable_sort.hpp>
16#include <range/v3/numeric/accumulate.hpp>
17#include <range/v3/to_container.hpp>
18#include <range/v3/view.hpp>
27namespace rv = ranges::views;
34 : base_class(name, svc),
48 std::size_t n_concurrent =
49 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
67 ATH_MSG_ERROR(
"Failed to create background event selector context");
68 return StatusCode::FAILURE;
73 SmartIF<IProxyProviderSvc> proxyProviderSvc{
74 serviceLocator()->service(std::format(
"ProxyProviderSvc/BkgPPSvc_{}", name()))
80 if (!addressProvider) {
82 "Could not cast background event selector to IAddressProvider");
84 proxyProviderSvc->addProvider(addressProvider);
87 SmartIF<IAddressProvider> athPoolAP{
88 serviceLocator()->service(std::format(
"AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
92 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
94 proxyProviderSvc->addProvider(athPoolAP);
97 SmartIF<IAddressProvider> addRemapAP{
98 serviceLocator()->service(std::format(
"AddressRemappingSvc/BkgARSvc_{}", name()))
101 ATH_MSG_WARNING(
"Could not cast AddressRemappingSvc to IAddressProvider");
103 proxyProviderSvc->addProvider(addRemapAP);
110 auto& sgs =
m_empty_caches.emplace_back(std::make_unique<SGHandleArray>());
111 sgs->reserve(mbBatchSize);
112 for (
int j = 0; j < mbBatchSize; ++j) {
114 auto& sg = sgs->emplace_back(
115 std::format(
"StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
118 sg->setProxyProviderSvc(proxyProviderSvc);
128 auto skipEvent_callback = [
this, mbBatchSize](
131 using namespace std::chrono_literals;
132 auto evts = ranges::make_subrange(begin, end);
133 ATH_MSG_INFO(
"Skipping " << end - begin <<
" HS events.");
138 std::vector<std::tuple<int, int>> batches_with_counts{};
140 for (
int batch : batches_all) {
142 if (batches_with_counts.empty()) {
143 batches_with_counts.emplace_back(batch, 1);
147 auto& last_entry = batches_with_counts.back();
148 if (batch == std::get<0>(last_entry)) {
149 std::get<1>(last_entry) += 1;
152 batches_with_counts.emplace_back(batch, 1);
160 for (
const auto& [batch,
count] : batches_with_counts) {
161 if (
m_cache.count(batch) != 0) {
168 std::this_thread::sleep_for(50ms);
181 return StatusCode::FAILURE;
188 return StatusCode::SUCCESS;
193 return StatusCode::SUCCESS;
200 std::uint64_t event) {
205 bool sf_updated_throwaway;
206 const float beam_lumi_sf =
209 std::vector<float> avg_num_mb_by_bunch(n_bunches,
219 avg_num_mb_by_bunch[idx] *=
m_beamInt->normFactor(bunch);
224 num_mb_by_bunch.clear();
225 num_mb_by_bunch.resize(n_bunches);
228 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
229 num_mb_by_bunch.begin(), [&prng](
float avg) {
230 return std::poisson_distribution<std::uint64_t>(avg)(prng);
233 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
234 num_mb_by_bunch.begin(), [](
float f) {
235 return static_cast<std::uint64_t>(std::round(f));
239 std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
240 std::vector<std::uint64_t>& index_array =
m_idx_lists[slot];
243 if (num_mb > mbBatchSize) {
246 rv::iota(0ULL, num_mb_by_bunch.size()) |
247 rv::filter([center_bunch, &num_mb_by_bunch](
int idx) {
248 bool good = idx != center_bunch;
250 good && num_mb_by_bunch[idx] > 0;
253 ranges::to<std::vector>;
255 ranges::stable_sort(indices, std::greater{},
256 [center_bunch](std::size_t idx) {
257 return std::size_t(std::abs(
int(idx) - center_bunch));
260 for (
auto idx : indices) {
261 const std::uint64_t max_to_subtract = num_mb - mbBatchSize;
262 const std::uint64_t num_subtracted =
263 std::min(max_to_subtract, num_mb_by_bunch[idx]);
264 num_mb_by_bunch[idx] -= num_subtracted;
265 num_mb -= num_subtracted;
266 if (num_mb <= mbBatchSize) {
271 ATH_MSG_ERROR(
"We need " << num_mb <<
" events but the batch size is "
272 << mbBatchSize <<
". Restricting to "
273 << mbBatchSize <<
" events!");
275 index_array = rv::ints(0,
int(mbBatchSize)) | rv::sample(num_mb, prng) |
276 ranges::to<std::vector<std::uint64_t>>;
277 ranges::shuffle(index_array, prng);
278 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events");
288 using namespace std::chrono_literals;
289 bool first_wait =
true;
290 std::chrono::steady_clock::time_point cache_wait_start{};
291 std::chrono::steady_clock::time_point order_wait_start{};
292 const std::int64_t hs_id =
get_hs_id(ctx);
295 ctx.eventID().run_number(),
296 ctx.eventID().lumi_block(),
297 ctx.eventID().event_number());
299 if (
m_cache.count(batch) != 0) {
304 return StatusCode::SUCCESS;
308 ATH_MSG_INFO(
"Waiting to prevent out-of-order loading of batches");
309 order_wait_start = std::chrono::steady_clock::now();
311 std::this_thread::sleep_for(50ms);
313 auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
315 "Waited {:%M:%S} to prevent out-of-order loading", wait_time));
321 if (empty_caches_lock.owns_lock()) {
324 empty_caches_lock.unlock();
328 cache_wait_start = std::chrono::steady_clock::now();
331 std::this_thread::sleep_for(100ms);
335 auto wait_time = std::chrono::steady_clock::now() - cache_wait_start;
337 std::format(
"Waited {:%M:%S} for a free cache", wait_time));
341 ATH_MSG_INFO(
"Reading next batch in event " << ctx.evt() <<
", slot "
342 << ctx.slot() <<
" (hs_id "
345 auto start_time = std::chrono::system_clock::now();
350 for (
auto&& sg : *
m_cache[batch]) {
358 return StatusCode::FAILURE;
360 IOpaqueAddress* addr =
nullptr;
364 return StatusCode::FAILURE;
366 if (addr ==
nullptr) {
368 return StatusCode::FAILURE;
374 for (
const auto* proxy_ptr : sg->proxies()) {
375 if (!proxy_ptr->isValid()) {
380 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
388 "Reading {} events took {:%OMm %OSs}",
m_cache[batch]->size(),
389 std::chrono::system_clock::now() - start_time));
392 return StatusCode::SUCCESS;
395 return StatusCode::SUCCESS;
399 std::uint64_t mb_id) {
400 const std::int64_t hs_id =
get_hs_id(ctx);
401 const std::size_t slot = ctx.slot();
410 throw std::logic_error(std::format(
411 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
418 using namespace std::chrono_literals;
419 const std::int64_t hs_id =
get_hs_id(ctx);
426 std::unique_ptr temp = std::move(
m_cache[batch]);
428 for (
auto&& sg : *temp) {
434 ATH_MSG_DEBUG(
"BATCH " << batch <<
": " << uses <<
" uses out of "
438 return StatusCode::SUCCESS;
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
std::string CLIDToString(const CLID &clid)
uint32_t CLID
The Class ID type.
StatusCode initialize() override
AthService initialize.
ServiceHandle< IEvtSelector > m_bkgEventSelector
Gaudi::Property< std::uint64_t > m_seed
Gaudi::Property< float > m_nPerBunch
Gaudi::Property< bool > m_useBeamLumi
std::vector< std::vector< std::uint64_t > > m_idx_lists
std::map< int, std::unique_ptr< SGHandleArray > > m_cache
ServiceHandle< IBeamIntensity > m_beamInt
~BatchedMinbiasSvc()
Destructor.
Gaudi::Property< std::vector< int > > m_actualNHSEventsPerBatch
Gaudi::Property< bool > m_onDemandMB
int event_to_batch(std::int64_t hs_id)
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Gaudi::Property< int > m_earliestDeltaBC
std::vector< std::unique_ptr< std::atomic_int > > m_batch_use_count
Gaudi::Property< bool > m_usePoisson
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
Gaudi::Property< int > m_latestDeltaBC
std::deque< std::unique_ptr< SGHandleArray > > m_empty_caches
std::mutex m_reading_batch_mtx
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
StatusCode endHardScatter(const EventContext &ctx) override
Gaudi::Property< bool > m_useBeamInt
std::mutex m_empty_caches_mtx
Gaudi::Property< int > m_MBBatchSize
BatchedMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
IEvtSelector::Context * m_bkg_evt_sel_ctx
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
Gaudi::Property< int > m_HSBatchSize
std::map< int, std::mutex > m_cache_mtxs
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
StatusCode beginHardScatter(const EventContext &ctx) override
std::atomic_int m_last_loaded_batch
ServiceHandle< IBeamLuminosity > m_beamLumi
Gaudi::Property< int > m_NSimultaneousBatches
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
std::vector< EvtId >::const_iterator EvtIter
Temporarily change the current store.
The Athena Transient Store API.
int count(std::string s, const std::string &regx)
count how many occurrences of a regx are in a string
int run(int argc, char *argv[])