ATLAS Offline Software
SkipEventIdxSvc.cxx
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2023 CERN for the benefit of the ATLAS collaboration.
3  */
4 #include "SkipEventIdxSvc.h"
5 
6 #include <GaudiKernel/DataIncident.h> // This header contains ContextIncident
7 #include <GaudiKernel/IEvtSelector.h>
8 #include <GaudiKernel/IIncidentSvc.h>
9 
10 #include <boost/core/typeinfo.hpp>
11 #include <range/v3/to_container.hpp>
12 #include <range/v3/view.hpp>
13 #include <string>
14 
17 
18 using namespace std::literals;
19 namespace rv = ranges::views;
20 
21 namespace {
22 template <typename propType, typename T>
23 const Gaudi::Property<propType>&
24 getProp(SmartIF<T>& iface, const std::string& name)
25 {
26  auto prop_iface = iface.template as<IProperty>();
27  const auto& prop = prop_iface->getProperty(name);
28  try {
29  const auto& conv_prop =
30  dynamic_cast<const Gaudi::Property<propType>&>(prop);
31  return conv_prop;
32  } catch (const std::bad_cast&) {
33  std::string if_name = "UNKNOWN";
34  try {
35  if_name = iface.template as<INamedInterface>()->name();
36  } catch (...) {
37  }
38  const std::string what = fmt::format(
39  "The {} object's {} property has type {} not Gaudi::Property<{}>",
40  if_name, name, boost::core::demangled_name(typeid(prop)),
41  boost::core::demangled_name(typeid(propType)));
42  throw std::logic_error(what);
43  }
44 }
45 
46 template <typename T>
47 StatusCode setProp(SmartIF<T>& iface, const std::string& name,
48  const std::string& val) {
49  auto prop_iface = iface.template as<IProperty>();
50  return prop_iface->setProperty(name, val);
51 }
52 } // namespace
53 
54 SkipEventIdxSvc::SkipEventIdxSvc(const std::string& name, ISvcLocator* svc)
55  : base_class(name, svc) {}
56 
58  // This is a special service that runs through all the input events ahead of
59  // time, and records the run and lb numbers
60  ATH_MSG_INFO("Initializing SkipEventIdxSvc");
61  SmartIF<IProperty> propMgr(serviceLocator());
62  std::string evt_sel_name{};
63  ATH_CHECK(propMgr->getProperty("EvtSel", evt_sel_name));
64  if (evt_sel_name.empty()) {
65  // No event selector
66  //
67  ATH_MSG_WARNING("No event selector");
68  m_started = true;
69  return StatusCode::SUCCESS;
70  }
71  auto sg = serviceLocator()->service<StoreGateSvc>("StoreGateSvc/StoreGateSvc",
72  false);
73  auto evtSel = serviceLocator()->service<IEvtSelector>(evt_sel_name, false);
74  if (!sg.isValid() || !evtSel.isValid()) {
75  ATH_MSG_WARNING("Event selector or storegate is invalid");
76  return StatusCode::FAILURE;
77  }
78 
79  auto modSvc = serviceLocator()->service<IEvtIdModifierSvc>(
80  "EvtIdModifierSvc/EvtIdModifierSvc", false);
81  std::uint64_t evts_skipped_before_mod = 0;
82  std::vector<EvtId> modifier_evts{};
83  if (modSvc.isValid()) {
84  // Steal it's configuration
85  try {
86  evts_skipped_before_mod =
87  getProp<std::uint64_t&>(modSvc, "SkipEvents").value();
88  ATH_MSG_INFO("Skipping " << evts_skipped_before_mod
89  << " events before modifying");
90  std::vector<std::uint64_t> lst =
91  getProp<std::vector<std::uint64_t>&>(modSvc, "Modifiers").value();
92  ATH_MSG_DEBUG("Printing EvtId modifier config. There are "
93  << lst.size() / 6 << " entries.");
94  std::string config_str{};
95  auto config_str_iter = std::back_inserter(config_str);
96  modifier_evts =
97  lst | rv::chunk(6) |
98  rv::for_each([&config_str_iter](const auto& rec) {
99  const int mod_bitset = rec[5];
100  const bool mod_run_num = mod_bitset & 1;
101  const bool mod_evt_num = mod_bitset & (1 << 1);
102  const bool mod_lb_num = mod_bitset & (1 << 3);
103 
104  const std::uint64_t runNum = mod_run_num ? rec[0] : 0;
105  const std::uint64_t evtNum = mod_evt_num ? rec[1] : 0;
106  const std::uint64_t lbNum = mod_lb_num ? rec[3] : 0;
107  const std::uint64_t numEvts = rec[4];
108 
109  fmt::format_to(config_str_iter,
110  "Run: {} [{:c}] LB: {} [{:c}] EVT: {} [{:c}] "
111  "NumEvts: {}\n",
112  runNum, mod_run_num ? 'Y' : 'N', lbNum,
113  mod_lb_num ? 'Y' : 'N', evtNum,
114  mod_evt_num ? 'Y' : 'N', numEvts);
115  return ranges::yield_from(
116  rv::repeat_n(EvtId{static_cast<uint32_t>(runNum),
117  static_cast<uint32_t>(lbNum), evtNum},
118  numEvts));
119  }) |
120  ranges::to<std::vector<EvtId>>;
121  ATH_MSG_DEBUG(config_str);
122  } catch (const std::bad_cast&) {
123  ATH_MSG_ERROR("Wrong type for property of EvtIdModifierSvc.");
124  }
125  } else {
126  ATH_MSG_INFO("No EvtIdModifierSvc found");
127  }
128 
129  m_initial_skip_events = getProp<int>(evtSel, "SkipEvents");
130  ATH_CHECK(setProp(evtSel, "SkipEvents", "0"));
131  IEvtSelector::Context* ctx = nullptr;
132  ATH_CHECK(sg->clearStore());
133  ATH_CHECK(evtSel->createContext(ctx));
134 
135  std::uint64_t idx = 0;
136  ATH_CHECK(dynamic_cast<Service*>(evtSel.get())->start());
137  while (evtSel->next(*ctx).isSuccess()) {
138  EvtId evt_id{};
139  // Load event
140  IOpaqueAddress* addr = nullptr;
141  ATH_CHECK(evtSel->createAddress(*ctx, addr));
142  ATH_CHECK(sg->recordAddress(addr));
143  ATH_CHECK(sg->loadEventProxies());
144 
145  // Read EventInfo
146  // We don't look for the legacy EventInfo, only an Input attribute list and
147  // if that doesn't exist, and xAOD::EventInfo
148  std::vector<std::string> attr_lists;
149  sg->keys<AthenaAttributeList> (attr_lists);
151  "Attr lists are: " << fmt::format("[{}]", fmt::join(attr_lists, ", ")));
152  const auto* attr_list_p =
153  sg->tryConstRetrieve<AthenaAttributeList>("Input");
154  if (attr_list_p != nullptr && attr_list_p->size() > 6) {
155  try {
156  const AthenaAttributeList& attr_list = *attr_list_p;
157  const auto runNum = attr_list["RunNumber"].data<unsigned>();
158  const auto evtNum = attr_list["EventNumber"].data<unsigned long long>();
159  const auto lbNum = attr_list["LumiBlockN"].data<unsigned>();
160  evt_id = EvtId{runNum, lbNum, evtNum};
161  } catch (...) {
162  }
163  }
164  ATH_CHECK(sg->clearStore());
165  if (idx >= evts_skipped_before_mod && !modifier_evts.empty()) {
166  const std::size_t mod_idx =
167  (idx - evts_skipped_before_mod) % modifier_evts.size();
168  evt_id.runNum = modifier_evts[mod_idx].runNum != 0U
169  ? modifier_evts[mod_idx].runNum
170  : evt_id.runNum;
171  evt_id.lbNum = modifier_evts[mod_idx].lbNum != 0U
172  ? modifier_evts[mod_idx].lbNum
173  : evt_id.lbNum;
174  evt_id.evtNum = modifier_evts[mod_idx].evtNum != 0U
175  ? modifier_evts[mod_idx].evtNum
176  : evt_id.evtNum;
177  }
178  evt_id.evtIdx = idx;
179  m_events.push_back(evt_id);
180  ++idx;
181  }
182 
183  // reset current storegate
184  ATH_MSG_INFO("Setting SkipEvents back to " << m_initial_skip_events
185  << " and rewinding");
186  ATH_CHECK(
187  setProp(evtSel, "SkipEvents", fmt::format("{}", m_initial_skip_events)));
188  ATH_CHECK(evtSel->rewind(*ctx));
189  ATH_MSG_INFO("Recorded a total of " << m_events.size() << " events");
190 
191  // Register as incident handler
192  ServiceHandle<IIncidentSvc> incident_svc("IncidentSvc", name());
193  ATH_CHECK(incident_svc.retrieve());
194  incident_svc->addListener(this, "BeginRun");
195  incident_svc->addListener(this, "SkipEvents");
196  return StatusCode::SUCCESS;
197 }
198 
200  ATH_MSG_INFO("Starting SkipEventIdxSvc");
201  // Call callbacks if any have already been added
202  for (auto&& callback : m_callbacks) {
203  auto begin = m_events.cbegin();
204  auto end = m_events.cbegin() + m_initial_skip_events;
205  ATH_CHECK(std::invoke(callback, begin, end));
206  }
207  m_started = true;
208  return StatusCode::SUCCESS;
209 }
210 
212  std::function<StatusCode(EvtIter, EvtIter)>&& callback) {
213  m_callbacks.push_back(callback);
214  // Call callback immediately for events skipped in EventSelector if already
215  // started
216  if (m_started && !m_events.empty()) {
217  auto begin = m_events.cbegin();
218  auto end = m_events.cbegin() +
219  std::min(std::size_t(m_initial_skip_events), m_events.size());
220  ATH_CHECK(std::invoke(callback, begin, end));
221  }
222  return StatusCode::SUCCESS;
223 }
224 
225 void SkipEventIdxSvc::handle(const Incident& inc) {
226  if (inc.type() != "BeginRun"s && inc.type() != "SkipEvents"s) {
227  return;
228  }
229  ATH_MSG_DEBUG("Received incident of type " << inc.type() << " from "
230  << inc.source());
231  if (inc.type() == "SkipEvents"s) {
232  ATH_MSG_DEBUG("Running callbacks");
233  const auto& cInc =
234  dynamic_cast<const ContextIncident<std::tuple<int, int>>&>(inc);
235  auto begin = m_events.cbegin() + std::get<0>(cInc.tag());
236  auto end = m_events.cbegin() + std::get<1>(cInc.tag()) +
237  1; // plus 1 for 1 after end
238  for (auto&& fn : m_callbacks) {
239  if (!std::invoke(fn, begin, end).isSuccess()) {
240  throw std::runtime_error(
241  "A skipEvent callback returned a failure error code!");
242  }
243  }
244  }
245 }
python.SystemOfUnits.s
int s
Definition: SystemOfUnits.py:131
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
PlotCalibFromCool.begin
begin
Definition: PlotCalibFromCool.py:94
runLayerRecalibration.callback
callback
Definition: runLayerRecalibration.py:64
athena.value
value
Definition: athena.py:122
skel.runNum
runNum
Definition: skel.ABtoEVGEN.py:137
IEvtIdModifierSvc
Definition: IEvtIdModifierSvc.h:37
python.FakeAthena.Service
def Service(name)
Definition: FakeAthena.py:38
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:93
SkipEventIdxSvc::m_initial_skip_events
int m_initial_skip_events
Definition: SkipEventIdxSvc.h:26
IEvtIdModifierSvc.h
python.getCurrentFolderTag.fn
fn
Definition: getCurrentFolderTag.py:65
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
Atlas.StoreGateSvc
StoreGateSvc
Definition: Atlas.UnixStandardJob.py:25
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
SkipEventIdxSvc::handle
void handle(const Incident &inc) override
Definition: SkipEventIdxSvc.cxx:225
SkipEventIdxSvc::start
StatusCode start() override
Definition: SkipEventIdxSvc.cxx:199
SkipEventIdxSvc::m_callbacks
std::vector< std::function< StatusCode(EvtIter, EvtIter)> > m_callbacks
Definition: SkipEventIdxSvc.h:28
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaAttributeList
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
Definition: PersistentDataModel/PersistentDataModel/AthenaAttributeList.h:45
Handler::svc
AthROOTErrorHandlerSvc * svc
Definition: AthROOTErrorHandlerSvc.cxx:10
SkipEventIdxSvc::SkipEventIdxSvc
SkipEventIdxSvc(const std::string &name, ISvcLocator *svc)
Definition: SkipEventIdxSvc.cxx:54
min
#define min(a, b)
Definition: cfImp.cxx:40
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.ExitCodes.what
def what(code)
Definition: ExitCodes.py:73
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
SkipEventIdxSvc::initialize
StatusCode initialize() override
Definition: SkipEventIdxSvc.cxx:57
SkipEventIdxSvc::registerCallback
StatusCode registerCallback(std::function< StatusCode(EvtIter, EvtIter)> &&callback) override
Definition: SkipEventIdxSvc.cxx:211
dq_make_web_display.rv
def rv
Definition: dq_make_web_display.py:219
EventInfo.h
CxxUtils::to
CONT to(RANGE &&r)
Definition: ranges.h:32
SkipEventIdxSvc::m_events
std::vector< EvtId > m_events
Definition: SkipEventIdxSvc.h:27
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
Pythia8_RapidityOrderMPI.val
val
Definition: Pythia8_RapidityOrderMPI.py:14
LArNewCalib_DelayDump_OFC_Cali.idx
idx
Definition: LArNewCalib_DelayDump_OFC_Cali.py:69
SkipEventIdxSvc::m_started
bool m_started
Definition: SkipEventIdxSvc.h:29
SkipEventIdxSvc.h
ServiceHandle< IIncidentSvc >