7 #include <GaudiKernel/ConcurrencyFlags.h>
8 #include <fmt/chrono.h>
9 #include <fmt/format.h>
12 #include <boost/core/demangle.hpp>
16 #include <range/v3/algorithm/stable_sort.hpp>
17 #include <range/v3/numeric/accumulate.hpp>
18 #include <range/v3/to_container.hpp>
19 #include <range/v3/view.hpp>
26 #include "EventInfo/EventInfo.h"
// Shorthand for the range-v3 views namespace; used below as rv::iota, rv::ints and rv::sample.
30 namespace rv = ranges::views;
38 m_bkg_evt_sel_ctx(nullptr),
39 m_last_loaded_batch() {}
51 std::size_t n_concurrent =
52 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
70 ATH_MSG_ERROR(
"Failed to create background event selector context");
71 return StatusCode::FAILURE;
81 auto* addressProvider =
83 if (addressProvider ==
nullptr) {
85 "Could not cast background event selector to IAddressProvider");
90 IService* athPoolSvc =
nullptr;
95 if (athPoolAP ==
nullptr) {
97 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
102 IService* addRemapSvc =
nullptr;
106 if (addRemapAP ==
nullptr) {
107 ATH_MSG_WARNING(
"Could not cast AddressRemappingSvc to IAddressProvider");
116 auto& sgs =
m_empty_caches.emplace_back(std::make_unique<SGHandleArray>());
117 sgs->reserve(mbBatchSize);
118 for (
int j = 0; j < mbBatchSize; ++j) {
120 auto& sg = sgs->emplace_back(
124 sg->setProxyProviderSvc(proxyProviderSvc);
134 auto skipEvent_callback = [
this, mbBatchSize](
137 using namespace std::chrono_literals;
138 auto evts = ranges::make_subrange(
begin,
end);
144 std::vector<std::tuple<int, int>> batches_with_counts{};
146 for (
int batch : batches_all) {
148 if (batches_with_counts.empty()) {
149 batches_with_counts.emplace_back(
batch, 1);
153 auto& last_entry = batches_with_counts.back();
154 if (
batch == std::get<0>(last_entry)) {
155 std::get<1>(last_entry) += 1;
158 batches_with_counts.emplace_back(
batch, 1);
166 for (
const auto& [
batch,
count] : batches_with_counts) {
174 std::this_thread::sleep_for(50
ms);
187 return StatusCode::FAILURE;
194 return StatusCode::SUCCESS;
199 return StatusCode::SUCCESS;
211 bool sf_updated_throwaway;
212 const float beam_lumi_sf =
215 std::vector<float> avg_num_mb_by_bunch(n_bunches,
225 avg_num_mb_by_bunch[
idx] *=
m_beamInt->normFactor(bunch);
230 num_mb_by_bunch.clear();
231 num_mb_by_bunch.resize(n_bunches);
234 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
235 num_mb_by_bunch.begin(), [&prng](
float avg) {
236 return std::poisson_distribution<std::uint64_t>(avg)(prng);
239 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
240 num_mb_by_bunch.begin(), [](
float f) {
241 return static_cast<std::uint64_t>(std::round(f));
246 std::vector<std::uint64_t>& index_array =
m_idx_lists[slot];
249 if (num_mb > mbBatchSize) {
252 rv::iota(0ULL, num_mb_by_bunch.size()) |
254 bool good =
idx != center_bunch;
256 good && num_mb_by_bunch[
idx] > 0;
259 ranges::to<std::vector>;
262 [center_bunch](std::size_t
idx) {
263 return std::size_t(std::abs(
int(
idx) - center_bunch));
270 num_mb_by_bunch[
idx] -= num_subtracted;
271 num_mb -= num_subtracted;
272 if (num_mb <= mbBatchSize) {
277 ATH_MSG_ERROR(
"We need " << num_mb <<
" events but the batch size is "
278 << mbBatchSize <<
". Restricting to "
279 << mbBatchSize <<
" events!");
281 index_array = rv::ints(0,
int(mbBatchSize)) |
rv::sample(num_mb, prng) |
282 ranges::to<std::vector<std::uint64_t>>;
283 ranges::shuffle(index_array, prng);
285 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events");
287 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events\n"
297 using namespace std::chrono_literals;
298 bool first_wait =
true;
299 std::chrono::steady_clock::time_point cache_wait_start{};
300 std::chrono::steady_clock::time_point order_wait_start{};
301 const std::int64_t hs_id =
get_hs_id(ctx);
304 ctx.eventID().run_number(),
305 ctx.eventID().lumi_block(),
306 ctx.eventID().event_number());
313 return StatusCode::SUCCESS;
317 ATH_MSG_INFO(
"Waiting to prevent out-of-order loading of batches");
320 std::this_thread::sleep_for(50
ms);
324 "Waited {:%M:%S} to prevent out-of-order loading", wait_time));
338 std::this_thread::sleep_for(100
ms);
344 fmt::format(
"Waited {:%M:%S} for a free cache", wait_time));
348 ATH_MSG_INFO(
"Reading next batch in event " << ctx.evt() <<
", slot "
349 << ctx.slot() <<
" (hs_id "
365 return StatusCode::FAILURE;
367 IOpaqueAddress* addr =
nullptr;
371 return StatusCode::FAILURE;
373 if (addr ==
nullptr) {
375 return StatusCode::FAILURE;
380 for (
const auto* proxy_ptr : sg->proxies()) {
381 if (!proxy_ptr->isValid()) {
387 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
400 return StatusCode::SUCCESS;
403 return StatusCode::SUCCESS;
408 const std::int64_t hs_id =
get_hs_id(ctx);
409 const std::size_t slot = ctx.slot();
419 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
426 using namespace std::chrono_literals;
427 const std::int64_t hs_id =
get_hs_id(ctx);
436 for (
auto&& sg : *temp) {
446 return StatusCode::SUCCESS;