2 Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
5 * @file CxxUtils/ConcurrentRangeMap.icc
6 * @author scott snyder <snyder@bnl.gov>
8 * @brief Map from range to payload object, allowing concurrent, lockless reads.
17 * @param delfcn Function to delete a payload object immediately.
19 template <class T, class CONTEXT>
20 IRangeMapPayloadDeleter<T, CONTEXT>::IRangeMapPayloadDeleter
21 (delete_function* delfcn)
24 // cppcheck-suppress missingReturn; false positive
29 * @brief Return a function to delete a payload object immediately.
31 template <class T, class CONTEXT>
33 typename IRangeMapPayloadDeleter<T, CONTEXT>::delete_function*
34 IRangeMapPayloadDeleter<T, CONTEXT>::delfcn() const
40 //*****************************************************************************
43 #define T_CONCURRENTRANGEMAP template <class RANGE, class KEY, class T, class COMPARE, template <class> class UPDATER> \
44 ATH_REQUIRES (detail::IsUpdater<UPDATER> && \
45 detail::IsConcurrentRangeCompare<COMPARE, RANGE, KEY, typename UPDATER<int>::Context_t>)
47 #define CONCURRENTRANGEMAP ConcurrentRangeMap<RANGE, KEY, T, COMPARE, UPDATER>
52 * @param capacity Size of the data vector to allocate.
55 CONCURRENTRANGEMAP::Impl::Impl (size_t capacity /*= 10*/)
62 * @brief Return a pointer to the start of the data vector.
65 typename CONCURRENTRANGEMAP::value_type*
66 CONCURRENTRANGEMAP::Impl::data()
73 * @brief Return the size of the current data vector.
77 CONCURRENTRANGEMAP::Impl::capacity() const
79 return m_data.capacity();
85 * @param updater Object used to manage memory
86 * (see comments at the start of the class).
87 * @param payloadDeleter Object for deleting payload objects.
88 * This is owned via a @c shared_ptr.
89 * @param capacity Initial capacity of the map.
90 * @param compare Comparison object.
93 CONCURRENTRANGEMAP::ConcurrentRangeMap (Updater_t&& updater,
94 std::shared_ptr<IPayloadDeleter> payloadDeleter,
95 size_t capacity /*= 16*/,
96 const COMPARE& compare /*= COMPARE()*/)
98 : m_updater (std::move (updater)),
100 m_payloadDeleter (payloadDeleter),
104 auto impl = std::make_unique<Impl> (capacity);
105 value_type* data = impl->data();
106 installImpl (std::move (impl),
108 Updater_t::defaultContext());
115 * Clean up any remaining payload objects.
118 CONCURRENTRANGEMAP::~ConcurrentRangeMap()
120 value_type* last = m_last;
122 delete_function* delfcn = m_payloadDeleter->delfcn();
123 for (value_type* p = m_begin; p <= m_last; ++p) {
132 * @brief Return a reference to the payload deleter object.
136 typename CONCURRENTRANGEMAP::IPayloadDeleter& CONCURRENTRANGEMAP::deleter()
138 return *m_payloadDeleter;
143 * @brief Return a reference to the payload deleter object.
147 const typename CONCURRENTRANGEMAP::IPayloadDeleter&
148 CONCURRENTRANGEMAP::deleter() const
150 return *m_payloadDeleter;
155 * @brief Search for the first item less than or equal to KEY.
156 * @param key The key to search for.
157 * @returns The value, or nullptr if not found.
161 typename CONCURRENTRANGEMAP::const_iterator
162 CONCURRENTRANGEMAP::find (const key_query_type& key) const
164 // Return right away if the map's empty.
165 const_iterator last = m_last;
166 if (!last) return nullptr;
168 // Check the last value.
169 if (!m_compare (key, last->first)) {
173 // Do a binary search to find the proper position.
174 const_iterator begin = getBegin (last);
175 if (!last) return nullptr;
176 const_iterator pos = std::upper_bound (begin, last+1, key,
177 [this](const key_query_type& key2,
179 { return m_compare (key2,v.first); } );
181 // Fail if it would be before the first value.
182 if (pos == begin) return nullptr;
190 * @brief Add a new element to the map.
191 * @param range Validity range for this element.
192 * @param ptr Payload for this element.
193 * @param tryExtend If true, then allow an existing range to be extended
195 * @param ctx Execution context.
197 * Returns SUCCESS if the new element was successfully inserted.
198 * Returns DUPLICATE if the range compared equal to an existing one.
199 * In that case, no new element is inserted (and @c ptr gets deleted).
200 * Returns EXTENDED if the range of the last element was extended to @c range.
201 * This happens if @c tryExtend is true, @c range is equal
202 * to the range of the last element (according to @c m_compare),
203 * and the range can be extended according to @c extendRange.
204 * (This will generally mean that the start time of @c range
205 * matches the last range, and end time of @c range is after
206 * the end time of the last range.) In this case, again no
207 * new element is inserted and @c ptr is deleted.
208 * Returns OVERLAP if the range of the new element overlaps
209 * an existing element (the new element is still inserted).
212 typename CONCURRENTRANGEMAP::EmplaceResult
213 CONCURRENTRANGEMAP::emplace (const RANGE& range,
214 payload_unique_ptr ptr,
215 bool tryExtend /*= false*/,
216 const typename Updater_t::Context_t& ctx
217 /*= Updater_t::defaultContext()*/)
219 lock_t lock (m_mutex);
221 value_type* last = m_last;
222 value_type* begin = m_begin;
224 // Check if the element to be inserted is greater than all existing entries.
225 bool pastEnd = (!last || m_compare (last->first, range));
227 // See if we want to extend the last entry. Also check for duplicates.
228 if (last && !pastEnd) {
229 RANGE extendedRange = last->first;
230 int flag = m_compare.extendRange (extendedRange, range);
231 if (tryExtend && flag > 0 && extendImpl (lock, extendedRange, ctx) > 0) {
232 return EmplaceResult::EXTENDED;
235 return EmplaceResult::DUPLICATE;
239 // Can we add this to the end?
240 // There has to be room for another, and either the container must be empty,
241 // or the new element must be greater than the current last one.
242 value_type* end = last ? last+1 : begin;
243 if (pastEnd && end < m_impl->data() + m_impl->capacity())
245 // Yes, we can add it to the end.
246 // Check for overlap with the previous range.
247 EmplaceResult ret = EmplaceResult::SUCCESS;
248 RANGE newRange = range;
250 int flag = m_compare.overlap (ctx, (end-1)->first, newRange);
252 return EmplaceResult::DUPLICATE;
255 ret = EmplaceResult::OVERLAP;
259 // Copy the data to the container.
260 end->first = newRange;
261 end->second = ptr.release();
263 std::atomic_thread_fence (std::memory_order_seq_cst);
265 // Update the last pointer.
267 // Now the new element is visible to other threads.
269 m_maxSize = std::max (m_maxSize, static_cast<size_t> (end+1 - begin));
274 // No --- need to make a new implementation object and copy.
275 // Make the new one bigger, if needed.
276 int new_capacity = m_impl->capacity();
277 int old_size = end-begin;
278 if (old_size == 0) old_size = 1;
279 if (old_size > new_capacity/2) {
280 new_capacity = old_size*2;
283 EmplaceResult ret = EmplaceResult::SUCCESS;
285 // Allocate the new object.
286 auto new_impl = std::make_unique<Impl> (new_capacity);
287 value_type* new_begin = new_impl->data();
288 value_type* new_end = new_begin;
290 // Copy the data, adding the new item at the proper place.
291 // Separate out the case where the new item goes at the end,
292 // since we can do that faster.
294 // Check for overlap with the previous range.
295 RANGE newRange = range;
298 flag = m_compare.overlap (ctx, (end-1)->first, newRange);
301 ret = EmplaceResult::DUPLICATE;
305 ret = EmplaceResult::OVERLAP;
308 new_end = std::copy (begin, end, new_end);
309 new_end->first = newRange;
310 new_end->second = ptr.release();
316 // Otherwise we need to search for the proper place to insert the new entry.
317 RANGE newRange = range;
318 for (; begin < end; *new_end++ = *begin++) {
319 if (ptr && m_compare (newRange, begin->first)) {
320 // Check for overlap / duplicate
321 if (begin > m_begin) {
322 int flag = m_compare.overlap (ctx, (begin-1)->first, newRange);
324 ret = EmplaceResult::OVERLAP;
325 // The range start may have been adjusted forward. Make sure
326 // that we're still at the insertion point.
327 if (!m_compare (newRange, begin->first)) {
332 // Duplicate entry; fail. Everything allocated is held by unique_ptr,
333 // so should be cleaned up properly.
334 return EmplaceResult::DUPLICATE;
338 int flag = m_compare.overlap (ctx, begin->first, newRange);
340 ret = EmplaceResult::OVERLAP;
341 // The range start may have been adjusted forward. Make sure
342 // that we're still at the insertion point.
343 if (!m_compare (newRange, begin->first)) {
348 // Duplicate entry; fail. Everything allocated is held by unique_ptr,
349 // so should be cleaned up properly.
350 return EmplaceResult::DUPLICATE;
353 new_end->first = newRange;
354 new_end->second = ptr.release();
360 // Possible to get here if overlap() moved the start of the range
361 // forward past all existing ranges.
362 new_end->first = newRange;
363 new_end->second = ptr.release();
368 // Install the new implementation.
369 installImpl (std::move (new_impl), new_begin, new_end, ctx);
372 m_maxSize = std::max (m_maxSize, static_cast<size_t> (new_end - new_begin));
378 * @brief Erase the first item less than or equal to KEY.
379 * @param key The key to search for and erase.
380 * @param ctx Execution context.
384 CONCURRENTRANGEMAP::erase (const key_query_type& key,
385 const typename Updater_t::Context_t& ctx
386 /*= Updater_t::defaultContext()*/)
388 lock_t lock (m_mutex);
390 // Return if the container's empty.
391 value_type* last = m_last;
392 if (last == nullptr) return;
394 value_type* begin = m_begin;
395 value_type* end = last+1;
397 // Don't do anything if key is before the first element.
398 if (m_compare (key, begin->first)) return;
400 // Is the first element the one we're deleting?
401 if (begin == last || m_compare (key, (begin+1)->first)) {
402 // Yes --- remember the payload for deletion.
403 // (Don't actually queue it for deletion until the pointers have
405 mapped_type todel = begin->second;
408 // Make the update visible to other threads.
409 // If we need to update both pointers, then clear m_begin first.
415 m_payloadDeleter->discard (todel);
419 // Need to make a new implementation object and copy data.
420 size_t capacity = m_impl->capacity();
421 auto new_impl = std::make_unique<Impl> (capacity);
422 value_type* new_begin = new_impl->data();
423 value_type* new_end = new_begin;
425 // Copy the data, skipping the object to be deleted.
426 while (begin < end-1 && !m_compare (key, (begin+1)->first)) {
427 *new_end++ = *begin++;
429 mapped_type todel = begin->second;
431 while (begin < end) {
432 *new_end++ = *begin++;
435 // Install the new implementation.
436 installImpl (std::move (new_impl), new_begin, new_end, ctx);
437 m_payloadDeleter->discard (todel);
442 * @brief Extend the range of the last entry of the map.
443 * @param newRange New range to use for the last entry.
444 * @param ctx Execution context.
446 * The range of the last entry in the map is updated to @c newRange.
447 * Does nothing if the map is empty.
448 * This will make a new version of implementation data.
450 * The semantics of what it means to extend a range are given by the
451 * @c extendRange method of the @c COMPARE object (see above).
453 * Returns -1 if there was an error; 1 if the last range was extended;
454 * and 0 if nothing was changed.
458 CONCURRENTRANGEMAP::extendLastRange (const RANGE& newRange,
459 const typename Updater_t::Context_t& ctx
460 /*= Updater_t::defaultContext()*/)
462 lock_t lock (m_mutex);
463 value_type* last = m_last;
467 RANGE extendedRange = last->first;
468 int flag = m_compare.extendRange (extendedRange, newRange);
470 return extendImpl (lock, extendedRange, ctx);
480 * @brief Update all range objects.
481 * @param rangeUpdater Functional to call on each range object.
482 * @param ctx Execution context.
484 * This will iterate through the list of entries and call @c rangeUpdater
485 * on the @c RANGE part of each. Be careful: rangeUpdater must not
486 * change any part of the range which might affect the sorting
491 CONCURRENTRANGEMAP::updateRanges (std::function<void (RANGE&)> rangeUpdater,
492 const typename Updater_t::Context_t& ctx
493 /*= Updater_t::defaultContext()*/)
495 lock_t lock (m_mutex);
497 // Return if the container's empty.
498 value_type* last = m_last;
499 if (last == nullptr) return;
501 // Make a new implementation object and copy data.
502 size_t capacity = m_impl->capacity();
503 auto new_impl = std::make_unique<Impl> (capacity);
504 value_type* new_begin = new_impl->data();
505 value_type* new_end = new_begin;
506 value_type* begin = m_begin;
507 value_type* end = m_last+1;
508 new_end = std::copy (begin, end, new_end);
510 // Update ranges in the new copy.
511 for (value_type* v = new_begin; v != new_end; ++v) {
512 rangeUpdater (v->first);
515 // Install the new implementation.
516 installImpl (std::move (new_impl), new_begin, new_end, ctx);
521 * @brief Extend the range of the last entry of the map.
522 * @param lock Lock object.
523 * @param extendedRange New range to use for the last entry.
524 * @param ctx Execution context.
526 * Implementation of @c extendLastRange; see there for details.
527 * Must be called with the lock held.
531 CONCURRENTRANGEMAP::extendImpl (lock_t& /*lock*/,
532 const RANGE& extendedRange,
533 const typename Updater_t::Context_t& ctx)
535 // Return if the container's empty.
536 value_type* last = m_last;
537 if (last == nullptr) return -1;
539 // Make a new implementation object and copy data.
540 size_t capacity = m_impl->capacity();
541 auto new_impl = std::make_unique<Impl> (capacity);
542 value_type* new_begin = new_impl->data();
543 value_type* new_end = new_begin;
544 value_type* begin = m_begin;
545 value_type* end = m_last+1;
546 new_end = std::copy (begin, end, new_end);
548 // Update the range of the last entry.
549 (new_end-1)->first = extendedRange;
551 // Install the new implementation.
552 installImpl (std::move (new_impl), new_begin, new_end, ctx);
559 * @brief Remove unused entries from the front of the list.
560 * @param keys List of keys that may still be in use.
562 * @param trimall If true, then allow removing all elements in the container.
563 * Otherwise, stop when there's one left.
565 * We examine the objects in the container, starting with the earliest one.
566 * If none of the keys in @c keys match the range for this object, then
567 * it is removed from the container. We stop when we either find
568 * an object with a range matching a key in @c keys or (if trimall is false)
569 * when there is only one object left.
571 * Removed objects are queued for deletion once all slots have been
572 * marked as quiescent.
574 * The list @c keys MUST be sorted.
576 * Returns the number of objects that were removed.
580 CONCURRENTRANGEMAP::trim (const std::vector<key_query_type>& keys,
581 bool trimall /*= false*/)
585 lock_t lock (m_mutex);
587 // Return immediately if the container is empty.
588 value_type* last = m_last;
589 if (last == nullptr) return ndel;
591 value_type* pos = m_begin;
593 // If trimall is set, then we want to compare pos against end=last+1.
594 // Otherwise, we compare against last in order to skip considering
598 // FIXME: Can use the position where we found the last one as a hint?
599 if (anyInRange (pos->first, keys)) {
600 // One of the keys matched the range of this object. Stop here.
604 // We're deleting the object now.
606 // Discard it. Be sure to adjust m_begin first.
607 mapped_type todel = pos->second;
610 // Removing the last entry. Be sure to clear m_begin first.
615 m_payloadDeleter->discard (todel);
624 * @brief Remove all entries in the container.
625 * Mostly for testing --- should not normally be used.
628 void CONCURRENTRANGEMAP::clear()
630 lock_t lock (m_mutex);
632 // Don't do anything if the container's already empty.
633 value_type* last = m_last;
634 if (last == nullptr) return;
636 value_type* begin = m_begin;
638 while (begin != last) {
639 mapped_type todel = begin->second;
642 m_payloadDeleter->discard (todel);
645 // Now have one element left. Be sure to clear m_begin first.
646 mapped_type todel = begin->second;
651 m_payloadDeleter->discard (todel);
656 * @brief Return the current number of elements in the map.
660 size_t CONCURRENTRANGEMAP::size() const
662 const_iterator last = m_last;
664 const_iterator begin = getBegin (last);
666 return last+1 - begin;
671 * @brief Test if the map is empty.
675 bool CONCURRENTRANGEMAP::empty() const
677 return m_last == nullptr;
682 * @brief Return the current capacity of the map.
686 size_t CONCURRENTRANGEMAP::capacity() const
688 return m_updater.get().capacity();
693 * @brief Return the number of times an item was inserted into the map.
697 size_t CONCURRENTRANGEMAP::nInserts() const
704 * @brief Return the maximum size of the map.
708 size_t CONCURRENTRANGEMAP::maxSize() const
715 * @brief Return a range that can be used to iterate over the container.
719 typename CONCURRENTRANGEMAP::const_iterator_range
720 CONCURRENTRANGEMAP::range() const
722 const_iterator last = m_last;
723 if (!last) return const_iterator_range (nullptr, nullptr);
724 const_iterator begin = getBegin (last);
725 if (!last) return const_iterator_range (nullptr, nullptr);
726 return const_iterator_range (begin, last+1);
731 * @brief Called when this thread is no longer referencing anything
732 * from this container.
733 * @param ctx Execution context.
737 CONCURRENTRANGEMAP::quiescent (const typename Updater_t::Context_t& ctx /*= Updater_t::defaultContext()*/)
739 m_updater.quiescent (ctx);
740 m_payloadDeleter->quiescent (ctx);
745 * @brief Return the begin/last pointers.
746 * @param [inout] last Current value of last.
748 * Retrieve consistent values of the begin and last pointers.
749 * The last pointer should have already been fetched, and may be updated.
750 * Usage should be like this:
753 * const_iterator last = m_last;
754 * if (!last) return; // Container empty.
755 * const_iterator begin = getBegin (last);
756 * if (!last) return; // Container empty.
761 typename CONCURRENTRANGEMAP::const_iterator
762 CONCURRENTRANGEMAP::getBegin (const_iterator& last) const
764 // First fetch the begin pointer, then check both that there is not
765 // an update in progress and that the last pointer
766 // hasn't changed. In either case, we need to refetch both pointers.
767 // ABA isn't an issue here since the pointers go only in one direction,
768 // and if we allocate a new chunk, it will be in a disjoint
770 const_iterator begin;
773 if (begin && last == m_last) break;
782 * @brief Consistently update both the begin and last pointers.
783 * @param new_begin New begin pointer.
784 * @param new_end New end pointer.
789 CONCURRENTRANGEMAP::updatePointers (value_type* new_begin, value_type* new_end)
791 // Mark that there's an update in progress.
793 // Then update the last pointer.
794 if (new_begin == new_end) {
800 // Then set the begin pointer.
806 * @brief Test to see if any keys within @c keys match @c r.
807 * @param r Range to test.
808 * @param keys List of keys to test. MUST be sorted.
812 CONCURRENTRANGEMAP::anyInRange (const key_type& r,
813 const std::vector<key_query_type>& keys) const
815 auto pos = std::lower_bound (keys.begin(), keys.end(), r, m_compare);
816 return pos != keys.end() && m_compare.inRange (*pos, r);
821 * @brief Install a new implementation instance and make it visible.
822 * @param new_impl The new instance.
823 * @param new_begin Begin pointer for the new instance.
824 * @param new_end End pointer for the new instance.
825 * (Usual STL meaning of end. If the instance is empty,
826 * then new_end should match new_begin.)
827 * @param ctx Execution context.
832 CONCURRENTRANGEMAP::installImpl (std::unique_ptr<Impl> new_impl,
833 value_type* new_begin,
835 const typename Updater_t::Context_t& ctx)
837 // Install the new implementation.
838 m_impl = new_impl.get();
840 // Make the update visible to other threads.
841 // Make sure not to add the old version to the garbage list before
842 // we've updated the pointers.
843 updatePointers (new_begin, new_end);
844 m_updater.update (std::move (new_impl), ctx);
848 #undef T_CONCURRENTRANGEMAP
849 #undef CONCURRENTRANGEMAP
852 } // namespace CxxUtils