ATLAS Offline Software
Loading...
Searching...
No Matches
DelayedConditionsCleanerSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
10
11
16#include "CxxUtils/StrFormat.h"
17#include "GaudiKernel/EventContext.h"
18#include "GaudiKernel/ServiceHandle.h"
19#include <algorithm>
20#include <unordered_set>
21
22
23// We wanted to try to allow cleaning to run as asynchronous tasks,
24// but there are some race conditions that appear to be difficult
25// to resolve. These stem from the fact that conditions are only
26// written from CondInputLoader. If we clean after that, and don't
27// have the full set of IOV keys for all executing events, then we can
28// clean an item which the slot won't be able to recover.
29// Leave the code commented-out for now while we think about this further.
30#define USE_ASYNC_TASK 0
31
32
33#if USE_ASYNC_TASK
34#include "tbb/task.h"
35#endif
36
37
38namespace Athena {
39
40
42 : public AthProperties<DelayedConditionsCleanerSvc>
43{
44public:
47
49 Gaudi::Property<size_t> m_ringSize
50 { parent(), "RingSize", 100,
51 "Number of previous events for which to remember IOV history." };
52
55 Gaudi::Property<size_t> m_cleanDelay
56 { parent(), "CleanDelay", 100,
57 "Number of events after adding a conditions object we try to clean its container." };
58
60 Gaudi::Property<size_t> m_lookAhead
61 { parent(), "LookAhead", 10,
62 "Maximum number of events to consolodate together when cleaning." };
63
64
66#if USE_ASYNC_TASK
67 Gaudi::Property<bool> m_async
68 { parent(), "Async", false,
69 "If true, run cleaning asynchronously in an MT job." };
70#else
71 bool m_async = false;
72#endif
73
76 { parent(), "RCUSvc", "Athena::RCUSvc",
77 "The RCU service." };
78};
79
80
81#if USE_ASYNC_TASK
85class DelayedConditionsCleanerTask
86 : public tbb::task
87{
88public:
96 DelayedConditionsCleanerTask (DelayedConditionsCleanerSvc& cleaner,
97 std::vector<DelayedConditionsCleanerSvc::CondContInfo*>&& cis,
98 DelayedConditionsCleanerSvc::twoKeys_t&& keys);
99
103 tbb::task* execute() override;
104
105
106private:
108 DelayedConditionsCleanerSvc& m_cleaner;
109
111 std::vector<DelayedConditionsCleanerSvc::CondContInfo*> m_cis;
112
114 DelayedConditionsCleanerSvc::twoKeys_t m_keys;
115};
116
117
124DelayedConditionsCleanerTask::DelayedConditionsCleanerTask
126 std::vector<DelayedConditionsCleanerSvc::CondContInfo*>&& cis,
128 : m_cleaner (cleaner),
129 m_cis (cis),
130 m_keys (keys)
131{
132}
133
134
138tbb::task* DelayedConditionsCleanerTask::execute()
139{
140 // Do the cleaning.
141 m_cleaner.cleanContainers (std::move (m_cis), std::move (m_keys));
142
143 // This task is terminating.
144 --m_cleaner.m_cleanTasks;
145 return nullptr;
146}
147#endif // USE_ASYNC_TASK
148
149
156 ISvcLocator* svc)
157 : base_class (name, svc),
158 m_props (std::make_unique<DelayedConditionsCleanerSvcProps> (this))
159{
160}
161
162
167{
168 // Set the ring buffer sizes.
169 m_runlbn.reset (m_props->m_ringSize);
170 m_timestamp.reset (m_props->m_ringSize);
171
172 ATH_CHECK( m_props->m_rcu.retrieve() );
173 size_t nslots = m_props->m_rcu->getNumSlots();
174 m_slotLBN.resize (nslots);
175 m_slotTimestamp.resize (nslots);
176
177 return StatusCode::SUCCESS;
178}
179
180
187StatusCode
188DelayedConditionsCleanerSvc::event (const EventContext& ctx, bool allowAsync)
189{
190 // Push the IOV key for the current event into the ring buffers.
191 // Also save in the per-slot arrays.
192 key_type key_lbn = CondContBase::keyFromRunLBN (ctx.eventID());
193 key_type key_ts = CondContBase::keyFromTimestamp (ctx.eventID());
194 m_runlbn.push (key_lbn);
195 m_timestamp.push (key_ts);
196 EventContext::ContextID_t slot = ctx.slot();
197 if (slot != EventContext::INVALID_CONTEXT_ID) {
198 m_slotLBN[slot] = key_lbn;
199 m_slotTimestamp[slot] = key_ts;
200 }
201
202 // Return now if an asynchronous cleaning task is still running ---
203 // we don't want to start a new one yet. We'll check pending work
204 // on the next call.
205 if (m_cleanTasks > 0) {
206 return StatusCode::SUCCESS;
207 }
208
209 // Collect conditions containers in need of cleaning.
210 std::vector<CondContInfo*> ci_vec;
211 {
212 lock_t lock (m_workMutex);
213 // Is it time to clean the container at the top of the work queue?
214 if (!m_work.empty() && m_work.top().m_evt <= ctx.evt()) {
215 ++m_nEvents;
216 size_t sz = m_work.size();
217 m_queueSum += sz;
218 m_maxQueue = std::max (m_maxQueue, sz);
219
220 // Yes. Put it on the correct list. Also look ahead in the queue
221 // a bit; if there are other containers that we want to clean soon,
222 // go ahead and do them now.
223 do {
224 CondContInfo* ci = m_work.top().m_ci;
225 switch (ci->m_cc.keyType()) {
226 case KeyType::SINGLE:
227 break;
228 case KeyType::RUNLBN:
229 case KeyType::MIXED:
230 case KeyType::TIMESTAMP:
231 ci_vec.push_back (ci);
232 break;
233 default:
234 std::abort();
235 }
236 m_work.pop();
238 } while (!m_work.empty() && m_work.top().m_evt <= ctx.evt() + m_props->m_lookAhead);
239 }
240 }
241
242 // Clean the containers.
243 if (!ci_vec.empty()) {
244 scheduleClean (std::move (ci_vec), getKeys(m_runlbn,m_timestamp),
245 allowAsync);
246 }
247 return StatusCode::SUCCESS;
248}
249
250
256StatusCode DelayedConditionsCleanerSvc::condObjAdded (const EventContext& ctx,
257 CondContBase& cc)
258{
259 // Add this container to the priority queue.
260 lock_t lock (m_workMutex);
261 CCInfoMap_t::iterator it = m_ccinfo.find (&cc);
262 if (it == m_ccinfo.end()) {
263 it = m_ccinfo.emplace (&cc, CondContInfo (cc)).first;
264 }
265
266 EventContext::ContextEvt_t evt = ctx.evt();
267 m_work.emplace (evt + m_props->m_cleanDelay, it->second);
268 return StatusCode::SUCCESS;
269}
270
271
278{
279 // Suppress output if we didn't actually do anything.
280 if (m_nEvents == 0) {
281 return StatusCode::SUCCESS;
282 }
283
284 ATH_MSG_INFO( "Conditions container statistics" );
285 ATH_MSG_INFO( CxxUtils::strformat (" Work q: Max size: %zu (%zu queries) ",
287 size_t den = std::max (m_nEvents, 1lu);
288 ATH_MSG_INFO( CxxUtils::strformat (" Avg size: %.2f / Avg removed: %.2f",
289 static_cast<float>(m_queueSum)/den,
290 static_cast<float>(m_workRemoved)/den) );
291
292 std::vector<const CondContInfo*> infos;
293 for (const auto& p : m_ccinfo) {
294 infos.push_back (&p.second);
295 }
296 std::sort (infos.begin(), infos.end(),
297 [](const CondContInfo* a, const CondContInfo* b)
298 { return a->m_cc.id().key() < b->m_cc.id().key(); });
299
300 for (const CondContInfo* ci : infos) {
301 ATH_MSG_INFO( CxxUtils::strformat (" %-20s nInserts %6zu maxSize %3zu",
302 ci->m_cc.id().key().c_str(),
303 ci->m_cc.nInserts(),
304 ci->m_cc.maxSize()) );
305 den = std::max (ci->m_nClean, 1lu);
306 ATH_MSG_INFO( CxxUtils::strformat (" nClean %zu avgRemoved %.2f 0/1/2+ %zu/%zu/%zu",
307 ci->m_nClean,
308 static_cast<float> (ci->m_nRemoved) / den,
309 ci->m_removed0,
310 ci->m_removed1,
311 ci->m_removed2plus) );
312 }
313
314 return StatusCode::SUCCESS;
315}
316
317
323{
324 m_runlbn.reset (m_props->m_ringSize);
325 m_timestamp.reset (m_props->m_ringSize);
326
327 std::fill (m_slotLBN.begin(), m_slotLBN.end(), 0);
328 std::fill (m_slotTimestamp.begin(), m_slotTimestamp.end(), 0);
329
330 m_ccinfo.clear();
331 std::priority_queue<QueueItem> tmp;
332 m_work.swap (tmp);
333
334 m_nEvents = 0;
335 m_queueSum = 0;
336 m_workRemoved = 0;
337 m_maxQueue = 0;
338 m_cleanTasks = 0;
339
340 return StatusCode::SUCCESS;
341}
342
343
345DelayedConditionsCleanerSvc::getKeys(const Ring& runLBRing, const Ring& TSRing) const {
346
347 // Get a copy of the contents of the ring buffer holding runLumi and time-stamp keys
348 std::vector<key_type> runLBKeys=runLBRing.getKeysDedup();
349 std::vector<key_type> TSKeys=TSRing.getKeysDedup();
350
351 // Add in the keys for the currently-executing slots.
352 // These are very likely to already be in the ring, but that's
353 // not absolutely guaranteed.
354 // FIXME: This probably does another memory allocation, due to
355 // growing the buffer. Would be nice to avoid that.
356 runLBKeys.insert (runLBKeys.end(), m_slotLBN.begin(), m_slotLBN.end());
357 TSKeys.insert(TSKeys.end(), m_slotTimestamp.begin(), m_slotTimestamp.end());
358
359 twoKeys_t result{std::move(runLBKeys), std::move(TSKeys)};
360
365 for ( auto& keys : result ) {
366 std::sort (keys.begin(), keys.end());
367 auto end = std::unique (keys.begin(), keys.end());
368 keys.resize (end - keys.begin());
369 }
370
371
372 return result;
373}
374
375
385void
386DelayedConditionsCleanerSvc::scheduleClean (std::vector<CondContInfo*>&& cis,
387 twoKeys_t&& twoKeys,
388 bool allowAsync)
389{
390 // Remove any duplicates from the list of containers.
391 std::sort (cis.begin(), cis.end());
392 auto pos = std::unique (cis.begin(), cis.end());
393 cis.resize (pos - cis.begin());
394
395 if (allowAsync && m_props->m_async) {
396#if USE_ASYNC_TASK
397 // Queue cleaning as a TBB task.
398 // Count that we have another executing task.
399 ++m_cleanTasks;
400
401 // Create the TBB task and queue it.
402 // TBB will delete the task object after it completes.
403 tbb::task* t = new (tbb::task::allocate_root())
404 DelayedConditionsCleanerTask (*this, std::move (cis),
405 std::move (twoKeys));
406 tbb::task::enqueue (*t);
407#endif
408 }
409 else
410 {
411 // Call cleaning directly.
412 cleanContainers (std::move (cis), std::move (twoKeys));
413 }
414}
415
416
422void
423DelayedConditionsCleanerSvc::cleanContainers (std::vector<CondContInfo*>&& cis,
424 twoKeys_t&& twoKeys)
425{
426 // FIXME: Some conditions objects have pointers to parts of other
427 // conditions objects, which violates the lifetime guarantees of
428 // conditions containers (a pointer you get from a conditions container
429 // is guaranteed to be valid until the end of the current event, but
430 // not past that). In some cases, this can be dealt with by replacing
431 // the pointers with CondLink, but that is sometimes rather inconvenient.
432 // Try to work around this for now by ensuring that when we delete an object,
433 // we also try to clean the containers of other objects that may depend
434 // on it.
435 std::vector<CondContInfo*> toclean = std::move (cis);
436 std::unordered_set<CondContInfo*> cleaned (toclean.begin(), toclean.end());
437 while (!toclean.empty()) {
438 std::vector<CondContInfo*> newclean;
439 for (CondContInfo* ci : toclean) {
440 if (cleanContainer (ci, twoKeys)) {
441 lock_t lock (m_workMutex);
442 for (CondContBase* dep : ci->m_cc.getDeps()) {
443 CCInfoMap_t::iterator it = m_ccinfo.find (dep);
444 // If we don't find it, then dep must have no conditions objects.
445 if (it != m_ccinfo.end()) {
446 CondContInfo* ci_dep = &it->second;
447 if (cleaned.insert (ci_dep).second) {
448 newclean.push_back (ci_dep);
449 }
450 }
451 }
452 }
453 }
454 toclean = std::move (newclean);
455 }
456}
457
458
467bool
469 const twoKeys_t& twoKeys) const
470{
471 size_t n = ci->m_cc.trim (twoKeys[0],twoKeys[1]);
472
473 ++ci->m_nClean;
474 ci->m_nRemoved += n;
475 switch (n) {
476 case 0:
477 ++ci->m_removed0;
478 break;
479 case 1:
480 ++ci->m_removed1;
481 break;
482 default:
483 ++ci->m_removed2plus;
484 }
485
486 return n > 0;
487}
488
492
494
499{
501 return StatusCode::SUCCESS;
502}
503
504
505} // namespace Athena
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_INFO(x)
pimpl-style holder for component properties.
Hold mappings of ranges to condition objects.
Clean conditions containers after a delay.
read-copy-update (RCU) style synchronization for Athena.
Provide helper functions to create formatted strings.
DelayedConditionsCleanerSvc * parent()
AthProperties(DelayedConditionsCleanerSvc *parent)
bool m_async
Property: If true, run cleaning asynchronously in an MT job.
Gaudi::Property< size_t > m_cleanDelay
Property: Number of events after adding a conditions object we try to clean its container.
ServiceHandle< Athena::IRCUSvc > m_rcu
Property: RCU Service.
DelayedConditionsCleanerSvcProps(DelayedConditionsCleanerSvc *parent)
Gaudi::Property< size_t > m_ringSize
Property: Number of previous events for which to remember IOV history.
Gaudi::Property< size_t > m_lookAhead
Property: Maximum number of events to consolidate together when cleaning.
Information that we maintain about each conditions container.
size_t m_removed2plus
Number of times two or more objects were removed.
size_t m_removed1
Number of times exactly 1 object was removed.
size_t m_removed0
Number of times exactly 0 objects were removed.
size_t m_nClean
Number of times cleaning was attempted.
size_t m_nRemoved
Total number of objects removed by cleaning.
Clean conditions containers after a delay.
DelayedConditionsCleanerSvc(const std::string &name, ISvcLocator *svc)
Standard Gaudi constructor.
std::priority_queue< QueueItem > m_work
Priority queue of pending cleaning requests.
size_t m_nEvents
Priority queue statistics.
twoKeys_t getKeys(const Ring &runLBRing, const Ring &TSRing) const
virtual StatusCode condObjAdded(const EventContext &ctx, CondContBase &cc) override
Called after a conditions object has been added.
virtual StatusCode initialize() override
Standard Gaudi initialize method.
Ring m_runlbn
Two ring buffers for recent IOV keys, one for run+LBN and one for timestamp.
virtual StatusCode event(const EventContext &ctx, bool allowAsync) override
Called at the start of each event.
void scheduleClean(std::vector< CondContInfo * > &&cis, twoKeys_t &&twoKeys, bool allowAsync)
Do cleaning for a set of containers.
std::unique_ptr< DelayedConditionsCleanerSvcProps > m_props
Component properties.
CxxUtils::Ring< key_type > Ring
Ring buffer holding most recent IOV keys of a given type.
CondContBase::key_type key_type
Packed key type.
std::atomic< int > m_cleanTasks
Number of active asynchronous cleaning tasks.
virtual StatusCode reset() override
Clear the internal state of the service.
std::array< std::vector< key_type >, 2 > twoKeys_t
void cleanContainers(std::vector< CondContInfo * > &&cis, twoKeys_t &&twoKeys)
Clean a set of containers.
bool cleanContainer(CondContInfo *ci, const twoKeys_t &keys) const
Clean a single container.
std::vector< key_type > m_slotLBN
IOV keys currently in use for each slot.
virtual StatusCode finalize() override
Standard Gaudi finalize method.
virtual StatusCode printStats() const override
Print some statistics about the garbage collection.
Base class for all conditions containers.
Definition CondCont.h:140
static key_type keyFromTimestamp(const EventIDBase &b)
Make a timestamp key from an EventIDBase.
static key_type keyFromRunLBN(const EventIDBase &b)
Make a run+lbn key from an EventIDBase.
KeyType keyType() const
Return the key type for this container.
virtual size_t trim(const std::vector< key_type > &runLbnKeys, const std::vector< key_type > &TSKeys)
Remove unused entries from the front of the list.
Definition CondCont.cxx:320
std::vector< CondContBase * > getDeps()
Return the list of conditions containers that depend on this one.
Definition CondCont.cxx:705
std::vector< T > getKeysDedup() const
Return a copy of keys in the buffer.
Some weak symbol referencing magic... These are declared in AthenaKernel/getMessageSvc....
std::string strformat(const char *fmt,...)
return a std::string according to a format fmt and varargs
Definition StrFormat.cxx:49
STL namespace.
DataModel_detail::iterator< DVL > unique(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of unique for DataVector/List.
void sort(typename DataModel_detail::iterator< DVL > beg, typename DataModel_detail::iterator< DVL > end)
Specialization of sort for DataVector/List.