ATLAS Offline Software
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "OnDemandMinbiasSvc.h"
6 
7 #include <GaudiKernel/ConcurrencyFlags.h>
8 
9 #include <algorithm>
10 #include <boost/core/demangle.hpp>
11 #include <chrono>
12 #include <format>
13 #include <random>
14 #include <range/v3/algorithm.hpp>
15 #include <range/v3/numeric/accumulate.hpp>
16 #include <range/v3/to_container.hpp>
17 #include <range/v3/view.hpp>
18 #include <thread>
19 
24 
25 
26 inline std::string CLIDToString(const CLID& clid) {
27  return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
28 }
29 
31  ISvcLocator* svc)
32  : base_class(name, svc),
33  m_bkg_evt_sel_ctx(nullptr),
34  m_proxyProviderSvc("ProxyProviderSvc/BkgPPSvc_"+name, name),
35  m_last_loaded_hs()
36 {}
37 
39 
41  m_stores.clear();
42  ATH_CHECK(m_bkgEventSelector.retrieve());
43  ATH_CHECK(m_activeStoreSvc.retrieve());
44  ATH_CHECK(m_skipEventIdxSvc.retrieve());
45  if (m_useBeamInt) {
46  ATH_CHECK(m_beamInt.retrieve());
47  }
48  if (m_useBeamLumi) {
49  ATH_CHECK(m_beamLumi.retrieve());
50  }
51  // Setup context
52  if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
53  ATH_MSG_ERROR("Failed to create background event selector context");
54  return StatusCode::FAILURE;
55  }
56  ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());
57 
58  // Setup proxy provider
59  ATH_CHECK(m_proxyProviderSvc.retrieve());
60 
61  // Setup Address Providers
62  SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
63  if (!addressProvider) {
65  "Could not cast background event selector to IAddressProvider");
66  } else {
67  m_proxyProviderSvc->addProvider(addressProvider);
68  }
69  // AthenaPoolAddressProviderSvc
70  SmartIF<IAddressProvider> athPoolAP{
71  serviceLocator()->service(std::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
72  };
73  if (!athPoolAP) {
75  "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
76  } else {
77  m_proxyProviderSvc->addProvider(athPoolAP);
78  }
79  // AddressRemappingSvc
80  SmartIF<IAddressProvider> addRemapAP{
81  serviceLocator()->service(std::format("AddressRemappingSvc/BkgARSvc_{}", name()))
82  };
83  if (!addRemapAP) {
84  ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
85  } else {
86  m_proxyProviderSvc->addProvider(addRemapAP);
87  }
88 
89  const std::size_t n_concurrent =
90  Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
91  m_idx_lists.clear();
92  m_idx_lists.resize(n_concurrent);
93 
94  m_num_mb_by_bunch.clear();
95  m_num_mb_by_bunch.resize(n_concurrent);
96 
97  m_stores.clear();
98  m_stores.resize(n_concurrent);
99 
100  const int n_stores = 50; // Start with 50 stores per event
101  // setup n_concurrent vectors of n_stores StoreGates in m_stores
102  for (std::size_t i = 0; i < n_concurrent; ++i) {
103  auto& sgs = m_stores[i];
104  sgs.reserve(n_stores);
105  for (int j = 0; j < n_stores; ++j) {
106  // creates / retrieves a different StoreGateSvc for each slot
107  auto& sg = sgs.emplace_back(
108  std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
109  ATH_CHECK(sg.retrieve());
110  sg->setStoreID(StoreID::PILEUP_STORE);
111  sg->setProxyProviderSvc(m_proxyProviderSvc.get());
112  }
113  }
114 
115  // setup spare store for event skipping
116  ATH_CHECK(m_spare_store.retrieve());
118  m_spare_store->setProxyProviderSvc(m_proxyProviderSvc.get());
119  auto skipEvent_callback = [this](
122  using namespace std::chrono_literals;
123  auto* const old_store = m_activeStoreSvc->activeStore();
124  m_activeStoreSvc->setStore(m_spare_store.get());
125  ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
126  for (auto iter = begin; iter < end; ++iter) {
127  const auto& evt = *iter;
128  const std::size_t n_to_skip = calcMBRequired(
129  evt.evtIdx, s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
130  ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
131  << n_to_skip << " pileup events");
132  for (std::size_t i = 0; i < n_to_skip; ++i) {
133  if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
134  ATH_MSG_ERROR("Ran out of background events");
135  return StatusCode::FAILURE;
136  }
137  }
138  }
139  m_last_loaded_hs.store((end - 1)->evtIdx);
140  m_activeStoreSvc->setStore(old_store);
141  return StatusCode::SUCCESS;
142  };
143  ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
144  ATH_MSG_INFO("Initializing ODMBSvc");
145  return StatusCode::SUCCESS;
146 }
147 
148 std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
149  std::size_t slot,
150  unsigned int run,
151  unsigned int lumi,
153  ATH_MSG_DEBUG("Run " << run << ", lumi " << lumi << ", event " << event
154  << "| hs_id " << hs_id);
155  const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
156  // vector on stack for use if slot == s_NoSlot
157  std::vector<std::uint64_t> stack_num_mb_by_bunch{};
158  std::vector<std::uint64_t>& num_mb_by_bunch =
159  slot == s_NoSlot ? stack_num_mb_by_bunch : m_num_mb_by_bunch[slot];
160  num_mb_by_bunch.clear();
161  num_mb_by_bunch.resize(n_bunches);
162  FastReseededPRNG prng{m_seed.value(), hs_id};
163 
164  // First apply the beam luminosity SF
165  bool sf_updated_throwaway;
166  const float beam_lumi_sf =
167  m_useBeamLumi ? m_beamLumi->scaleFactor(run, lumi, sf_updated_throwaway)
168  : 1.F;
169  const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
170  std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
171  // Now update using beam intensities
172  if (m_useBeamInt) {
173  // Supposed to be once per event, but ends up running once per minbias type
174  // per event now
175  m_beamInt->selectT0(run, event);
176  for (int bunch = m_earliestDeltaBC.value();
177  bunch <= m_latestDeltaBC.value(); ++bunch) {
178  std::size_t idx = bunch - m_earliestDeltaBC.value();
179  avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
180  }
181  }
182 
183  if (m_usePoisson) {
184  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
185  num_mb_by_bunch.begin(), [&prng](float avg) {
186  return std::poisson_distribution<std::uint64_t>(avg)(prng);
187  });
188  } else {
189  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
190  num_mb_by_bunch.begin(), [](float f) {
191  return static_cast<std::uint64_t>(std::round(f));
192  });
193  }
194 
195  std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
196  if (slot == s_NoSlot) {
197  return num_mb;
198  }
199  // Won't go past here if slot == s_NoSlot
200 
201  std::vector<std::uint64_t>& index_array = m_idx_lists[slot];
202  index_array.clear();
203  index_array.resize(num_mb);
204  std::iota(index_array.begin(), index_array.end(), 0);
205  // Don't need to shuffle, since these events aren't reused
206  // std::shuffle(index_array.begin(), index_array.end(), prng);
207 
208  // Commented out until we can use C++ 23 range formatting
209  // ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
210  // << fmt::format("\t\tBy bunch: [{}]\n",
211  // fmt::join(num_mb_by_bunch, ", ")));
212  return num_mb;
213 }
214 
216  using namespace std::chrono_literals;
217  std::chrono::steady_clock::time_point order_wait_start{};
218 
219  const std::int64_t hs_id = get_hs_id(ctx);
220  const std::size_t slot = ctx.slot();
221  const std::size_t num_to_load =
222  calcMBRequired(hs_id, slot, ctx.eventID().run_number(),
223  ctx.eventID().lumi_block(), ctx.eventID().event_number());
224  auto& stores = m_stores[slot];
225  // If we don't have enough stores, make more
226  if (stores.size() < num_to_load) {
227  ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
228  stores.reserve(num_to_load);
229  for (std::size_t i = stores.size(); i < num_to_load; ++i) {
230  auto& sg = stores.emplace_back(
231  std::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
232  name());
233  ATH_CHECK(sg.retrieve());
234  sg->setStoreID(StoreID::PILEUP_STORE);
235  sg->setProxyProviderSvc(m_proxyProviderSvc.get());
236  }
237  }
238  // Ensure loading is done in order
239  if (m_last_loaded_hs < hs_id - 1) {
240  ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
241  << m_last_loaded_hs << " and we are " << hs_id);
242  order_wait_start = std::chrono::steady_clock::now();
243  while (m_last_loaded_hs < hs_id - 1) {
244  std::this_thread::sleep_for(50ms);
245  }
246  auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
247  ATH_MSG_INFO(std::format("Waited {:%M:%S} to prevent out-of-order loading",
248  wait_time));
249  }
250  // Lock reading mutex
251  std::unique_lock lck(m_reading_batch_mtx);
253  // Remember old store to reset later
254  auto* old_store = m_activeStoreSvc->activeStore();
255  for (std::size_t i = 0; i < num_to_load; ++i) {
256  auto& sg = stores[i];
257  // Change active store
258  m_activeStoreSvc->setStore(sg.get());
259  SG::CurrentEventStore::Push reader_sg_ces(sg.get());
260  // Read next event
261  ATH_CHECK(sg->clearStore(true));
262  if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
263  ATH_MSG_FATAL("Ran out of minbias events");
264  return StatusCode::FAILURE;
265  }
266  IOpaqueAddress* addr = nullptr;
267  if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
268  .isSuccess()) {
269  ATH_MSG_WARNING("Failed to create address. No more events?");
270  return StatusCode::FAILURE;
271  }
272  if (addr == nullptr) {
273  ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
274  return StatusCode::FAILURE;
275  }
276  ATH_CHECK(sg->recordAddress(addr));
277  ATH_CHECK(sg->loadEventProxies());
278  // Read data now if desired
279  if (!m_onDemandMB) {
280  for (const auto* proxy_ptr : sg->proxies()) {
281  // Sort of a const_cast, then ->accessData()
282  sg->proxy_exact(proxy_ptr->sgkey())->accessData();
283  }
284  }
285  }
286  // Reset active store
287  m_activeStoreSvc->setStore(old_store);
288  ATH_MSG_INFO(std::format("Took {:%M:%S} to load events",
290  // Update last loaded
291  m_last_loaded_hs.store(hs_id);
292  return StatusCode::SUCCESS;
293 }
294 
296  std::uint64_t mb_id) {
297  const std::size_t slot = ctx.slot();
298  const std::size_t index = m_idx_lists.at(slot).at(mb_id);
299  return m_stores.at(ctx.slot()).at(index).get();
300 }
301 
302 std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
303  int bunch) const {
304  if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
305  throw std::logic_error(std::format(
306  "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
307  m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
308  }
309  return m_num_mb_by_bunch.at(ctx.slot()).at(bunch - m_earliestDeltaBC.value());
310 }
311 
313  // clear all stores
314  for (auto&& sg : m_stores[ctx.slot()]) {
315  ATH_CHECK(sg->clearStore());
316  }
317  return StatusCode::SUCCESS;
318 }
CurrentEventStore.h
Hold a pointer to the current event store.
OnDemandMinbiasSvc::m_beamLumi
ServiceHandle< IBeamLuminosity > m_beamLumi
Definition: OnDemandMinbiasSvc.h:82
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
CLIDToString
std::string CLIDToString(const CLID &clid)
Definition: OnDemandMinbiasSvc.cxx:26
FastReseededPRNG.h
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
index
Definition: index.py:1
OnDemandMinbiasSvc::m_useBeamInt
Gaudi::Property< bool > m_useBeamInt
Definition: OnDemandMinbiasSvc.h:65
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:13
OnDemandMinbiasSvc::m_num_mb_by_bunch
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
Definition: OnDemandMinbiasSvc.h:94
OnDemandMinbiasSvc::OnDemandMinbiasSvc
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Definition: OnDemandMinbiasSvc.cxx:30
PlotCalibFromCool.begin
begin
Definition: PlotCalibFromCool.py:94
SG::CurrentEventStore::Push
Temporarily change the current store.
Definition: SGTools/SGTools/CurrentEventStore.h:58
CLIDRegistry::CLIDToTypeinfo
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
Definition: CLIDRegistry.cxx:136
OnDemandMinbiasSvc::m_skipEventIdxSvc
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Definition: OnDemandMinbiasSvc.h:75
OnDemandMinbiasSvc::calcMBRequired
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
Definition: OnDemandMinbiasSvc.cxx:148
LArG4FSStartPointFilter.evt
evt
Definition: LArG4FSStartPointFilter.py:42
OnDemandMinbiasSvc::m_bkgEventSelector
ServiceHandle< IEvtSelector > m_bkgEventSelector
Definition: OnDemandMinbiasSvc.h:78
OnDemandMinbiasSvc::m_idx_lists
std::vector< std::vector< std::uint64_t > > m_idx_lists
Definition: OnDemandMinbiasSvc.h:95
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:92
OnDemandMinbiasSvc::m_usePoisson
Gaudi::Property< bool > m_usePoisson
Definition: OnDemandMinbiasSvc.h:62
OnDemandMinbiasSvc::m_proxyProviderSvc
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
Definition: OnDemandMinbiasSvc.h:91
OnDemandMinbiasSvc::~OnDemandMinbiasSvc
~OnDemandMinbiasSvc() final
Destructor.
Definition: OnDemandMinbiasSvc.cxx:38
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:120
OnDemandMinbiasSvc::get_hs_id
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
Definition: OnDemandMinbiasSvc.h:43
python.handimod.now
now
Definition: handimod.py:674
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OnDemandMinbiasSvc::getMinbias
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Definition: OnDemandMinbiasSvc.cxx:295
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
OnDemandMinbiasSvc::m_last_loaded_hs
std::atomic_int64_t m_last_loaded_hs
Definition: OnDemandMinbiasSvc.h:98
OnDemandMinbiasSvc.h
lumiFormat.i
int i
Definition: lumiFormat.py:85
ISkipEventIdxSvc::EvtIter
std::vector< EvtId >::const_iterator EvtIter
Definition: ISkipEventIdxSvc.h:23
OnDemandMinbiasSvc::m_onDemandMB
Gaudi::Property< bool > m_onDemandMB
Definition: OnDemandMinbiasSvc.h:54
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
Recovery.avg
def avg(a, b)
Definition: Recovery.py:79
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OnDemandMinbiasSvc::m_useBeamLumi
Gaudi::Property< bool > m_useBeamLumi
Definition: OnDemandMinbiasSvc.h:67
OnDemandMinbiasSvc::s_NoSlot
static constexpr std::size_t s_NoSlot
Definition: OnDemandMinbiasSvc.h:100
OnDemandMinbiasSvc::beginHardScatter
StatusCode beginHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:215
Amg::transform
Amg::Vector3D transform(Amg::Vector3D &v, Amg::Transform3D &tr)
Transform a point from a Trasformation3D.
Definition: GeoPrimitivesHelpers.h:156
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
OnDemandMinbiasSvc::m_stores
std::vector< std::vector< SGHandle > > m_stores
Definition: OnDemandMinbiasSvc.h:93
hist_file_dump.f
f
Definition: hist_file_dump.py:140
run
Definition: run.py:1
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
OnDemandMinbiasSvc::m_bkg_evt_sel_ctx
IEvtSelector::Context * m_bkg_evt_sel_ctx
Definition: OnDemandMinbiasSvc.h:90
OnDemandMinbiasSvc::initialize
StatusCode initialize() final
AthService initialize.
Definition: OnDemandMinbiasSvc.cxx:40
OnDemandMinbiasSvc::m_nPerBunch
Gaudi::Property< float > m_nPerBunch
Definition: OnDemandMinbiasSvc.h:59
OnDemandMinbiasSvc::m_latestDeltaBC
Gaudi::Property< int > m_latestDeltaBC
Definition: OnDemandMinbiasSvc.h:72
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
OnDemandMinbiasSvc::endHardScatter
StatusCode endHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:312
OnDemandMinbiasSvc::m_beamInt
ServiceHandle< IBeamIntensity > m_beamInt
Definition: OnDemandMinbiasSvc.h:80
runIDAlign.accumulate
accumulate
Update flags based on parser line args.
Definition: runIDAlign.py:60
lumiFormat.lumi
lumi
Definition: lumiFormat.py:106
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OnDemandMinbiasSvc::getNumForBunch
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Definition: OnDemandMinbiasSvc.cxx:302
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
IProxyProviderSvc.h
IAddressProvider.h
OnDemandMinbiasSvc::m_reading_batch_mtx
std::mutex m_reading_batch_mtx
Definition: OnDemandMinbiasSvc.h:97
OnDemandMinbiasSvc::m_earliestDeltaBC
Gaudi::Property< int > m_earliestDeltaBC
Definition: OnDemandMinbiasSvc.h:69
OnDemandMinbiasSvc::m_activeStoreSvc
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
Definition: OnDemandMinbiasSvc.h:84
OnDemandMinbiasSvc::m_spare_store
SGHandle m_spare_store
Definition: OnDemandMinbiasSvc.h:87
OnDemandMinbiasSvc::m_seed
Gaudi::Property< std::uint64_t > m_seed
Definition: OnDemandMinbiasSvc.h:52
python.SystemOfUnits.ms
float ms
Definition: SystemOfUnits.py:148
FastReseededPRNG
Definition: FastReseededPRNG.h:28