7 #include <GaudiKernel/ConcurrencyFlags.h>
8 #include <fmt/chrono.h>
9 #include <fmt/format.h>
12 #include <boost/core/demangle.hpp>
15 #include <range/v3/algorithm.hpp>
16 #include <range/v3/numeric/accumulate.hpp>
17 #include <range/v3/to_container.hpp>
18 #include <range/v3/view.hpp>
25 #include "EventInfo/EventInfo.h"
29 namespace rv = ranges::views;
38 m_bkg_evt_sel_ctx(nullptr),
39 m_proxyProviderSvc(
"ProxyProviderSvc/BkgPPSvc_"+
name,
name),
58 ATH_MSG_ERROR(
"Failed to create background event selector context");
59 return StatusCode::FAILURE;
68 if (!addressProvider) {
70 "Could not cast background event selector to IAddressProvider");
75 SmartIF<IAddressProvider> athPoolAP{
76 serviceLocator()->service(
fmt::format(
"AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}",
name()))
80 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
85 SmartIF<IAddressProvider> addRemapAP{
86 serviceLocator()->service(
fmt::format(
"AddressRemappingSvc/BkgARSvc_{}",
name()))
89 ATH_MSG_WARNING(
"Could not cast AddressRemappingSvc to IAddressProvider");
94 const std::size_t n_concurrent =
95 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
105 const int n_stores = 50;
107 for (std::size_t
i = 0;
i < n_concurrent; ++
i) {
109 sgs.reserve(n_stores);
110 for (
int j = 0; j < n_stores; ++j) {
112 auto& sg = sgs.emplace_back(
124 auto skipEvent_callback = [
this](
127 using namespace std::chrono_literals;
131 for (
auto iter =
begin; iter <
end; ++iter) {
132 const auto&
evt = *iter;
136 << n_to_skip <<
" pileup events");
137 for (std::size_t
i = 0;
i < n_to_skip; ++
i) {
140 return StatusCode::FAILURE;
146 return StatusCode::SUCCESS;
150 return StatusCode::SUCCESS;
159 <<
"| hs_id " << hs_id);
162 std::vector<std::uint64_t> stack_num_mb_by_bunch{};
163 std::vector<std::uint64_t>& num_mb_by_bunch =
165 num_mb_by_bunch.clear();
166 num_mb_by_bunch.resize(n_bunches);
170 bool sf_updated_throwaway;
171 const float beam_lumi_sf =
174 const float beam_lumi = beam_lumi_sf *
m_nPerBunch.value();
175 std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
184 avg_num_mb_by_bunch[
idx] *=
m_beamInt->normFactor(bunch);
189 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
190 num_mb_by_bunch.begin(), [&prng](
float avg) {
191 return std::poisson_distribution<std::uint64_t>(avg)(prng);
194 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
195 num_mb_by_bunch.begin(), [](
float f) {
196 return static_cast<std::uint64_t>(std::round(f));
206 std::vector<std::uint64_t>& index_array =
m_idx_lists[slot];
208 index_array.resize(num_mb);
209 std::iota(index_array.begin(), index_array.end(), 0);
212 ATH_MSG_DEBUG(
"HS ID " << hs_id <<
" uses " << num_mb <<
" events\n"
219 using namespace std::chrono_literals;
220 std::chrono::steady_clock::time_point order_wait_start{};
222 const std::int64_t hs_id =
get_hs_id(ctx);
223 const std::size_t slot = ctx.slot();
224 const std::size_t num_to_load =
226 ctx.eventID().lumi_block(), ctx.eventID().event_number());
229 if (stores.size() < num_to_load) {
230 ATH_MSG_INFO(
"Adding " << num_to_load - stores.size() <<
" stores");
231 stores.reserve(num_to_load);
232 for (std::size_t
i = stores.size();
i < num_to_load; ++
i) {
233 auto& sg = stores.emplace_back(
243 ATH_MSG_INFO(
"Waiting to prevent out-of-order loading. Last loaded is "
247 std::this_thread::sleep_for(50
ms);
258 for (std::size_t
i = 0;
i < num_to_load; ++
i) {
259 auto& sg = stores[
i];
267 return StatusCode::FAILURE;
269 IOpaqueAddress* addr =
nullptr;
273 return StatusCode::FAILURE;
275 if (addr ==
nullptr) {
277 return StatusCode::FAILURE;
282 for (
const auto* proxy_ptr : sg->proxies()) {
285 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
295 return StatusCode::SUCCESS;
300 const std::size_t slot = ctx.slot();
309 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
317 for (
auto&& sg :
m_stores[ctx.slot()]) {
320 return StatusCode::SUCCESS;