7#include <GaudiKernel/ConcurrencyFlags.h>
10#include <boost/core/demangle.hpp>
14#include <range/v3/algorithm.hpp>
15#include <range/v3/numeric/accumulate.hpp>
16#include <range/v3/to_container.hpp>
17#include <range/v3/view.hpp>
33 : base_class(name, svc),
54 ATH_MSG_ERROR(
"Failed to create background event selector context");
55 return StatusCode::FAILURE;
64 if (!addressProvider) {
66 "Could not cast background event selector to IAddressProvider");
71 SmartIF<IAddressProvider> athPoolAP{
72 serviceLocator()->service(std::format(
"AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
76 "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
81 SmartIF<IAddressProvider> addRemapAP{
82 serviceLocator()->service(std::format(
"AddressRemappingSvc/BkgARSvc_{}", name()))
85 ATH_MSG_WARNING(
"Could not cast AddressRemappingSvc to IAddressProvider");
90 const std::size_t n_concurrent =
91 Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
101 const int n_stores = 50;
103 for (std::size_t i = 0; i < n_concurrent; ++i) {
105 sgs.reserve(n_stores);
106 for (
int j = 0; j < n_stores; ++j) {
108 auto& sg = sgs.emplace_back(
109 std::format(
"StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
120 auto skipEvent_callback = [
this](
123 using namespace std::chrono_literals;
126 ATH_MSG_INFO(
"Skipping " << end - begin <<
" HS events. ");
127 for (
auto iter = begin; iter < end; ++iter) {
128 const auto& evt = *iter;
130 evt.evtIdx,
s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
131 ATH_MSG_DEBUG(
"Skipping HS_ID " << evt.evtIdx <<
" --> skipping "
132 << n_to_skip <<
" pileup events");
133 for (std::size_t i = 0; i < n_to_skip; ++i) {
136 return StatusCode::FAILURE;
142 return StatusCode::SUCCESS;
146 return StatusCode::SUCCESS;
153 std::uint64_t event) {
155 <<
"| hs_id " << hs_id);
158 std::vector<std::uint64_t> stack_num_mb_by_bunch{};
159 std::vector<std::uint64_t>& num_mb_by_bunch =
161 num_mb_by_bunch.clear();
162 num_mb_by_bunch.resize(n_bunches);
166 bool sf_updated_throwaway;
167 const float beam_lumi_sf =
170 const float beam_lumi = beam_lumi_sf *
m_nPerBunch.value();
171 std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
180 avg_num_mb_by_bunch[idx] *=
m_beamInt->normFactor(bunch);
185 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
186 num_mb_by_bunch.begin(), [&prng](
float avg) {
187 return std::poisson_distribution<std::uint64_t>(avg)(prng);
190 std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
191 num_mb_by_bunch.begin(), [](
float f) {
192 return static_cast<std::uint64_t>(std::round(f));
196 std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
202 std::vector<std::uint64_t>& index_array =
m_idx_lists[slot];
204 index_array.resize(num_mb);
205 std::iota(index_array.begin(), index_array.end(), 0);
217 using namespace std::chrono_literals;
218 std::chrono::steady_clock::time_point order_wait_start{};
220 const std::int64_t hs_id =
get_hs_id(ctx);
221 const std::size_t slot = ctx.slot();
222 const std::size_t num_to_load =
224 ctx.eventID().lumi_block(), ctx.eventID().event_number());
227 if (stores.size() < num_to_load) {
228 ATH_MSG_INFO(
"Adding " << num_to_load - stores.size() <<
" stores");
229 stores.reserve(num_to_load);
230 for (std::size_t i = stores.size(); i < num_to_load; ++i) {
231 auto& sg = stores.emplace_back(
232 std::format(
"StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
241 ATH_MSG_INFO(
"Waiting to prevent out-of-order loading. Last loaded is "
243 order_wait_start = std::chrono::steady_clock::now();
245 std::this_thread::sleep_for(50ms);
247 auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
248 ATH_MSG_INFO(std::format(
"Waited {:%M:%S} to prevent out-of-order loading",
253 auto start = std::chrono::steady_clock::now();
256 for (std::size_t i = 0; i < num_to_load; ++i) {
257 auto& sg = stores[i];
260 SG::CurrentEventStore::Push reader_sg_ces(sg.get());
265 return StatusCode::FAILURE;
267 IOpaqueAddress* addr =
nullptr;
271 return StatusCode::FAILURE;
273 if (addr ==
nullptr) {
275 return StatusCode::FAILURE;
281 for (
const auto* proxy_ptr : sg->proxies()) {
283 sg->proxy_exact(proxy_ptr->sgkey())->accessData();
289 ATH_MSG_INFO(std::format(
"Took {:%M:%S} to load events",
290 std::chrono::steady_clock::now() - start));
293 return StatusCode::SUCCESS;
297 std::uint64_t mb_id) {
298 const std::size_t slot = ctx.slot();
306 throw std::logic_error(std::format(
307 "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
315 for (
auto&& sg :
m_stores[ctx.slot()]) {
318 return StatusCode::SUCCESS;
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
uint32_t CLID
The Class ID type.
std::string CLIDToString(const CLID &clid)
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
std::vector< EvtId >::const_iterator EvtIter
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
~OnDemandMinbiasSvc() final
Destructor.
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
StatusCode initialize() final
AthService initialize.
IEvtSelector::Context * m_bkg_evt_sel_ctx
Gaudi::Property< int > m_latestDeltaBC
StatusCode beginHardScatter(const EventContext &ctx) override
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Gaudi::Property< bool > m_useBeamLumi
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
ServiceHandle< IEvtSelector > m_bkgEventSelector
std::mutex m_reading_batch_mtx
std::vector< std::vector< SGHandle > > m_stores
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Gaudi::Property< bool > m_useBeamInt
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Gaudi::Property< bool > m_usePoisson
Gaudi::Property< std::uint64_t > m_seed
static constexpr std::size_t s_NoSlot
std::atomic_int64_t m_last_loaded_hs
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
ServiceHandle< IBeamIntensity > m_beamInt
StatusCode endHardScatter(const EventContext &ctx) override
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Gaudi::Property< float > m_nPerBunch
std::vector< std::vector< std::uint64_t > > m_idx_lists
Gaudi::Property< bool > m_onDemandMB
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
ServiceHandle< IBeamLuminosity > m_beamLumi
Gaudi::Property< int > m_earliestDeltaBC
The Athena Transient Store API.
int run(int argc, char *argv[])