ATLAS Offline Software
Loading...
Searching...
No Matches
DelayedConditionsCleanerSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
10
11
16#include "CxxUtils/StrFormat.h"
17#include "GaudiKernel/EventContext.h"
18#include "GaudiKernel/ServiceHandle.h"
19#include <algorithm>
20#include <unordered_set>
21
22
23// We wanted to try to allow cleaning to run as asynchronous tasks,
24// but there are some race conditions that appear to be difficult
25// to resolve. These stem from the fact that conditions are only
26// written from CondInputLoader. If we clean after that, and don't
27// have the full set of IOV keys for all executing events, then we can
28// clean an item which the slot won't be able to recover.
29// Leave the code commented-out for now while we think about this further.
30#define USE_ASYNC_TASK 0
31
32
33#if USE_ASYNC_TASK
34#include "tbb/task.h"
35#endif
36
37
38namespace Athena {
39
40
42 : public AthProperties<DelayedConditionsCleanerSvc>
43{
44public:
47
49 Gaudi::Property<size_t> m_ringSize
50 { parent(), "RingSize", 100,
51 "Number of previous events for which to remember IOV history." };
52
55 Gaudi::Property<size_t> m_cleanDelay
56 { parent(), "CleanDelay", 100,
57 "Number of events after adding a conditions object we try to clean its container." };
58
60 Gaudi::Property<size_t> m_lookAhead
61 { parent(), "LookAhead", 10,
62 "Maximum number of events to consolodate together when cleaning." };
63
64
66#if USE_ASYNC_TASK
67 Gaudi::Property<bool> m_async
68 { parent(), "Async", false,
69 "If true, run cleaning asynchronously in an MT job." };
70#else
71 bool m_async = false;
72#endif
73
76 { parent(), "RCUSvc", "Athena::RCUSvc",
77 "The RCU service." };
78};
79
80
81#if USE_ASYNC_TASK
// Asynchronous cleaning task; only compiled when USE_ASYNC_TASK is nonzero.
// It carries a list of containers to clean plus the IOV key sets to clean
// against, and hands them back to the owning DelayedConditionsCleanerSvc
// (cleanContainers) when TBB executes it.
// NOTE(review): this derives from the legacy tbb::task API, and it is launched
// with tbb::task::allocate_root / tbb::task::enqueue. That API was removed in
// oneTBB 2021, so re-enabling USE_ASYNC_TASK would likely need a port
// (e.g. tbb::task_arena::enqueue or tbb::task_group). Confirm against the
// TBB version in use.
85class DelayedConditionsCleanerTask
86 : public tbb::task
87{
88public:
// Constructor: receives the service, the containers to clean and the
// run+LBN / timestamp key sets (the latter two as rvalue references).
96 DelayedConditionsCleanerTask (DelayedConditionsCleanerSvc& cleaner,
97 std::vector<DelayedConditionsCleanerSvc::CondContInfo*>&& cis,
98 DelayedConditionsCleanerSvc::twoKeys_t&& keys);
99
// Task body: runs the cleaning via the service, decrements the service's
// count of active cleaning tasks, and returns nullptr.
103 tbb::task* execute() override;
104
105
106private:
// Service that owns the container bookkeeping and performs the cleaning.
108 DelayedConditionsCleanerSvc& m_cleaner;
109
// Containers to be cleaned by this task.
111 std::vector<DelayedConditionsCleanerSvc::CondContInfo*> m_cis;
112
// IOV keys to clean against: [0] run+LBN, [1] timestamp.
114 DelayedConditionsCleanerSvc::twoKeys_t m_keys;
115};
116
117
124DelayedConditionsCleanerTask::DelayedConditionsCleanerTask
126 std::vector<DelayedConditionsCleanerSvc::CondContInfo*>&& cis,
128 : m_cleaner (cleaner),
129 m_cis (cis),
130 m_keys (keys)
131{
132}
133
134
138tbb::task* DelayedConditionsCleanerTask::execute()
139{
140 // Do the cleaning.
141 m_cleaner.cleanContainers (std::move (m_cis), std::move (m_keys));
142
143 // This task is terminating.
144 --m_cleaner.m_cleanTasks;
145 return nullptr;
146}
147#endif // USE_ASYNC_TASK
148
149
156 ISvcLocator* svc)
157 : base_class (name, svc),
158 m_props (std::make_unique<DelayedConditionsCleanerSvcProps> (this))
159{
160}
161
162
167{
168 // Set the ring buffer sizes.
169 m_runlbn.reset (m_props->m_ringSize);
170 m_timestamp.reset (m_props->m_ringSize);
171
172 ATH_CHECK( m_props->m_rcu.retrieve() );
173 size_t nslots = m_props->m_rcu->getNumSlots();
174 m_slotLBN.resize (nslots);
175 m_slotTimestamp.resize (nslots);
176
177 return StatusCode::SUCCESS;
178}
179
180
// Per-event hook.
//  1. Records this event's run+LBN and timestamp IOV keys in the ring
//     buffers, and also in the per-slot arrays when the context has a
//     valid slot.
//  2. Unless an asynchronous cleaning task is still active: if the entry at
//     the top of the work queue is due (its m_evt <= this event number), pops
//     it and keeps popping while the top entry is due within the next
//     LookAhead events. This batches nearby cleanings into one pass.
//  3. Passes the collected containers, together with a fresh key snapshot
//     from getKeys(), to scheduleClean().
// @param ctx        Context of the event being started.
// @param allowAsync Forwarded to scheduleClean().
// @return Always StatusCode::SUCCESS.
187StatusCode
188DelayedConditionsCleanerSvc::event (const EventContext& ctx, bool allowAsync)
189{
190 // Push the IOV key for the current event into the ring buffers.
191 // Also save in the per-slot arrays.
192 key_type key_lbn = CondContBase::keyFromRunLBN (ctx.eventID());
193 key_type key_ts = CondContBase::keyFromTimestamp (ctx.eventID());
194 m_runlbn.push (key_lbn);
195 m_timestamp.push (key_ts);
196 EventContext::ContextID_t slot = ctx.slot();
197 if (slot != EventContext::INVALID_CONTEXT_ID) {
198 m_slotLBN[slot] = key_lbn;
199 m_slotTimestamp[slot] = key_ts;
200 }
201
202 // Return now if an asynchronous cleaning task is still running ---
203 // we don't want to start a new one yet. We'll check pending work
204 // on the next call.
// NOTE(review): with USE_ASYNC_TASK set to 0, nothing visible in this file
// increments m_cleanTasks, so this early return should never trigger.
205 if (m_cleanTasks > 0) {
206 return StatusCode::SUCCESS;
207 }
208
209 // Collect conditions containers in need of cleaning.
210 std::vector<CondContInfo*> ci_vec;
211 {
213 // Is it time to clean the container at the top of the work queue?
214 if (!m_work.empty() && m_work.top().m_evt <= ctx.evt()) {
// Queue statistics (reported by printStats): number of calls on which
// cleaning was due, plus running sum and maximum of the queue size.
215 ++m_nEvents;
216 size_t sz = m_work.size();
217 m_queueSum += sz;
218 m_maxQueue = std::max (m_maxQueue, sz);
219
220 // Yes. Put it on the correct list. Also look ahead in the queue
221 // a bit; if there are other containers that we want to clean soon,
222 // go ahead and do them now.
223 do {
224 CondContInfo* ci = m_work.top().m_ci;
225 switch (ci->m_cc.keyType()) {
// SINGLE-keyed containers are dropped from the queue without being
// cleaned; any key type not listed here aborts the job.
226 case KeyType::SINGLE:
227 break;
228 case KeyType::RUNLBN:
229 case KeyType::MIXED:
230 case KeyType::TIMESTAMP:
231 ci_vec.push_back (ci);
232 break;
233 default:
234 std::abort();
235 }
236 m_work.pop();
238 } while (!m_work.empty() && m_work.top().m_evt <= ctx.evt() + m_props->m_lookAhead);
239 }
240 }
241
242 // Clean the containers.
// ci_vec may hold the same container several times (once per queued entry);
// scheduleClean removes the duplicates.
243 if (!ci_vec.empty()) {
244 scheduleClean (std::move (ci_vec), getKeys(m_runlbn,m_timestamp),
245 allowAsync);
246 }
247 return StatusCode::SUCCESS;
248}
249
250
// Called after a conditions object has been added to container cc.
// Looks up the CondContInfo bookkeeping entry for cc, creating it on first
// use, and queues cc for cleaning CleanDelay events after the current event.
// @param ctx Context of the event that added the object.
// @param cc  The container the object was added to.
// @return Always StatusCode::SUCCESS.
256StatusCode DelayedConditionsCleanerSvc::condObjAdded (const EventContext& ctx,
257 CondContBase& cc)
258{
259 // Add this container to the priority queue.
261 CCInfoMap_t::iterator it = m_ccinfo.find (&cc);
262 if (it == m_ccinfo.end()) {
263 it = m_ccinfo.emplace (&cc, CondContInfo (cc)).first;
264 }
265
// Every added object pushes a new queue entry, even when cc is already
// queued; repeated entries are merged later, when scheduleClean deduplicates.
266 EventContext::ContextEvt_t evt = ctx.evt();
267 m_work.emplace (evt + m_props->m_cleanDelay, it->second);
268 return StatusCode::SUCCESS;
269}
270
271
278{
279 // Suppress output if we didn't actually do anything.
280 if (m_nEvents == 0) {
281 return StatusCode::SUCCESS;
282 }
283
284 ATH_MSG_INFO( "Conditions container statistics" );
285 ATH_MSG_INFO( CxxUtils::strformat (" Work q: Max size: %zu (%zu queries) ",
287 size_t den = std::max (m_nEvents, 1lu);
288 ATH_MSG_INFO( CxxUtils::strformat (" Avg size: %.2f / Avg removed: %.2f",
289 static_cast<float>(m_queueSum)/den,
290 static_cast<float>(m_workRemoved)/den) );
291
292 std::vector<const CondContInfo*> infos;
293 for (const auto& p : m_ccinfo) {
294 infos.push_back (&p.second);
295 }
296 std::sort (infos.begin(), infos.end(),
297 [](const CondContInfo* a, const CondContInfo* b)
298 { return a->m_cc.id().key() < b->m_cc.id().key(); });
299
300 for (const CondContInfo* ci : infos) {
301 ATH_MSG_INFO( CxxUtils::strformat (" %-20s nInserts %6zu maxSize %3zu",
302 ci->m_cc.id().key().c_str(),
303 ci->m_cc.nInserts(),
304 ci->m_cc.maxSize()) );
305 den = std::max (ci->m_nClean, 1lu);
306 ATH_MSG_INFO( CxxUtils::strformat (" nClean %zu avgRemoved %.2f 0/1/2+ %zu/%zu/%zu",
307 ci->m_nClean,
308 static_cast<float> (ci->m_nRemoved) / den,
309 ci->m_removed0,
310 ci->m_removed1,
311 ci->m_removed2plus) );
312 }
313
314 return StatusCode::SUCCESS;
315}
316
317
323{
324 m_runlbn.reset (m_props->m_ringSize);
325 m_timestamp.reset (m_props->m_ringSize);
326
327 std::fill (m_slotLBN.begin(), m_slotLBN.end(), 0);
328 std::fill (m_slotTimestamp.begin(), m_slotTimestamp.end(), 0);
329
330 m_ccinfo.clear();
331 std::priority_queue<QueueItem> tmp;
332 m_work.swap (tmp);
333
334 m_nEvents = 0;
335 m_queueSum = 0;
336 m_workRemoved = 0;
337 m_maxQueue = 0;
338 m_cleanTasks = 0;
339
340 return StatusCode::SUCCESS;
341}
342
343
345DelayedConditionsCleanerSvc::getKeys(const Ring& runLBRing, const Ring& TSRing) const {
346
347 // Get a copy of the contents of the ring buffer holding runLumi and time-stamp keys
348 std::vector<key_type> runLBKeys=runLBRing.getKeysDedup();
349 std::vector<key_type> TSKeys=TSRing.getKeysDedup();
350
351 // Add in the keys for the currently-executing slots.
352 // These are very likely to already be in the ring, but that's
353 // not absolutely guaranteed.
354 // FIXME: This probably does another memory allocation, due to
355 // growing the buffer. Would be nice to avoid that.
356 runLBKeys.insert (runLBKeys.end(), m_slotLBN.begin(), m_slotLBN.end());
357 TSKeys.insert(TSKeys.end(), m_slotTimestamp.begin(), m_slotTimestamp.end());
358
359 twoKeys_t result{std::move(runLBKeys), std::move(TSKeys)};
360
365 for ( auto& keys : result ) {
366 std::sort (keys.begin(), keys.end());
367 auto end = std::unique (keys.begin(), keys.end());
368 keys.resize (end - keys.begin());
369 }
370
371
372 return result;
373}
374
375
385void
386DelayedConditionsCleanerSvc::scheduleClean (std::vector<CondContInfo*>&& cis,
387 twoKeys_t&& twoKeys,
388 bool allowAsync)
389{
390 // Remove any duplicates from the list of containers.
391 std::sort (cis.begin(), cis.end());
392 auto pos = std::unique (cis.begin(), cis.end());
393 cis.resize (pos - cis.begin());
394
395 if (allowAsync && m_props->m_async) {
396#if USE_ASYNC_TASK
397 // Queue cleaning as a TBB task.
398 // Count that we have another executing task.
399 ++m_cleanTasks;
400
401 // Create the TBB task and queue it.
402 // TBB will delete the task object after it completes.
403 tbb::task* t = new (tbb::task::allocate_root())
404 DelayedConditionsCleanerTask (*this, std::move (cis),
405 std::move (twoKeys));
406 tbb::task::enqueue (*t);
407#endif
408 }
409 else
410 {
411 // Call cleaning directly.
412 cleanContainers (std::move (cis), std::move (twoKeys));
413 }
414}
415
416
// Clean each container in cis against the given IOV keys. Cleaning then
// propagates transitively: for every container from which something was
// removed, the containers returned by its getDeps() are cleaned as well.
// Per the FIXME below, those are containers whose objects may depend on it.
// Processing is level by level. The `cleaned` set starts with every entry of
// cis, and a dependent is queued only if it is not already in the set. So
// each dependent is visited at most once per call, and propagation terminates
// even if the dependency graph has cycles.
// @param cis     Containers to clean (moved from).
// @param twoKeys Run+LBN ([0]) and timestamp ([1]) keys to clean against.
422void
423DelayedConditionsCleanerSvc::cleanContainers (std::vector<CondContInfo*>&& cis,
424 twoKeys_t&& twoKeys)
425{
426 // FIXME: Some conditions objects have pointers to parts of other
427 // conditions objects, which violates the lifetime guarantees of
428 // conditions containers (a pointer you get from a conditions container
429 // is guaranteed to be valid until the end of the current event, but
430 // not past that). In some cases, this can be dealt with by replacing
431 // the pointers with CondLink, but that is sometimes rather inconvenient.
432 // Try to work around this for now by ensuring that when we delete an object,
433 // we also try to clean the containers of other objects that may depend
434 // on it.
435 std::vector<CondContInfo*> toclean = std::move (cis);
436 std::unordered_set<CondContInfo*> cleaned (toclean.begin(), toclean.end());
437 while (!toclean.empty()) {
438 std::vector<CondContInfo*> newclean;
439 for (CondContInfo* ci : toclean) {
// cleanContainer returns true only when trim() removed at least one entry.
// Dependents are visited only in that case.
440 if (cleanContainer (ci, twoKeys)) {
442 for (CondContBase* dep : ci->m_cc.getDeps()) {
443 CCInfoMap_t::iterator it = m_ccinfo.find (dep);
444 // If we don't find it, then dep must have no conditions objects.
445 if (it != m_ccinfo.end()) {
446 CondContInfo* ci_dep = &it->second;
// insert().second is false if ci_dep was already cleaned or queued.
447 if (cleaned.insert (ci_dep).second) {
448 newclean.push_back (ci_dep);
449 }
450 }
451 }
452 }
453 }
454 toclean = std::move (newclean);
455 }
456}
457
458
467bool
469 const twoKeys_t& twoKeys) const
470{
471 size_t n = ci->m_cc.trim (twoKeys[0],twoKeys[1]);
472
473 ++ci->m_nClean;
474 ci->m_nRemoved += n;
475 switch (n) {
476 case 0:
477 ++ci->m_removed0;
478 break;
479 case 1:
480 ++ci->m_removed1;
481 break;
482 default:
483 ++ci->m_removed2plus;
484 }
485
486 return n > 0;
487}
488
492
494
499{
501 return StatusCode::SUCCESS;
502}
503
504
505} // namespace Athena
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_INFO(x)
pimpl-style holder for component properties.
Hold mappings of ranges to condition objects.
Clean conditions containers after a delay.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
read-copy-update (RCU) style synchronization for Athena.
static Double_t sz
static Double_t a
Provide helper functions to create formatted strings.
DelayedConditionsCleanerSvc * parent()
AthProperties(DelayedConditionsCleanerSvc *parent)
bool m_async
Property: If true, run cleaning asynchronously in an MT job.
Gaudi::Property< size_t > m_cleanDelay
Property: Number of events after adding a conditions object we try to clean its container.
ServiceHandle< Athena::IRCUSvc > m_rcu
Property: RCU Service.
DelayedConditionsCleanerSvcProps(DelayedConditionsCleanerSvc *parent)
Gaudi::Property< size_t > m_ringSize
Property: Number of previous events for which to remember IOV history.
Gaudi::Property< size_t > m_lookAhead
Property: Maximum number of events to consolidate together when cleaning.
Information that we maintain about each conditions container.
Clean conditions containers after a delay.
DelayedConditionsCleanerSvc(const std::string &name, ISvcLocator *svc)
Standard Gaudi constructor.
std::priority_queue< QueueItem > m_work
Priority queue of pending cleaning requests.
size_t m_nEvents
Priority queue statistics.
virtual StatusCode finalize() override
Standard Gaudi finalize method.
twoKeys_t getKeys(const Ring &runLBRing, const Ring &TSRing) const
virtual StatusCode condObjAdded(const EventContext &ctx, CondContBase &cc) override
Called after a conditions object has been added.
virtual StatusCode initialize() override
Standard Gaudi initialize method.
Ring m_runlbn
Two ring buffers for recent IOV keys, one for run+LBN and one for timestamp.
virtual StatusCode event(const EventContext &ctx, bool allowAsync) override
Called at the start of each event.
void scheduleClean(std::vector< CondContInfo * > &&cis, twoKeys_t &&twoKeys, bool allowAsync)
Do cleaning for a set of containers.
std::unique_ptr< DelayedConditionsCleanerSvcProps > m_props
Component properties.
CxxUtils::Ring< key_type > Ring
Ring buffer holding most recent IOV keys of a given type.
CondContBase::key_type key_type
Packed key type.
std::atomic< int > m_cleanTasks
Number of active asynchronous cleaning tasks.
virtual StatusCode reset() override
Clear the internal state of the service.
std::array< std::vector< key_type >, 2 > twoKeys_t
void cleanContainers(std::vector< CondContInfo * > &&cis, twoKeys_t &&twoKeys)
Clean a set of containers.
bool cleanContainer(CondContInfo *ci, const twoKeys_t &keys) const
Clean a single container.
std::vector< key_type > m_slotLBN
IOV keys currently in use for each slot.
virtual StatusCode printStats() const override
Print some statistics about the garbage collection.
~DelayedConditionsCleanerSvc()
Standard destructor.
Base class for all conditions containers.
Definition CondCont.h:140
static key_type keyFromTimestamp(const EventIDBase &b)
Make a timestamp key from an EventIDBase.
static key_type keyFromRunLBN(const EventIDBase &b)
Make a run+lbn key from an EventIDBase.
KeyType keyType() const
Return the key type for this container.
std::vector< T > getKeysDedup() const
Return a copy of keys in the buffer.
Some weak symbol referencing magic... These are declared in AthenaKernel/getMessageSvc....
std::string strformat(const char *fmt,...)
return a std::string according to a format fmt and varargs
Definition StrFormat.cxx:49
::StatusCode StatusCode
StatusCode definition for legacy code.
STL namespace.
DataModel_detail::iterator< DVL > unique(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of unique for DataVector/List.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.