ATLAS Offline Software
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2023 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "OnDemandMinbiasSvc.h"
6 
7 #include <GaudiKernel/ConcurrencyFlags.h>
8 #include <fmt/chrono.h>
9 #include <fmt/format.h>
10 
11 #include <algorithm>
12 #include <boost/core/demangle.hpp>
13 #include <chrono>
14 #include <random>
15 #include <range/v3/algorithm.hpp>
16 #include <range/v3/numeric/accumulate.hpp>
17 #include <range/v3/to_container.hpp>
18 #include <range/v3/view.hpp>
19 #include <thread>
20 
24 #include "EventInfo/EventID.h"
25 #include "EventInfo/EventInfo.h"
28 
29 namespace rv = ranges::views;
30 
31 inline std::string CLIDToString(const CLID& clid) {
32  return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
33 }
34 
36  ISvcLocator* svc)
37  : base_class(name, svc), m_bkg_evt_sel_ctx(nullptr), m_last_loaded_hs() {}
38 
40 
42  m_stores.clear();
43  ATH_CHECK(m_bkgEventSelector.retrieve());
44  ATH_CHECK(m_activeStoreSvc.retrieve());
45  ATH_CHECK(m_skipEventIdxSvc.retrieve());
46  if (m_useBeamInt) {
47  ATH_CHECK(m_beamInt.retrieve());
48  }
49  if (m_useBeamLumi) {
50  ATH_CHECK(m_beamLumi.retrieve());
51  }
52  // Setup context
53  if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
54  ATH_MSG_ERROR("Failed to create background event selector context");
55  return StatusCode::FAILURE;
56  }
57  ATH_CHECK(dynamic_cast<Service*>(m_bkgEventSelector.get())->start());
58 
59  // Setup proxy provider
60  m_proxyProviderSvc = nullptr;
61  ATH_CHECK(serviceLocator()->service(
62  fmt::format("ProxyProviderSvc/BkgPPSvc_{}", name()), m_proxyProviderSvc,
63  true));
64  // Setup Address Providers
65  auto* addressProvider =
66  dynamic_cast<IAddressProvider*>(m_bkgEventSelector.get());
67  if (addressProvider == nullptr) {
69  "Could not cast background event selector to IAddressProvider");
70  } else {
71  m_proxyProviderSvc->addProvider(addressProvider);
72  }
73  // AthenaPoolAddressProviderSvc
74  IService* athPoolSvc = nullptr;
75  ATH_CHECK(serviceLocator()->service(
76  fmt::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()),
77  athPoolSvc));
78  auto* athPoolAP = dynamic_cast<IAddressProvider*>(athPoolSvc);
79  if (athPoolAP == nullptr) {
81  "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
82  } else {
83  m_proxyProviderSvc->addProvider(athPoolAP);
84  }
85  // AddressRemappingSvc
86  IService* addRemapSvc = nullptr;
87  ATH_CHECK(serviceLocator()->service(
88  fmt::format("AddressRemappingSvc/BkgARSvc_{}", name()), addRemapSvc));
89  auto* addRemapAP = dynamic_cast<IAddressProvider*>(addRemapSvc);
90  if (addRemapAP == nullptr) {
91  ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
92  } else {
93  m_proxyProviderSvc->addProvider(addRemapAP);
94  }
95 
96  const std::size_t n_concurrent =
97  Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
98  m_idx_lists.clear();
99  m_idx_lists.resize(n_concurrent);
100 
101  m_num_mb_by_bunch.clear();
102  m_num_mb_by_bunch.resize(n_concurrent);
103 
104  m_stores.clear();
105  m_stores.resize(n_concurrent);
106 
107  const int n_stores = 50; // Start with 50 stores per event
108  // setup n_concurrent vectors of n_stores StoreGates in m_stores
109  for (std::size_t i = 0; i < n_concurrent; ++i) {
110  auto& sgs = m_stores[i];
111  sgs.reserve(n_stores);
112  for (int j = 0; j < n_stores; ++j) {
113  // creates / retrieves a different StoreGateSvc for each slot
114  auto& sg = sgs.emplace_back(
115  fmt::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
116  ATH_CHECK(sg.retrieve());
117  sg->setStoreID(StoreID::PILEUP_STORE);
118  sg->setProxyProviderSvc(m_proxyProviderSvc);
119  }
120  }
121 
122  // setup spare store for event skipping
123  ATH_CHECK(m_spare_store.retrieve());
125  m_spare_store->setProxyProviderSvc(m_proxyProviderSvc);
126  auto skipEvent_callback = [this](
129  using namespace std::chrono_literals;
130  auto* const old_store = m_activeStoreSvc->activeStore();
131  m_activeStoreSvc->setStore(m_spare_store.get());
132  ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
133  for (auto iter = begin; iter < end; ++iter) {
134  const auto& evt = *iter;
135  const std::size_t n_to_skip = calcMBRequired(
136  evt.evtIdx, s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
137  ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
138  << n_to_skip << " pileup events");
139  for (std::size_t i = 0; i < n_to_skip; ++i) {
140  if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
141  ATH_MSG_ERROR("Ran out of background events");
142  return StatusCode::FAILURE;
143  }
144  }
145  }
146  m_last_loaded_hs.store((end - 1)->evtIdx);
147  m_activeStoreSvc->setStore(old_store);
148  return StatusCode::SUCCESS;
149  };
150  ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
151  ATH_MSG_INFO("Initializing ODMBSvc");
152  return StatusCode::SUCCESS;
153 }
154 
155 std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
156  std::size_t slot,
157  unsigned int run,
158  unsigned int lumi,
160  ATH_MSG_DEBUG("Run " << run << ", lumi " << lumi << ", event " << event
161  << "| hs_id " << hs_id);
162  const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
163  // vector on stack for use if slot == s_NoSlot
164  std::vector<std::uint64_t> stack_num_mb_by_bunch{};
165  std::vector<std::uint64_t>& num_mb_by_bunch =
166  slot == s_NoSlot ? stack_num_mb_by_bunch : m_num_mb_by_bunch[slot];
167  num_mb_by_bunch.clear();
168  num_mb_by_bunch.resize(n_bunches);
169  FastReseededPRNG prng{m_seed.value(), hs_id};
170 
171  // First apply the beam luminosity SF
172  bool sf_updated_throwaway;
173  const float beam_lumi_sf =
174  m_useBeamLumi ? m_beamLumi->scaleFactor(run, lumi, sf_updated_throwaway)
175  : 1.F;
176  const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
177  std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
178  // Now update using beam intensities
179  if (m_useBeamInt) {
180  // Supposed to be once per event, but ends up running once per minbias type
181  // per event now
182  m_beamInt->selectT0(run, event);
183  for (int bunch = m_earliestDeltaBC.value();
184  bunch <= m_latestDeltaBC.value(); ++bunch) {
185  std::size_t idx = bunch - m_earliestDeltaBC.value();
186  avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
187  }
188  }
189 
190  if (m_usePoisson) {
191  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
192  num_mb_by_bunch.begin(), [&prng](float avg) {
193  return std::poisson_distribution<std::uint64_t>(avg)(prng);
194  });
195  } else {
196  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
197  num_mb_by_bunch.begin(), [](float f) {
198  return static_cast<std::uint64_t>(std::round(f));
199  });
200  }
201 
202  std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
203  if (slot == s_NoSlot) {
204  return num_mb;
205  }
206  // Won't go past here if slot == s_NoSlot
207 
208  std::vector<std::uint64_t>& index_array = m_idx_lists[slot];
209  index_array.clear();
210  index_array.resize(num_mb);
211  std::iota(index_array.begin(), index_array.end(), 0);
212  // Don't need to shuffle, since these events aren't reused
213  // std::shuffle(index_array.begin(), index_array.end(), prng);
214  ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
215  << fmt::format("\t\tBy bunch: [{}]\n",
216  fmt::join(num_mb_by_bunch, ", ")));
217  return num_mb;
218 }
219 
221  using namespace std::chrono_literals;
222  std::chrono::steady_clock::time_point order_wait_start{};
223 
224  const std::int64_t hs_id = get_hs_id(ctx);
225  const std::size_t slot = ctx.slot();
226  const std::size_t num_to_load =
227  calcMBRequired(hs_id, slot, ctx.eventID().run_number(),
228  ctx.eventID().lumi_block(), ctx.eventID().event_number());
229  auto& stores = m_stores[slot];
230  // If we don't have enough stores, make more
231  if (stores.size() < num_to_load) {
232  ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
233  stores.reserve(num_to_load);
234  for (std::size_t i = stores.size(); i < num_to_load; ++i) {
235  auto& sg = stores.emplace_back(
236  fmt::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
237  name());
238  ATH_CHECK(sg.retrieve());
239  sg->setStoreID(StoreID::PILEUP_STORE);
240  sg->setProxyProviderSvc(m_proxyProviderSvc);
241  }
242  }
243  // Ensure loading is done in order
244  if (m_last_loaded_hs < hs_id - 1) {
245  ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
246  << m_last_loaded_hs << " and we are " << hs_id);
247  order_wait_start = std::chrono::steady_clock::now();
248  while (m_last_loaded_hs < hs_id - 1) {
249  std::this_thread::sleep_for(50ms);
250  }
251  auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
252  ATH_MSG_INFO(fmt::format("Waited {:%M:%S} to prevent out-of-order loading",
253  wait_time));
254  }
255  // Lock reading mutex
256  std::unique_lock lck(m_reading_batch_mtx);
258  // Remember old store to reset later
259  auto* old_store = m_activeStoreSvc->activeStore();
260  for (std::size_t i = 0; i < num_to_load; ++i) {
261  auto& sg = stores[i];
262  // Change active store
263  m_activeStoreSvc->setStore(sg.get());
264  SG::CurrentEventStore::Push reader_sg_ces(sg.get());
265  // Read next event
266  ATH_CHECK(sg->clearStore(true));
267  if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
268  ATH_MSG_FATAL("Ran out of minbias events");
269  return StatusCode::FAILURE;
270  }
271  IOpaqueAddress* addr = nullptr;
272  if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
273  .isSuccess()) {
274  ATH_MSG_WARNING("Failed to create address. No more events?");
275  return StatusCode::FAILURE;
276  }
277  if (addr == nullptr) {
278  ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
279  return StatusCode::FAILURE;
280  }
281  ATH_CHECK(sg->recordAddress(addr));
282  ATH_CHECK(sg->loadEventProxies());
283  // Read data now if desired
284  for (const auto* proxy_ptr : sg->proxies()) {
285  if (!m_onDemandMB) {
286  // Sort of a const_cast, then ->accessData()
287  sg->proxy_exact(proxy_ptr->sgkey())->accessData();
288  }
289  }
290  }
291  // Reset active store
292  m_activeStoreSvc->setStore(old_store);
293  ATH_MSG_INFO(fmt::format("Took {:%M:%S} to load events",
295  // Update last loaded
296  m_last_loaded_hs.store(hs_id);
297  return StatusCode::SUCCESS;
298 }
299 
301  std::uint64_t mb_id) {
302  const std::size_t slot = ctx.slot();
303  const std::size_t index = m_idx_lists.at(slot).at(mb_id);
304  return m_stores.at(ctx.slot()).at(index).get();
305 }
306 
307 std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
308  int bunch) const {
309  if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
310  throw std::logic_error(fmt::format(
311  "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
312  m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
313  }
314  return m_num_mb_by_bunch.at(ctx.slot()).at(bunch - m_earliestDeltaBC.value());
315 }
316 
318  // clear all stores
319  for (auto&& sg : m_stores[ctx.slot()]) {
320  ATH_CHECK(sg->clearStore());
321  }
322  return StatusCode::SUCCESS;
323 }
CurrentEventStore.h
Hold a pointer to the current event store.
OnDemandMinbiasSvc::m_beamLumi
ServiceHandle< IBeamLuminosity > m_beamLumi
Definition: OnDemandMinbiasSvc.h:83
python.CaloRecoConfig.f
f
Definition: CaloRecoConfig.py:127
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
CLIDToString
std::string CLIDToString(const CLID &clid)
Definition: OnDemandMinbiasSvc.cxx:31
FastReseededPRNG.h
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
index
Definition: index.py:1
OnDemandMinbiasSvc::m_useBeamInt
Gaudi::Property< bool > m_useBeamInt
Definition: OnDemandMinbiasSvc.h:66
accumulate
bool accumulate(AccumulateMap &map, std::vector< module_t > const &modules, FPGATrackSimMatrixAccumulator const &acc)
Accumulates an accumulator (e.g.
Definition: FPGATrackSimMatrixAccumulator.cxx:22
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:14
OnDemandMinbiasSvc::m_num_mb_by_bunch
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
Definition: OnDemandMinbiasSvc.h:95
OnDemandMinbiasSvc::OnDemandMinbiasSvc
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Definition: OnDemandMinbiasSvc.cxx:35
PlotCalibFromCool.begin
begin
Definition: PlotCalibFromCool.py:94
SG::CurrentEventStore::Push
Temporarily change the current store.
Definition: SGTools/SGTools/CurrentEventStore.h:58
CLIDRegistry::CLIDToTypeinfo
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
Definition: CLIDRegistry.cxx:136
OnDemandMinbiasSvc::m_skipEventIdxSvc
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Definition: OnDemandMinbiasSvc.h:76
OnDemandMinbiasSvc::calcMBRequired
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
Definition: OnDemandMinbiasSvc.cxx:155
LArG4FSStartPointFilter.evt
evt
Definition: LArG4FSStartPointFilter.py:42
OnDemandMinbiasSvc::m_bkgEventSelector
ServiceHandle< IEvtSelector > m_bkgEventSelector
Definition: OnDemandMinbiasSvc.h:79
python.SystemOfUnits.ms
int ms
Definition: SystemOfUnits.py:132
OnDemandMinbiasSvc::m_idx_lists
std::vector< std::vector< std::uint64_t > > m_idx_lists
Definition: OnDemandMinbiasSvc.h:96
python.FakeAthena.Service
def Service(name)
Definition: FakeAthena.py:38
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
OnDemandMinbiasSvc::m_usePoisson
Gaudi::Property< bool > m_usePoisson
Definition: OnDemandMinbiasSvc.h:63
OnDemandMinbiasSvc::m_proxyProviderSvc
IProxyProviderSvc * m_proxyProviderSvc
Definition: OnDemandMinbiasSvc.h:92
OnDemandMinbiasSvc::~OnDemandMinbiasSvc
~OnDemandMinbiasSvc() final
Destructor.
Definition: OnDemandMinbiasSvc.cxx:39
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:128
EventID.h
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
IProxyProviderSvc::addProvider
virtual void addProvider(IAddressProvider *aProvider)=0
IAddressProvider manager functionality add a provider to the set of known ones.
OnDemandMinbiasSvc::get_hs_id
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
Definition: OnDemandMinbiasSvc.h:44
python.handimod.now
now
Definition: handimod.py:675
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OnDemandMinbiasSvc::getMinbias
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Definition: OnDemandMinbiasSvc.cxx:300
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
OnDemandMinbiasSvc::m_last_loaded_hs
std::atomic_int64_t m_last_loaded_hs
Definition: OnDemandMinbiasSvc.h:99
OnDemandMinbiasSvc.h
lumiFormat.i
int i
Definition: lumiFormat.py:92
ISkipEventIdxSvc::EvtIter
std::vector< EvtId >::const_iterator EvtIter
Definition: ISkipEventIdxSvc.h:23
OnDemandMinbiasSvc::m_onDemandMB
Gaudi::Property< bool > m_onDemandMB
Definition: OnDemandMinbiasSvc.h:55
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
Recovery.avg
def avg(a, b)
Definition: Recovery.py:79
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OnDemandMinbiasSvc::m_useBeamLumi
Gaudi::Property< bool > m_useBeamLumi
Definition: OnDemandMinbiasSvc.h:68
OnDemandMinbiasSvc::s_NoSlot
static constexpr std::size_t s_NoSlot
Definition: OnDemandMinbiasSvc.h:101
IAddressProvider
interface for IOA providers
Definition: IAddressProvider.h:28
OnDemandMinbiasSvc::beginHardScatter
StatusCode beginHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:220
Amg::transform
Amg::Vector3D transform(Amg::Vector3D &v, Amg::Transform3D &tr)
Transform a point from a Trasformation3D.
Definition: GeoPrimitivesHelpers.h:156
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
OnDemandMinbiasSvc::m_stores
std::vector< std::vector< SGHandle > > m_stores
Definition: OnDemandMinbiasSvc.h:94
run
Definition: run.py:1
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
OnDemandMinbiasSvc::m_bkg_evt_sel_ctx
IEvtSelector::Context * m_bkg_evt_sel_ctx
Definition: OnDemandMinbiasSvc.h:91
OnDemandMinbiasSvc::initialize
StatusCode initialize() final
AthService initialize.
Definition: OnDemandMinbiasSvc.cxx:41
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
OnDemandMinbiasSvc::m_nPerBunch
Gaudi::Property< float > m_nPerBunch
Definition: OnDemandMinbiasSvc.h:60
OnDemandMinbiasSvc::m_latestDeltaBC
Gaudi::Property< int > m_latestDeltaBC
Definition: OnDemandMinbiasSvc.h:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
OnDemandMinbiasSvc::endHardScatter
StatusCode endHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:317
dq_make_web_display.rv
def rv
Definition: dq_make_web_display.py:219
EventInfo.h
OnDemandMinbiasSvc::m_beamInt
ServiceHandle< IBeamIntensity > m_beamInt
Definition: OnDemandMinbiasSvc.h:81
lumiFormat.lumi
lumi
Definition: lumiFormat.py:113
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OnDemandMinbiasSvc::getNumForBunch
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Definition: OnDemandMinbiasSvc.cxx:307
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
IProxyProviderSvc.h
IAddressProvider.h
OnDemandMinbiasSvc::m_reading_batch_mtx
std::mutex m_reading_batch_mtx
Definition: OnDemandMinbiasSvc.h:98
OnDemandMinbiasSvc::m_earliestDeltaBC
Gaudi::Property< int > m_earliestDeltaBC
Definition: OnDemandMinbiasSvc.h:70
OnDemandMinbiasSvc::m_activeStoreSvc
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
Definition: OnDemandMinbiasSvc.h:85
OnDemandMinbiasSvc::m_spare_store
SGHandle m_spare_store
Definition: OnDemandMinbiasSvc.h:88
OnDemandMinbiasSvc::m_seed
Gaudi::Property< std::uint64_t > m_seed
Definition: OnDemandMinbiasSvc.h:53
FastReseededPRNG
Definition: FastReseededPRNG.h:28