Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "OnDemandMinbiasSvc.h"
6 
7 #include <GaudiKernel/ConcurrencyFlags.h>
8 #include <fmt/chrono.h>
9 #include <fmt/format.h>
10 
11 #include <algorithm>
12 #include <boost/core/demangle.hpp>
13 #include <chrono>
14 #include <random>
15 #include <range/v3/algorithm.hpp>
16 #include <range/v3/numeric/accumulate.hpp>
17 #include <range/v3/to_container.hpp>
18 #include <range/v3/view.hpp>
19 #include <thread>
20 
25 
26 
27 inline std::string CLIDToString(const CLID& clid) {
28  return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
29 }
30 
32  ISvcLocator* svc)
33  : base_class(name, svc),
34  m_bkg_evt_sel_ctx(nullptr),
35  m_proxyProviderSvc("ProxyProviderSvc/BkgPPSvc_"+name, name),
36  m_last_loaded_hs()
37 {}
38 
40 
// Body of OnDemandMinbiasSvc::initialize(). This listing omits the signature
// line (41) and interior lines 65, 75, 118 and 121-122; each gap is flagged
// below. The visible code retrieves the required services, creates the
// background event selector context and starts the selector, registers
// address providers with the proxy provider service, builds the per-slot
// StoreGate pools and registers the skip-event callback.
// NOTE(review): m_stores is cleared here and again right before it is
// resized further down; this first clear looks redundant.
42  m_stores.clear();
43  ATH_CHECK(m_bkgEventSelector.retrieve());
44  ATH_CHECK(m_activeStoreSvc.retrieve());
45  ATH_CHECK(m_skipEventIdxSvc.retrieve());
// The beam intensity / luminosity services are only retrieved when the
// corresponding property is enabled.
46  if (m_useBeamInt) {
47  ATH_CHECK(m_beamInt.retrieve());
48  }
49  if (m_useBeamLumi) {
50  ATH_CHECK(m_beamLumi.retrieve());
51  }
52  // Setup context
53  if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
54  ATH_MSG_ERROR("Failed to create background event selector context");
55  return StatusCode::FAILURE;
56  }
// The selector is started explicitly through its IService interface.
57  ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());
58 
59  // Setup proxy provider
60  ATH_CHECK(m_proxyProviderSvc.retrieve());
61 
62  // Setup Address Providers
// Each of the three providers below is optional: if the SmartIF is empty the
// provider is simply not added and initialisation carries on.
63  SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
64  if (!addressProvider) {
// (listing omits line 65 here - presumably the opening of the warning macro
// call that takes the message below; confirm against the real source)
66  "Could not cast background event selector to IAddressProvider");
67  } else {
68  m_proxyProviderSvc->addProvider(addressProvider);
69  }
70  // AthenaPoolAddressProviderSvc
71  SmartIF<IAddressProvider> athPoolAP{
72  serviceLocator()->service(fmt::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
73  };
74  if (!athPoolAP) {
// (listing omits line 75 here - presumably the opening of the warning macro
// call that takes the message below; confirm against the real source)
76  "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
77  } else {
78  m_proxyProviderSvc->addProvider(athPoolAP);
79  }
80  // AddressRemappingSvc
81  SmartIF<IAddressProvider> addRemapAP{
82  serviceLocator()->service(fmt::format("AddressRemappingSvc/BkgARSvc_{}", name()))
83  };
84  if (!addRemapAP) {
85  ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
86  } else {
87  m_proxyProviderSvc->addProvider(addRemapAP);
88  }
89 
// Per-slot bookkeeping: one entry per concurrent event in each of
// m_idx_lists, m_num_mb_by_bunch and m_stores.
90  const std::size_t n_concurrent =
91  Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
92  m_idx_lists.clear();
93  m_idx_lists.resize(n_concurrent);
94 
95  m_num_mb_by_bunch.clear();
96  m_num_mb_by_bunch.resize(n_concurrent);
97 
98  m_stores.clear();
99  m_stores.resize(n_concurrent);
100 
101  const int n_stores = 50; // Start with 50 stores per event
102  // setup n_concurrent vectors of n_stores StoreGates in m_stores
103  for (std::size_t i = 0; i < n_concurrent; ++i) {
104  auto& sgs = m_stores[i];
105  sgs.reserve(n_stores);
106  for (int j = 0; j < n_stores; ++j) {
107  // creates / retrieves a different StoreGateSvc for each slot
// The store name encodes this service's name, the slot index i and the
// per-slot store index j.
108  auto& sg = sgs.emplace_back(
109  fmt::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
110  ATH_CHECK(sg.retrieve());
111  sg->setStoreID(StoreID::PILEUP_STORE);
112  sg->setProxyProviderSvc(m_proxyProviderSvc.get());
113  }
114  }
115 
116  // setup spare store for event skipping
117  ATH_CHECK(m_spare_store.retrieve());
// (listing omits line 118 here)
119  m_spare_store->setProxyProviderSvc(m_proxyProviderSvc.get());
// Callback handed to m_skipEventIdxSvc. For every event in [begin, end) it
// asks calcMBRequired (with s_NoSlot) how many pileup events that event
// would use and advances the background event selector by that many.
// (listing omits lines 121-122, which hold the lambda's parameter list; the
// body uses `begin` and `end` as an iterator range)
120  auto skipEvent_callback = [this](
123  using namespace std::chrono_literals;
// The spare store is made active for the duration of the skip and the
// previous store is restored on the success path.
// NOTE(review): the FAILURE return inside the loop leaves m_spare_store as
// the active store - confirm whether that matters once the job is failing.
124  auto* const old_store = m_activeStoreSvc->activeStore();
125  m_activeStoreSvc->setStore(m_spare_store.get());
126  ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
127  for (auto iter = begin; iter < end; ++iter) {
128  const auto& evt = *iter;
129  const std::size_t n_to_skip = calcMBRequired(
130  evt.evtIdx, s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
131  ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
132  << n_to_skip << " pileup events");
133  for (std::size_t i = 0; i < n_to_skip; ++i) {
134  if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
135  ATH_MSG_ERROR("Ran out of background events");
136  return StatusCode::FAILURE;
137  }
138  }
139  }
// Record the last skipped event so beginHardScatter's ordering wait treats
// it as already loaded.
// NOTE(review): (end - 1) is dereferenced without checking begin != end -
// confirm the caller never passes an empty range.
140  m_last_loaded_hs.store((end - 1)->evtIdx);
141  m_activeStoreSvc->setStore(old_store);
142  return StatusCode::SUCCESS;
143  };
144  ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
// Note: this message is emitted at the end of initialisation, not the start.
145  ATH_MSG_INFO("Initializing ODMBSvc");
146  return StatusCode::SUCCESS;
147 }
148 
149 std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
150  std::size_t slot,
151  unsigned int run,
152  unsigned int lumi,
154  ATH_MSG_DEBUG("Run " << run << ", lumi " << lumi << ", event " << event
155  << "| hs_id " << hs_id);
156  const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
157  // vector on stack for use if slot == s_NoSlot
158  std::vector<std::uint64_t> stack_num_mb_by_bunch{};
159  std::vector<std::uint64_t>& num_mb_by_bunch =
160  slot == s_NoSlot ? stack_num_mb_by_bunch : m_num_mb_by_bunch[slot];
161  num_mb_by_bunch.clear();
162  num_mb_by_bunch.resize(n_bunches);
163  FastReseededPRNG prng{m_seed.value(), hs_id};
164 
165  // First apply the beam luminosity SF
166  bool sf_updated_throwaway;
167  const float beam_lumi_sf =
168  m_useBeamLumi ? m_beamLumi->scaleFactor(run, lumi, sf_updated_throwaway)
169  : 1.F;
170  const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
171  std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
172  // Now update using beam intensities
173  if (m_useBeamInt) {
174  // Supposed to be once per event, but ends up running once per minbias type
175  // per event now
176  m_beamInt->selectT0(run, event);
177  for (int bunch = m_earliestDeltaBC.value();
178  bunch <= m_latestDeltaBC.value(); ++bunch) {
179  std::size_t idx = bunch - m_earliestDeltaBC.value();
180  avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
181  }
182  }
183 
184  if (m_usePoisson) {
185  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
186  num_mb_by_bunch.begin(), [&prng](float avg) {
187  return std::poisson_distribution<std::uint64_t>(avg)(prng);
188  });
189  } else {
190  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
191  num_mb_by_bunch.begin(), [](float f) {
192  return static_cast<std::uint64_t>(std::round(f));
193  });
194  }
195 
196  std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
197  if (slot == s_NoSlot) {
198  return num_mb;
199  }
200  // Won't go past here if slot == s_NoSlot
201 
202  std::vector<std::uint64_t>& index_array = m_idx_lists[slot];
203  index_array.clear();
204  index_array.resize(num_mb);
205  std::iota(index_array.begin(), index_array.end(), 0);
206  // Don't need to shuffle, since these events aren't reused
207  // std::shuffle(index_array.begin(), index_array.end(), prng);
208  ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
209  << fmt::format("\t\tBy bunch: [{}]\n",
210  fmt::join(num_mb_by_bunch, ", ")));
211  return num_mb;
212 }
213 
// Body of OnDemandMinbiasSvc::beginHardScatter(const EventContext& ctx). This
// listing omits the signature line (214) and interior lines 251 and 288.
// Visible steps: compute how many minbias events this hard scatter needs,
// grow the slot's StoreGate pool if it is too small, wait until
// m_last_loaded_hs has reached hs_id - 1, then read that many background
// events into the slot's stores while holding m_reading_batch_mtx.
215  using namespace std::chrono_literals;
216  std::chrono::steady_clock::time_point order_wait_start{};
217 
218  const std::int64_t hs_id = get_hs_id(ctx);
219  const std::size_t slot = ctx.slot();
// calcMBRequired also fills this slot's m_num_mb_by_bunch and m_idx_lists.
220  const std::size_t num_to_load =
221  calcMBRequired(hs_id, slot, ctx.eventID().run_number(),
222  ctx.eventID().lumi_block(), ctx.eventID().event_number());
223  auto& stores = m_stores[slot];
224  // If we don't have enough stores, make more
225  if (stores.size() < num_to_load) {
226  ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
227  stores.reserve(num_to_load);
// New stores follow the same naming and setup as those made in initialize().
228  for (std::size_t i = stores.size(); i < num_to_load; ++i) {
229  auto& sg = stores.emplace_back(
230  fmt::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
231  name());
232  ATH_CHECK(sg.retrieve());
233  sg->setStoreID(StoreID::PILEUP_STORE);
234  sg->setProxyProviderSvc(m_proxyProviderSvc.get());
235  }
236  }
237  // Ensure loading is done in order
// Polls m_last_loaded_hs every 50 ms until the previous hard scatter
// (hs_id - 1) has been marked as loaded or skipped.
238  if (m_last_loaded_hs < hs_id - 1) {
239  ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
240  << m_last_loaded_hs << " and we are " << hs_id);
241  order_wait_start = std::chrono::steady_clock::now();
242  while (m_last_loaded_hs < hs_id - 1) {
243  std::this_thread::sleep_for(50ms);
244  }
245  auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
246  ATH_MSG_INFO(fmt::format("Waited {:%M:%S} to prevent out-of-order loading",
247  wait_time));
248  }
249  // Lock reading mutex
250  std::unique_lock lck(m_reading_batch_mtx);
// (listing omits line 251 here)
252  // Remember old store to reset later
253  auto* old_store = m_activeStoreSvc->activeStore();
// NOTE(review): every early return inside this loop skips both the
// active-store reset and the m_last_loaded_hs update below, so a later hard
// scatter polling on this hs_id would keep waiting - confirm that a failure
// here always aborts the job.
254  for (std::size_t i = 0; i < num_to_load; ++i) {
255  auto& sg = stores[i];
256  // Change active store
257  m_activeStoreSvc->setStore(sg.get());
// Scoped push of this store as the current event store for this iteration.
258  SG::CurrentEventStore::Push reader_sg_ces(sg.get());
259  // Read next event
260  ATH_CHECK(sg->clearStore(true));
261  if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
262  ATH_MSG_FATAL("Ran out of minbias events");
263  return StatusCode::FAILURE;
264  }
265  IOpaqueAddress* addr = nullptr;
266  if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
267  .isSuccess()) {
268  ATH_MSG_WARNING("Failed to create address. No more events?");
269  return StatusCode::FAILURE;
270  }
271  if (addr == nullptr) {
272  ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
273  return StatusCode::FAILURE;
274  }
275  ATH_CHECK(sg->recordAddress(addr));
276  ATH_CHECK(sg->loadEventProxies());
277  // Read data now if desired
// When m_onDemandMB is false every proxy's data is accessed right away;
// otherwise the loop body does nothing. The flag does not change inside the
// loop, so the test is loop-invariant.
278  for (const auto* proxy_ptr : sg->proxies()) {
279  if (!m_onDemandMB) {
280  // Sort of a const_cast, then ->accessData()
281  sg->proxy_exact(proxy_ptr->sgkey())->accessData();
282  }
283  }
284  }
285  // Reset active store
286  m_activeStoreSvc->setStore(old_store);
// (listing omits line 288, which holds the argument and the closing of this
// format/log call)
287  ATH_MSG_INFO(fmt::format("Took {:%M:%S} to load events",
289  // Update last loaded
// Publishing hs_id releases the hard scatter waiting on hs_id + 1.
290  m_last_loaded_hs.store(hs_id);
291  return StatusCode::SUCCESS;
292 }
293 
295  std::uint64_t mb_id) {
296  const std::size_t slot = ctx.slot();
297  const std::size_t index = m_idx_lists.at(slot).at(mb_id);
298  return m_stores.at(ctx.slot()).at(index).get();
299 }
300 
301 std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
302  int bunch) const {
303  if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
304  throw std::logic_error(fmt::format(
305  "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
306  m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
307  }
308  return m_num_mb_by_bunch.at(ctx.slot()).at(bunch - m_earliestDeltaBC.value());
309 }
310 
312  // clear all stores
313  for (auto&& sg : m_stores[ctx.slot()]) {
314  ATH_CHECK(sg->clearStore());
315  }
316  return StatusCode::SUCCESS;
317 }
CurrentEventStore.h
Hold a pointer to the current event store.
OnDemandMinbiasSvc::m_beamLumi
ServiceHandle< IBeamLuminosity > m_beamLumi
Definition: OnDemandMinbiasSvc.h:83
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
CLIDToString
std::string CLIDToString(const CLID &clid)
Definition: OnDemandMinbiasSvc.cxx:27
FastReseededPRNG.h
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
index
Definition: index.py:1
OnDemandMinbiasSvc::m_useBeamInt
Gaudi::Property< bool > m_useBeamInt
Definition: OnDemandMinbiasSvc.h:66
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:14
OnDemandMinbiasSvc::m_num_mb_by_bunch
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
Definition: OnDemandMinbiasSvc.h:95
OnDemandMinbiasSvc::OnDemandMinbiasSvc
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Definition: OnDemandMinbiasSvc.cxx:31
PlotCalibFromCool.begin
begin
Definition: PlotCalibFromCool.py:94
SG::CurrentEventStore::Push
Temporarily change the current store.
Definition: SGTools/SGTools/CurrentEventStore.h:58
CLIDRegistry::CLIDToTypeinfo
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
Definition: CLIDRegistry.cxx:136
OnDemandMinbiasSvc::m_skipEventIdxSvc
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Definition: OnDemandMinbiasSvc.h:76
OnDemandMinbiasSvc::calcMBRequired
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
Definition: OnDemandMinbiasSvc.cxx:149
LArG4FSStartPointFilter.evt
evt
Definition: LArG4FSStartPointFilter.py:42
OnDemandMinbiasSvc::m_bkgEventSelector
ServiceHandle< IEvtSelector > m_bkgEventSelector
Definition: OnDemandMinbiasSvc.h:79
python.SystemOfUnits.ms
int ms
Definition: SystemOfUnits.py:132
OnDemandMinbiasSvc::m_idx_lists
std::vector< std::vector< std::uint64_t > > m_idx_lists
Definition: OnDemandMinbiasSvc.h:96
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
OnDemandMinbiasSvc::m_usePoisson
Gaudi::Property< bool > m_usePoisson
Definition: OnDemandMinbiasSvc.h:63
OnDemandMinbiasSvc::m_proxyProviderSvc
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
Definition: OnDemandMinbiasSvc.h:92
OnDemandMinbiasSvc::~OnDemandMinbiasSvc
~OnDemandMinbiasSvc() final
Destructor.
Definition: OnDemandMinbiasSvc.cxx:39
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:124
OnDemandMinbiasSvc::get_hs_id
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
Definition: OnDemandMinbiasSvc.h:44
python.handimod.now
now
Definition: handimod.py:675
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OnDemandMinbiasSvc::getMinbias
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Definition: OnDemandMinbiasSvc.cxx:294
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
OnDemandMinbiasSvc::m_last_loaded_hs
std::atomic_int64_t m_last_loaded_hs
Definition: OnDemandMinbiasSvc.h:99
OnDemandMinbiasSvc.h
lumiFormat.i
int i
Definition: lumiFormat.py:85
ISkipEventIdxSvc::EvtIter
std::vector< EvtId >::const_iterator EvtIter
Definition: ISkipEventIdxSvc.h:23
OnDemandMinbiasSvc::m_onDemandMB
Gaudi::Property< bool > m_onDemandMB
Definition: OnDemandMinbiasSvc.h:55
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
Recovery.avg
def avg(a, b)
Definition: Recovery.py:79
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OnDemandMinbiasSvc::m_useBeamLumi
Gaudi::Property< bool > m_useBeamLumi
Definition: OnDemandMinbiasSvc.h:68
OnDemandMinbiasSvc::s_NoSlot
static constexpr std::size_t s_NoSlot
Definition: OnDemandMinbiasSvc.h:101
OnDemandMinbiasSvc::beginHardScatter
StatusCode beginHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:214
Amg::transform
Amg::Vector3D transform(Amg::Vector3D &v, Amg::Transform3D &tr)
Transform a point from a Trasformation3D.
Definition: GeoPrimitivesHelpers.h:156
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
OnDemandMinbiasSvc::m_stores
std::vector< std::vector< SGHandle > > m_stores
Definition: OnDemandMinbiasSvc.h:94
hist_file_dump.f
f
Definition: hist_file_dump.py:141
run
Definition: run.py:1
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
OnDemandMinbiasSvc::m_bkg_evt_sel_ctx
IEvtSelector::Context * m_bkg_evt_sel_ctx
Definition: OnDemandMinbiasSvc.h:91
OnDemandMinbiasSvc::initialize
StatusCode initialize() final
AthService initialize.
Definition: OnDemandMinbiasSvc.cxx:41
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
OnDemandMinbiasSvc::m_nPerBunch
Gaudi::Property< float > m_nPerBunch
Definition: OnDemandMinbiasSvc.h:60
OnDemandMinbiasSvc::m_latestDeltaBC
Gaudi::Property< int > m_latestDeltaBC
Definition: OnDemandMinbiasSvc.h:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
OnDemandMinbiasSvc::endHardScatter
StatusCode endHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:311
OnDemandMinbiasSvc::m_beamInt
ServiceHandle< IBeamIntensity > m_beamInt
Definition: OnDemandMinbiasSvc.h:81
runIDAlign.accumulate
accumulate
Update flags based on parser line args.
Definition: runIDAlign.py:63
lumiFormat.lumi
lumi
Definition: lumiFormat.py:106
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OnDemandMinbiasSvc::getNumForBunch
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Definition: OnDemandMinbiasSvc.cxx:301
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
IProxyProviderSvc.h
IAddressProvider.h
OnDemandMinbiasSvc::m_reading_batch_mtx
std::mutex m_reading_batch_mtx
Definition: OnDemandMinbiasSvc.h:98
OnDemandMinbiasSvc::m_earliestDeltaBC
Gaudi::Property< int > m_earliestDeltaBC
Definition: OnDemandMinbiasSvc.h:70
OnDemandMinbiasSvc::m_activeStoreSvc
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
Definition: OnDemandMinbiasSvc.h:85
OnDemandMinbiasSvc::m_spare_store
SGHandle m_spare_store
Definition: OnDemandMinbiasSvc.h:88
OnDemandMinbiasSvc::m_seed
Gaudi::Property< std::uint64_t > m_seed
Definition: OnDemandMinbiasSvc.h:53
FastReseededPRNG
Definition: FastReseededPRNG.h:28