7 #include <GaudiKernel/ConcurrencyFlags.h>
10 #include <boost/core/demangle.hpp>
15 #include <range/v3/algorithm/stable_sort.hpp>
16 #include <range/v3/numeric/accumulate.hpp>
17 #include <range/v3/to_container.hpp>
18 #include <range/v3/view.hpp>
26 namespace rv = ranges::views;
// NOTE(review): this file is a mangled excerpt — original line numbers are
// fused onto each line and many intermediate lines are missing. Fragment of
// a constructor member-initializer list: the background event-selector
// context starts out null and the "last loaded batch" record is
// default-constructed.
34 m_bkg_evt_sel_ctx(nullptr),
35 m_last_loaded_batch() {}
// --- initialize()-style fragments (original lines 47-117; gaps elsewhere) ---
// Query the framework for how many events run concurrently; presumably used
// to size per-slot state — TODO confirm against the missing lines.
47 std::size_t n_concurrent =
48 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
// Hard failure if the background event selector did not hand back a context.
66 ATH_MSG_ERROR(
"Failed to create background event selector context");
67 return StatusCode::FAILURE;
// Create a per-instance proxy provider service (name suffixed with this
// service's name via std::format) to serve background-event proxies.
72 SmartIF<IProxyProviderSvc> proxyProviderSvc{
73 serviceLocator()->service(
std::format(
"ProxyProviderSvc/BkgPPSvc_{}",
name()))
// Register the background event selector as an address provider. The cast
// producing `addressProvider` falls in missing lines — NOTE(review): verify
// it is the IAddressProvider interface of the selector.
79 if (!addressProvider) {
81 "Could not cast background event selector to IAddressProvider");
83 proxyProviderSvc->addProvider(addressProvider);
// Same pattern for a dedicated AthenaPoolAddressProviderSvc instance.
86 SmartIF<IAddressProvider> athPoolAP{
87 serviceLocator()->service(
std::format(
"AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}",
name()))
91 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
93 proxyProviderSvc->addProvider(athPoolAP);
// And for a dedicated AddressRemappingSvc; failure here is only a WARNING,
// unlike the providers above — presumably remapping is optional.
96 SmartIF<IAddressProvider> addRemapAP{
97 serviceLocator()->service(
std::format(
"AddressRemappingSvc/BkgARSvc_{}",
name()))
100 ATH_MSG_WARNING(
"Could not cast AddressRemappingSvc to IAddressProvider");
102 proxyProviderSvc->addProvider(addRemapAP);
// Build one empty cache (an array of StoreGate handles) sized to the
// minbias batch size; each store is wired to the proxy provider above.
109 auto& sgs =
m_empty_caches.emplace_back(std::make_unique<SGHandleArray>());
110 sgs->reserve(mbBatchSize);
111 for (
int j = 0; j < mbBatchSize; ++j) {
113 auto& sg = sgs->emplace_back(
117 sg->setProxyProviderSvc(proxyProviderSvc);
// --- skipEvent callback fragments (original lines 127-192) ---
// Lambda capturing this service and the batch size; invoked with an event
// range [begin, end) to be skipped.
127 auto skipEvent_callback = [
this, mbBatchSize](
130 using namespace std::chrono_literals;
131 auto evts = ranges::make_subrange(
begin,
end);
// Run-length encode the sequence of batch indices: collapse consecutive
// equal batch numbers into (batch, count) tuples.
137 std::vector<std::tuple<int, int>> batches_with_counts{};
139 for (
int batch : batches_all) {
141 if (batches_with_counts.empty()) {
142 batches_with_counts.emplace_back(
batch, 1);
146 auto& last_entry = batches_with_counts.back();
147 if (
batch == std::get<0>(last_entry)) {
// Same batch as the previous event — just bump its count.
148 std::get<1>(last_entry) += 1;
// Different batch — start a new (batch, 1) entry.
151 batches_with_counts.emplace_back(
batch, 1);
// Process each (batch, count) run; body largely missing in this excerpt.
159 for (
const auto& [
batch,
count] : batches_with_counts) {
// 50 ms polling sleep — presumably waiting on another thread's progress;
// the loop condition is in missing lines.
167 std::this_thread::sleep_for(50
ms);
180 return StatusCode::FAILURE;
187 return StatusCode::SUCCESS;
192 return StatusCode::SUCCESS;
// --- per-bunch minbias multiplicity fragments (original lines 204-277) ---
// Beam-luminosity scale factor; the bool is an out-parameter the caller
// deliberately discards (name says "throwaway").
204 bool sf_updated_throwaway;
205 const float beam_lumi_sf =
// One average-multiplicity entry per bunch crossing.
208 std::vector<float> avg_num_mb_by_bunch(n_bunches,
// Modulate each bunch's average by the beam-intensity normalization factor.
218 avg_num_mb_by_bunch[
idx] *=
m_beamInt->normFactor(bunch);
223 num_mb_by_bunch.clear();
224 num_mb_by_bunch.resize(n_bunches);
// Branch A (condition in missing lines): Poisson-fluctuate each bunch's
// average into an integer event count, using the slot's PRNG.
227 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
228 num_mb_by_bunch.begin(), [&prng](
float avg) {
229 return std::poisson_distribution<std::uint64_t>(avg)(prng);
// Branch B: deterministic rounding of the averages instead of sampling.
232 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
233 num_mb_by_bunch.begin(), [](
float f) {
234 return static_cast<std::uint64_t>(std::round(f));
// Per-slot list of minbias event indices to use for this hard-scatter.
239 std::vector<std::uint64_t>& index_array =
m_idx_lists[slot];
// If the total requested count exceeds one batch, trim events from the
// bunches farthest from the central (in-time) bunch first.
242 if (num_mb > mbBatchSize) {
245 rv::iota(0ULL, num_mb_by_bunch.size()) |
247 bool good =
idx != center_bunch;
249 good && num_mb_by_bunch[
idx] > 0;
252 ranges::to<std::vector>;
// Sort candidate bunches by |distance from the center bunch|.
255 [center_bunch](std::size_t
idx) {
256 return std::size_t(std::abs(
int(
idx) - center_bunch));
263 num_mb_by_bunch[
idx] -= num_subtracted;
264 num_mb -= num_subtracted;
265 if (num_mb <= mbBatchSize) {
// Could not trim below the batch size — cap and warn loudly.
270 ATH_MSG_ERROR(
"We need " << num_mb <<
" events but the batch size is "
271 << mbBatchSize <<
". Restricting to "
272 << mbBatchSize <<
" events!");
// Sample num_mb distinct indices from [0, mbBatchSize), then shuffle so
// assignment order is also randomized.
274 index_array = rv::ints(0,
int(mbBatchSize)) |
rv::sample(num_mb, prng) |
275 ranges::to<std::vector<std::uint64_t>>;
276 ranges::shuffle(index_array, prng);
277 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events");
// --- batch loading fragments (original lines 287-394) ---
287 using namespace std::chrono_literals;
// Track whether we have started waiting, and when each kind of wait began,
// so the total wait time can be reported below.
288 bool first_wait =
true;
289 std::chrono::steady_clock::time_point cache_wait_start{};
290 std::chrono::steady_clock::time_point order_wait_start{};
291 const std::int64_t hs_id =
get_hs_id(ctx);
// Event identification pulled from the current event context.
294 ctx.eventID().run_number(),
295 ctx.eventID().lumi_block(),
296 ctx.eventID().event_number());
303 return StatusCode::SUCCESS;
// Spin-wait (50 ms steps) until it is this batch's turn, to keep batches
// loading in order; wait duration is logged via std::format afterwards.
307 ATH_MSG_INFO(
"Waiting to prevent out-of-order loading of batches");
310 std::this_thread::sleep_for(50
ms);
314 "Waited {:%M:%S} to prevent out-of-order loading", wait_time));
// Release the empty-caches lock while (presumably) waiting for a cache to
// be freed — 100 ms polling steps, again with a formatted wait report.
320 if (empty_caches_lock.owns_lock()) {
323 empty_caches_lock.unlock();
330 std::this_thread::sleep_for(100
ms);
336 std::format(
"Waited {:%M:%S} for a free cache", wait_time));
340 ATH_MSG_INFO(
"Reading next batch in event " << ctx.evt() <<
", slot "
341 << ctx.slot() <<
" (hs_id "
357 return StatusCode::FAILURE;
// Fetch the next event record's opaque address; both a failed call and a
// null address are hard failures.
359 IOpaqueAddress* addr =
nullptr;
363 return StatusCode::FAILURE;
365 if (addr ==
nullptr) {
367 return StatusCode::FAILURE;
// Force-load every valid proxy in the store now, so later readers do not
// trigger lazy I/O from multiple threads.
373 for (
const auto* proxy_ptr : sg->proxies()) {
374 if (!proxy_ptr->isValid()) {
379 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
391 return StatusCode::SUCCESS;
394 return StatusCode::SUCCESS;
// --- trailing fragments (original lines 399-437) ---
// Resolve the hard-scatter ID and slot for the current event context.
399 const std::int64_t hs_id =
get_hs_id(ctx);
400 const std::size_t slot = ctx.slot();
// Error text for a bunch index outside the configured window.
410 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
417 using namespace std::chrono_literals;
418 const std::int64_t hs_id =
get_hs_id(ctx);
// Iterate over the stores of a (presumably finished) cache — likely the
// clear/release loop; confirm against the missing lines.
427 for (
auto&& sg : *temp) {
437 return StatusCode::SUCCESS;