7 #include <GaudiKernel/ConcurrencyFlags.h>
8 #include <fmt/chrono.h>
9 #include <fmt/format.h>
12 #include <boost/core/demangle.hpp>
16 #include <range/v3/algorithm/stable_sort.hpp>
17 #include <range/v3/numeric/accumulate.hpp>
18 #include <range/v3/to_container.hpp>
19 #include <range/v3/view.hpp>
26 #include "EventInfo/EventInfo.h"
30 namespace rv = ranges::views;
38 m_bkg_evt_sel_ctx(nullptr),
39 m_last_loaded_batch() {}
51 std::size_t n_concurrent =
52 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
70 ATH_MSG_ERROR(
"Failed to create background event selector context");
71 return StatusCode::FAILURE;
76 SmartIF<IProxyProviderSvc> proxyProviderSvc{
77 serviceLocator()->service(
fmt::format(
"ProxyProviderSvc/BkgPPSvc_{}",
name()))
83 if (!addressProvider) {
85 "Could not cast background event selector to IAddressProvider");
87 proxyProviderSvc->addProvider(addressProvider);
90 SmartIF<IAddressProvider> athPoolAP{
91 serviceLocator()->service(
fmt::format(
"AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}",
name()))
95 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
97 proxyProviderSvc->addProvider(athPoolAP);
100 SmartIF<IAddressProvider> addRemapAP{
101 serviceLocator()->service(
fmt::format(
"AddressRemappingSvc/BkgARSvc_{}",
name()))
104 ATH_MSG_WARNING(
"Could not cast AddressRemappingSvc to IAddressProvider");
106 proxyProviderSvc->addProvider(addRemapAP);
113 auto& sgs =
m_empty_caches.emplace_back(std::make_unique<SGHandleArray>());
114 sgs->reserve(mbBatchSize);
115 for (
int j = 0; j < mbBatchSize; ++j) {
117 auto& sg = sgs->emplace_back(
121 sg->setProxyProviderSvc(proxyProviderSvc);
131 auto skipEvent_callback = [
this, mbBatchSize](
134 using namespace std::chrono_literals;
135 auto evts = ranges::make_subrange(
begin,
end);
141 std::vector<std::tuple<int, int>> batches_with_counts{};
143 for (
int batch : batches_all) {
145 if (batches_with_counts.empty()) {
146 batches_with_counts.emplace_back(
batch, 1);
150 auto& last_entry = batches_with_counts.back();
151 if (
batch == std::get<0>(last_entry)) {
152 std::get<1>(last_entry) += 1;
155 batches_with_counts.emplace_back(
batch, 1);
163 for (
const auto& [
batch,
count] : batches_with_counts) {
171 std::this_thread::sleep_for(50
ms);
184 return StatusCode::FAILURE;
191 return StatusCode::SUCCESS;
196 return StatusCode::SUCCESS;
208 bool sf_updated_throwaway;
209 const float beam_lumi_sf =
212 std::vector<float> avg_num_mb_by_bunch(n_bunches,
222 avg_num_mb_by_bunch[
idx] *=
m_beamInt->normFactor(bunch);
227 num_mb_by_bunch.clear();
228 num_mb_by_bunch.resize(n_bunches);
231 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
232 num_mb_by_bunch.begin(), [&prng](
float avg) {
233 return std::poisson_distribution<std::uint64_t>(avg)(prng);
236 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
237 num_mb_by_bunch.begin(), [](
float f) {
238 return static_cast<std::uint64_t>(std::round(f));
243 std::vector<std::uint64_t>& index_array =
m_idx_lists[slot];
246 if (num_mb > mbBatchSize) {
249 rv::iota(0ULL, num_mb_by_bunch.size()) |
251 bool good =
idx != center_bunch;
253 good && num_mb_by_bunch[
idx] > 0;
256 ranges::to<std::vector>;
259 [center_bunch](std::size_t
idx) {
260 return std::size_t(std::abs(
int(
idx) - center_bunch));
267 num_mb_by_bunch[
idx] -= num_subtracted;
268 num_mb -= num_subtracted;
269 if (num_mb <= mbBatchSize) {
274 ATH_MSG_ERROR(
"We need " << num_mb <<
" events but the batch size is "
275 << mbBatchSize <<
". Restricting to "
276 << mbBatchSize <<
" events!");
278 index_array = rv::ints(0,
int(mbBatchSize)) |
rv::sample(num_mb, prng) |
279 ranges::to<std::vector<std::uint64_t>>;
280 ranges::shuffle(index_array, prng);
282 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events");
284 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events\n"
294 using namespace std::chrono_literals;
295 bool first_wait =
true;
296 std::chrono::steady_clock::time_point cache_wait_start{};
297 std::chrono::steady_clock::time_point order_wait_start{};
298 const std::int64_t hs_id =
get_hs_id(ctx);
301 ctx.eventID().run_number(),
302 ctx.eventID().lumi_block(),
303 ctx.eventID().event_number());
310 return StatusCode::SUCCESS;
314 ATH_MSG_INFO(
"Waiting to prevent out-of-order loading of batches");
317 std::this_thread::sleep_for(50
ms);
321 "Waited {:%M:%S} to prevent out-of-order loading", wait_time));
335 std::this_thread::sleep_for(100
ms);
341 fmt::format(
"Waited {:%M:%S} for a free cache", wait_time));
345 ATH_MSG_INFO(
"Reading next batch in event " << ctx.evt() <<
", slot "
346 << ctx.slot() <<
" (hs_id "
362 return StatusCode::FAILURE;
364 IOpaqueAddress* addr =
nullptr;
368 return StatusCode::FAILURE;
370 if (addr ==
nullptr) {
372 return StatusCode::FAILURE;
377 for (
const auto* proxy_ptr : sg->proxies()) {
378 if (!proxy_ptr->isValid()) {
384 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
397 return StatusCode::SUCCESS;
400 return StatusCode::SUCCESS;
405 const std::int64_t hs_id =
get_hs_id(ctx);
406 const std::size_t slot = ctx.slot();
416 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
423 using namespace std::chrono_literals;
424 const std::int64_t hs_id =
get_hs_id(ctx);
433 for (
auto&& sg : *temp) {
443 return StatusCode::SUCCESS;