ATLAS Offline Software
OnDemandMinbiasSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "OnDemandMinbiasSvc.h"
6 
7 #include <GaudiKernel/ConcurrencyFlags.h>
8 #include <fmt/chrono.h>
9 #include <fmt/format.h>
10 
11 #include <algorithm>
12 #include <boost/core/demangle.hpp>
13 #include <chrono>
14 #include <random>
15 #include <range/v3/algorithm.hpp>
16 #include <range/v3/numeric/accumulate.hpp>
17 #include <range/v3/to_container.hpp>
18 #include <range/v3/view.hpp>
19 #include <thread>
20 
24 #include "EventInfo/EventID.h"
25 #include "EventInfo/EventInfo.h"
28 
29 namespace rv = ranges::views;
30 
31 inline std::string CLIDToString(const CLID& clid) {
32  return boost::core::demangle(CLIDRegistry::CLIDToTypeinfo(clid)->name());
33 }
34 
36  ISvcLocator* svc)
37  : base_class(name, svc),
38  m_bkg_evt_sel_ctx(nullptr),
39  m_proxyProviderSvc("ProxyProviderSvc/BkgPPSvc_"+name, name),
40  m_last_loaded_hs()
41 {}
42 
44 
46  m_stores.clear();
47  ATH_CHECK(m_bkgEventSelector.retrieve());
48  ATH_CHECK(m_activeStoreSvc.retrieve());
49  ATH_CHECK(m_skipEventIdxSvc.retrieve());
50  if (m_useBeamInt) {
51  ATH_CHECK(m_beamInt.retrieve());
52  }
53  if (m_useBeamLumi) {
54  ATH_CHECK(m_beamLumi.retrieve());
55  }
56  // Setup context
57  if (!m_bkgEventSelector->createContext(m_bkg_evt_sel_ctx).isSuccess()) {
58  ATH_MSG_ERROR("Failed to create background event selector context");
59  return StatusCode::FAILURE;
60  }
61  ATH_CHECK(SmartIF<IService>(m_bkgEventSelector.get())->start());
62 
63  // Setup proxy provider
64  ATH_CHECK(m_proxyProviderSvc.retrieve());
65 
66  // Setup Address Providers
67  SmartIF<IAddressProvider> addressProvider{m_bkgEventSelector.get()};
68  if (!addressProvider) {
70  "Could not cast background event selector to IAddressProvider");
71  } else {
72  m_proxyProviderSvc->addProvider(addressProvider);
73  }
74  // AthenaPoolAddressProviderSvc
75  SmartIF<IAddressProvider> athPoolAP{
76  serviceLocator()->service(fmt::format("AthenaPoolAddressProviderSvc/BkgAPAPSvc_{}", name()))
77  };
78  if (!athPoolAP) {
80  "Could not cast AthenaPoolAddressProviderSvc to IAddressProvider");
81  } else {
82  m_proxyProviderSvc->addProvider(athPoolAP);
83  }
84  // AddressRemappingSvc
85  SmartIF<IAddressProvider> addRemapAP{
86  serviceLocator()->service(fmt::format("AddressRemappingSvc/BkgARSvc_{}", name()))
87  };
88  if (!addRemapAP) {
89  ATH_MSG_WARNING("Could not cast AddressRemappingSvc to IAddressProvider");
90  } else {
91  m_proxyProviderSvc->addProvider(addRemapAP);
92  }
93 
94  const std::size_t n_concurrent =
95  Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents();
96  m_idx_lists.clear();
97  m_idx_lists.resize(n_concurrent);
98 
99  m_num_mb_by_bunch.clear();
100  m_num_mb_by_bunch.resize(n_concurrent);
101 
102  m_stores.clear();
103  m_stores.resize(n_concurrent);
104 
105  const int n_stores = 50; // Start with 50 stores per event
106  // setup n_concurrent vectors of n_stores StoreGates in m_stores
107  for (std::size_t i = 0; i < n_concurrent; ++i) {
108  auto& sgs = m_stores[i];
109  sgs.reserve(n_stores);
110  for (int j = 0; j < n_stores; ++j) {
111  // creates / retrieves a different StoreGateSvc for each slot
112  auto& sg = sgs.emplace_back(
113  fmt::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), i, j), name());
114  ATH_CHECK(sg.retrieve());
115  sg->setStoreID(StoreID::PILEUP_STORE);
116  sg->setProxyProviderSvc(m_proxyProviderSvc.get());
117  }
118  }
119 
120  // setup spare store for event skipping
121  ATH_CHECK(m_spare_store.retrieve());
123  m_spare_store->setProxyProviderSvc(m_proxyProviderSvc.get());
124  auto skipEvent_callback = [this](
127  using namespace std::chrono_literals;
128  auto* const old_store = m_activeStoreSvc->activeStore();
129  m_activeStoreSvc->setStore(m_spare_store.get());
130  ATH_MSG_INFO("Skipping " << end - begin << " HS events. ");
131  for (auto iter = begin; iter < end; ++iter) {
132  const auto& evt = *iter;
133  const std::size_t n_to_skip = calcMBRequired(
134  evt.evtIdx, s_NoSlot, evt.runNum, evt.lbNum, evt.evtNum);
135  ATH_MSG_DEBUG("Skipping HS_ID " << evt.evtIdx << " --> skipping "
136  << n_to_skip << " pileup events");
137  for (std::size_t i = 0; i < n_to_skip; ++i) {
138  if (!m_bkgEventSelector->next(*m_bkg_evt_sel_ctx).isSuccess()) {
139  ATH_MSG_ERROR("Ran out of background events");
140  return StatusCode::FAILURE;
141  }
142  }
143  }
144  m_last_loaded_hs.store((end - 1)->evtIdx);
145  m_activeStoreSvc->setStore(old_store);
146  return StatusCode::SUCCESS;
147  };
148  ATH_CHECK(m_skipEventIdxSvc->registerCallback(skipEvent_callback));
149  ATH_MSG_INFO("Initializing ODMBSvc");
150  return StatusCode::SUCCESS;
151 }
152 
153 std::size_t OnDemandMinbiasSvc::calcMBRequired(std::int64_t hs_id,
154  std::size_t slot,
155  unsigned int run,
156  unsigned int lumi,
158  ATH_MSG_DEBUG("Run " << run << ", lumi " << lumi << ", event " << event
159  << "| hs_id " << hs_id);
160  const int n_bunches = m_latestDeltaBC.value() - m_earliestDeltaBC.value() + 1;
161  // vector on stack for use if slot == s_NoSlot
162  std::vector<std::uint64_t> stack_num_mb_by_bunch{};
163  std::vector<std::uint64_t>& num_mb_by_bunch =
164  slot == s_NoSlot ? stack_num_mb_by_bunch : m_num_mb_by_bunch[slot];
165  num_mb_by_bunch.clear();
166  num_mb_by_bunch.resize(n_bunches);
167  FastReseededPRNG prng{m_seed.value(), hs_id};
168 
169  // First apply the beam luminosity SF
170  bool sf_updated_throwaway;
171  const float beam_lumi_sf =
172  m_useBeamLumi ? m_beamLumi->scaleFactor(run, lumi, sf_updated_throwaway)
173  : 1.F;
174  const float beam_lumi = beam_lumi_sf * m_nPerBunch.value();
175  std::vector<float> avg_num_mb_by_bunch(n_bunches, beam_lumi);
176  // Now update using beam intensities
177  if (m_useBeamInt) {
178  // Supposed to be once per event, but ends up running once per minbias type
179  // per event now
180  m_beamInt->selectT0(run, event);
181  for (int bunch = m_earliestDeltaBC.value();
182  bunch <= m_latestDeltaBC.value(); ++bunch) {
183  std::size_t idx = bunch - m_earliestDeltaBC.value();
184  avg_num_mb_by_bunch[idx] *= m_beamInt->normFactor(bunch);
185  }
186  }
187 
188  if (m_usePoisson) {
189  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
190  num_mb_by_bunch.begin(), [&prng](float avg) {
191  return std::poisson_distribution<std::uint64_t>(avg)(prng);
192  });
193  } else {
194  std::transform(avg_num_mb_by_bunch.begin(), avg_num_mb_by_bunch.end(),
195  num_mb_by_bunch.begin(), [](float f) {
196  return static_cast<std::uint64_t>(std::round(f));
197  });
198  }
199 
200  std::uint64_t num_mb = ranges::accumulate(num_mb_by_bunch, 0UL);
201  if (slot == s_NoSlot) {
202  return num_mb;
203  }
204  // Won't go past here if slot == s_NoSlot
205 
206  std::vector<std::uint64_t>& index_array = m_idx_lists[slot];
207  index_array.clear();
208  index_array.resize(num_mb);
209  std::iota(index_array.begin(), index_array.end(), 0);
210  // Don't need to shuffle, since these events aren't reused
211  // std::shuffle(index_array.begin(), index_array.end(), prng);
212  ATH_MSG_DEBUG("HS ID " << hs_id << " uses " << num_mb << " events\n"
213  << fmt::format("\t\tBy bunch: [{}]\n",
214  fmt::join(num_mb_by_bunch, ", ")));
215  return num_mb;
216 }
217 
219  using namespace std::chrono_literals;
220  std::chrono::steady_clock::time_point order_wait_start{};
221 
222  const std::int64_t hs_id = get_hs_id(ctx);
223  const std::size_t slot = ctx.slot();
224  const std::size_t num_to_load =
225  calcMBRequired(hs_id, slot, ctx.eventID().run_number(),
226  ctx.eventID().lumi_block(), ctx.eventID().event_number());
227  auto& stores = m_stores[slot];
228  // If we don't have enough stores, make more
229  if (stores.size() < num_to_load) {
230  ATH_MSG_INFO("Adding " << num_to_load - stores.size() << " stores");
231  stores.reserve(num_to_load);
232  for (std::size_t i = stores.size(); i < num_to_load; ++i) {
233  auto& sg = stores.emplace_back(
234  fmt::format("StoreGateSvc/StoreGate_{}_{}_{}", name(), slot, i),
235  name());
236  ATH_CHECK(sg.retrieve());
237  sg->setStoreID(StoreID::PILEUP_STORE);
238  sg->setProxyProviderSvc(m_proxyProviderSvc.get());
239  }
240  }
241  // Ensure loading is done in order
242  if (m_last_loaded_hs < hs_id - 1) {
243  ATH_MSG_INFO("Waiting to prevent out-of-order loading. Last loaded is "
244  << m_last_loaded_hs << " and we are " << hs_id);
245  order_wait_start = std::chrono::steady_clock::now();
246  while (m_last_loaded_hs < hs_id - 1) {
247  std::this_thread::sleep_for(50ms);
248  }
249  auto wait_time = std::chrono::steady_clock::now() - order_wait_start;
250  ATH_MSG_INFO(fmt::format("Waited {:%M:%S} to prevent out-of-order loading",
251  wait_time));
252  }
253  // Lock reading mutex
254  std::unique_lock lck(m_reading_batch_mtx);
256  // Remember old store to reset later
257  auto* old_store = m_activeStoreSvc->activeStore();
258  for (std::size_t i = 0; i < num_to_load; ++i) {
259  auto& sg = stores[i];
260  // Change active store
261  m_activeStoreSvc->setStore(sg.get());
262  SG::CurrentEventStore::Push reader_sg_ces(sg.get());
263  // Read next event
264  ATH_CHECK(sg->clearStore(true));
265  if (!(m_bkgEventSelector->next(*m_bkg_evt_sel_ctx)).isSuccess()) {
266  ATH_MSG_FATAL("Ran out of minbias events");
267  return StatusCode::FAILURE;
268  }
269  IOpaqueAddress* addr = nullptr;
270  if (!m_bkgEventSelector->createAddress(*m_bkg_evt_sel_ctx, addr)
271  .isSuccess()) {
272  ATH_MSG_WARNING("Failed to create address. No more events?");
273  return StatusCode::FAILURE;
274  }
275  if (addr == nullptr) {
276  ATH_MSG_WARNING("createAddress returned nullptr. No more events?");
277  return StatusCode::FAILURE;
278  }
279  ATH_CHECK(sg->recordAddress(addr));
280  ATH_CHECK(sg->loadEventProxies());
281  // Read data now if desired
282  for (const auto* proxy_ptr : sg->proxies()) {
283  if (!m_onDemandMB) {
284  // Sort of a const_cast, then ->accessData()
285  sg->proxy_exact(proxy_ptr->sgkey())->accessData();
286  }
287  }
288  }
289  // Reset active store
290  m_activeStoreSvc->setStore(old_store);
291  ATH_MSG_INFO(fmt::format("Took {:%M:%S} to load events",
293  // Update last loaded
294  m_last_loaded_hs.store(hs_id);
295  return StatusCode::SUCCESS;
296 }
297 
299  std::uint64_t mb_id) {
300  const std::size_t slot = ctx.slot();
301  const std::size_t index = m_idx_lists.at(slot).at(mb_id);
302  return m_stores.at(ctx.slot()).at(index).get();
303 }
304 
305 std::size_t OnDemandMinbiasSvc::getNumForBunch(const EventContext& ctx,
306  int bunch) const {
307  if (bunch < m_earliestDeltaBC.value() || bunch > m_latestDeltaBC.value()) {
308  throw std::logic_error(fmt::format(
309  "Tried to request bunch {} which is outside the range [{}, {}]", bunch,
310  m_earliestDeltaBC.value(), m_latestDeltaBC.value()));
311  }
312  return m_num_mb_by_bunch.at(ctx.slot()).at(bunch - m_earliestDeltaBC.value());
313 }
314 
316  // clear all stores
317  for (auto&& sg : m_stores[ctx.slot()]) {
318  ATH_CHECK(sg->clearStore());
319  }
320  return StatusCode::SUCCESS;
321 }
CurrentEventStore.h
Hold a pointer to the current event store.
OnDemandMinbiasSvc::m_beamLumi
ServiceHandle< IBeamLuminosity > m_beamLumi
Definition: OnDemandMinbiasSvc.h:83
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
CLIDToString
std::string CLIDToString(const CLID &clid)
Definition: OnDemandMinbiasSvc.cxx:31
FastReseededPRNG.h
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
index
Definition: index.py:1
OnDemandMinbiasSvc::m_useBeamInt
Gaudi::Property< bool > m_useBeamInt
Definition: OnDemandMinbiasSvc.h:66
accumulate
bool accumulate(AccumulateMap &map, std::vector< module_t > const &modules, FPGATrackSimMatrixAccumulator const &acc)
Accumulates an accumulator (e.g.
Definition: FPGATrackSimMatrixAccumulator.cxx:22
mergePhysValFiles.start
start
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:14
OnDemandMinbiasSvc::m_num_mb_by_bunch
std::vector< std::vector< std::uint64_t > > m_num_mb_by_bunch
Definition: OnDemandMinbiasSvc.h:95
OnDemandMinbiasSvc::OnDemandMinbiasSvc
OnDemandMinbiasSvc(const std::string &name, ISvcLocator *svc)
Constructor.
Definition: OnDemandMinbiasSvc.cxx:35
PlotCalibFromCool.begin
begin
Definition: PlotCalibFromCool.py:94
SG::CurrentEventStore::Push
Temporarily change the current store.
Definition: SGTools/SGTools/CurrentEventStore.h:58
CLIDRegistry::CLIDToTypeinfo
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
Definition: CLIDRegistry.cxx:136
OnDemandMinbiasSvc::m_skipEventIdxSvc
ServiceHandle< ISkipEventIdxSvc > m_skipEventIdxSvc
Definition: OnDemandMinbiasSvc.h:76
OnDemandMinbiasSvc::calcMBRequired
std::size_t calcMBRequired(std::int64_t hs_id, std::size_t slot, unsigned int run, unsigned int lumi, std::uint64_t event)
Definition: OnDemandMinbiasSvc.cxx:153
LArG4FSStartPointFilter.evt
evt
Definition: LArG4FSStartPointFilter.py:42
OnDemandMinbiasSvc::m_bkgEventSelector
ServiceHandle< IEvtSelector > m_bkgEventSelector
Definition: OnDemandMinbiasSvc.h:79
python.SystemOfUnits.ms
int ms
Definition: SystemOfUnits.py:132
OnDemandMinbiasSvc::m_idx_lists
std::vector< std::vector< std::uint64_t > > m_idx_lists
Definition: OnDemandMinbiasSvc.h:96
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
OnDemandMinbiasSvc::m_usePoisson
Gaudi::Property< bool > m_usePoisson
Definition: OnDemandMinbiasSvc.h:63
OnDemandMinbiasSvc::m_proxyProviderSvc
ServiceHandle< IProxyProviderSvc > m_proxyProviderSvc
Definition: OnDemandMinbiasSvc.h:92
OnDemandMinbiasSvc::~OnDemandMinbiasSvc
~OnDemandMinbiasSvc() final
Destructor.
Definition: OnDemandMinbiasSvc.cxx:43
StoreGateSvc
The Athena Transient Store API.
Definition: StoreGateSvc.h:125
EventID.h
This class provides a unique identification for each event, in terms of run/event number and/or a tim...
OnDemandMinbiasSvc::get_hs_id
virtual std::int64_t get_hs_id(const EventContext &ctx) const override
Definition: OnDemandMinbiasSvc.h:44
python.handimod.now
now
Definition: handimod.py:675
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OnDemandMinbiasSvc::getMinbias
StoreGateSvc * getMinbias(const EventContext &ctx, std::uint64_t mb_id) override
Definition: OnDemandMinbiasSvc.cxx:298
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
StoreID::PILEUP_STORE
@ PILEUP_STORE
Definition: StoreID.h:31
OnDemandMinbiasSvc::m_last_loaded_hs
std::atomic_int64_t m_last_loaded_hs
Definition: OnDemandMinbiasSvc.h:99
OnDemandMinbiasSvc.h
lumiFormat.i
int i
Definition: lumiFormat.py:85
ISkipEventIdxSvc::EvtIter
std::vector< EvtId >::const_iterator EvtIter
Definition: ISkipEventIdxSvc.h:23
OnDemandMinbiasSvc::m_onDemandMB
Gaudi::Property< bool > m_onDemandMB
Definition: OnDemandMinbiasSvc.h:55
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
Recovery.avg
def avg(a, b)
Definition: Recovery.py:79
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OnDemandMinbiasSvc::m_useBeamLumi
Gaudi::Property< bool > m_useBeamLumi
Definition: OnDemandMinbiasSvc.h:68
OnDemandMinbiasSvc::s_NoSlot
static constexpr std::size_t s_NoSlot
Definition: OnDemandMinbiasSvc.h:101
OnDemandMinbiasSvc::beginHardScatter
StatusCode beginHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:218
Amg::transform
Amg::Vector3D transform(Amg::Vector3D &v, Amg::Transform3D &tr)
Transform a point from a Trasformation3D.
Definition: GeoPrimitivesHelpers.h:156
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
OnDemandMinbiasSvc::m_stores
std::vector< std::vector< SGHandle > > m_stores
Definition: OnDemandMinbiasSvc.h:94
hist_file_dump.f
f
Definition: hist_file_dump.py:135
run
Definition: run.py:1
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
OnDemandMinbiasSvc::m_bkg_evt_sel_ctx
IEvtSelector::Context * m_bkg_evt_sel_ctx
Definition: OnDemandMinbiasSvc.h:91
OnDemandMinbiasSvc::initialize
StatusCode initialize() final
AthService initialize.
Definition: OnDemandMinbiasSvc.cxx:45
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
OnDemandMinbiasSvc::m_nPerBunch
Gaudi::Property< float > m_nPerBunch
Definition: OnDemandMinbiasSvc.h:60
OnDemandMinbiasSvc::m_latestDeltaBC
Gaudi::Property< int > m_latestDeltaBC
Definition: OnDemandMinbiasSvc.h:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
OnDemandMinbiasSvc::endHardScatter
StatusCode endHardScatter(const EventContext &ctx) override
Definition: OnDemandMinbiasSvc.cxx:315
dq_make_web_display.rv
def rv
Definition: dq_make_web_display.py:219
EventInfo.h
OnDemandMinbiasSvc::m_beamInt
ServiceHandle< IBeamIntensity > m_beamInt
Definition: OnDemandMinbiasSvc.h:81
lumiFormat.lumi
lumi
Definition: lumiFormat.py:106
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OnDemandMinbiasSvc::getNumForBunch
std::size_t getNumForBunch(const EventContext &ctx, int bunch) const override
Definition: OnDemandMinbiasSvc.cxx:305
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
IProxyProviderSvc.h
IAddressProvider.h
OnDemandMinbiasSvc::m_reading_batch_mtx
std::mutex m_reading_batch_mtx
Definition: OnDemandMinbiasSvc.h:98
OnDemandMinbiasSvc::m_earliestDeltaBC
Gaudi::Property< int > m_earliestDeltaBC
Definition: OnDemandMinbiasSvc.h:70
OnDemandMinbiasSvc::m_activeStoreSvc
ServiceHandle< ActiveStoreSvc > m_activeStoreSvc
Definition: OnDemandMinbiasSvc.h:85
OnDemandMinbiasSvc::m_spare_store
SGHandle m_spare_store
Definition: OnDemandMinbiasSvc.h:88
OnDemandMinbiasSvc::m_seed
Gaudi::Property< std::uint64_t > m_seed
Definition: OnDemandMinbiasSvc.h:53
FastReseededPRNG
Definition: FastReseededPRNG.h:28