2 Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
5 * @file CxxUtils/ConcurrentRangeMap.icc
6 * @author scott snyder <snyder@bnl.gov>
8 * @brief Map from range to payload object, allowing concurrent, lockless reads.
17 * @param delfcn Function to delete a payload object immediately.
19 template <class T, class CONTEXT>
20 IRangeMapPayloadDeleter<T, CONTEXT>::IRangeMapPayloadDeleter
21 (delete_function* delfcn)
28 * @brief Return a function to delete a payload object immediately.
30 template <class T, class CONTEXT>
32 typename IRangeMapPayloadDeleter<T, CONTEXT>::delete_function*
33 IRangeMapPayloadDeleter<T, CONTEXT>::delfcn() const
39 //*****************************************************************************
// Template header shared by the out-of-line ConcurrentRangeMap member
// definitions in this file (undefined again at the end of the file).
42 #define T_CONCURRENTRANGEMAP template <class RANGE, class KEY, class T, class COMPARE, template <class> class UPDATER> \
43 ATH_REQUIRES (detail::IsUpdater<UPDATER> && \
44 detail::IsConcurrentRangeCompare<COMPARE, RANGE, KEY, typename UPDATER<int>::Context_t>)
// Fully-qualified class name that goes with the template header above.
46 #define CONCURRENTRANGEMAP ConcurrentRangeMap<RANGE, KEY, T, COMPARE, UPDATER>
51 * @param capacity Size of the data vector to allocate.
54 CONCURRENTRANGEMAP::Impl::Impl (size_t capacity /*= 10*/)
61 * @brief Return a pointer to the start of the data vector.
64 typename CONCURRENTRANGEMAP::value_type*
65 CONCURRENTRANGEMAP::Impl::data()
72 * @brief Return the size of the current data vector.
76 CONCURRENTRANGEMAP::Impl::capacity() const
// Reports the capacity of the underlying m_data container.
78 return m_data.capacity();
84 * @param updater Object used to manage memory
85 * (see comments at the start of the class).
86 * @param payloadDeleter Object for deleting payload objects.
87 * This is owned via a @c shared_ptr.
88 * @param capacity Initial capacity of the map.
89 * @param compare Comparison object.
92 CONCURRENTRANGEMAP::ConcurrentRangeMap (Updater_t&& updater,
93 std::shared_ptr<IPayloadDeleter> payloadDeleter,
94 size_t capacity /*= 16*/,
95 const COMPARE& compare /*= COMPARE()*/)
97 : m_updater (std::move (updater)),
99 m_payloadDeleter (payloadDeleter),
// Build the initial implementation object with the requested capacity.
103 auto impl = std::make_unique<Impl> (capacity);
// Fetch the data pointer before ownership of impl is moved into installImpl.
104 value_type* data = impl->data();
// Publish the initial implementation using the updater's default context.
105 installImpl (std::move (impl),
107 Updater_t::defaultContext());
114 * Clean up any remaining payload objects.
117 CONCURRENTRANGEMAP::~ConcurrentRangeMap()
119 value_type* last = m_last;
// Use the deleter's immediate-delete function for the cleanup.
121 delete_function* delfcn = m_payloadDeleter->delfcn();
// Visit every entry from m_begin through m_last, inclusive.
122 for (value_type* p = m_begin; p <= m_last; ++p) {
131 * @brief Return a reference to the payload deleter object.
135 typename CONCURRENTRANGEMAP::IPayloadDeleter& CONCURRENTRANGEMAP::deleter()
// Hand out a reference to the object m_payloadDeleter points at.
137 return *m_payloadDeleter;
142 * @brief Return a reference to the payload deleter object.
146 const typename CONCURRENTRANGEMAP::IPayloadDeleter&
147 CONCURRENTRANGEMAP::deleter() const
// Const overload: same pointee, returned as a const reference.
149 return *m_payloadDeleter;
154 * @brief Search for the first item less than or equal to KEY.
155 * @param key The key to search for.
156 * @returns The value, or nullptr if not found.
160 typename CONCURRENTRANGEMAP::const_iterator
161 CONCURRENTRANGEMAP::find (const key_query_type& key) const
163 // Return right away if the map's empty.
164 const_iterator last = m_last;
165 if (!last) return nullptr;
167 // Check the last value.
// A false result from m_compare (key, r) means KEY does not sort before
// the last range, so the binary search below can be skipped.
168 if (!m_compare (key, last->first)) {
172 // Do a binary search to find the proper position.
// getBegin takes `last` by reference and may update it, so the
// empty check has to be repeated afterwards.
173 const_iterator begin = getBegin (last);
174 if (!last) return nullptr;
// Search [begin, last] for the first entry that KEY sorts before.
175 const_iterator pos = std::upper_bound (begin, last+1, key,
176 [this](const key_query_type& key2,
178 { return m_compare (key2,v.first); } );
180 // Fail if it would be before the first value.
181 if (pos == begin) return nullptr;
189 * @brief Add a new element to the map.
190 * @param range Validity range for this element.
191 * @param ptr Payload for this element.
192 * @param tryExtend If true, then allow an existing range to be extended
194 * @param ctx Execution context.
196 * Returns SUCCESS if the new element was successfully inserted.
197 * Returns DUPLICATE if the range compared equal to an existing one.
198 * In that case, no new element is inserted (and @c ptr gets deleted).
199 * Returns EXTENDED if the range of the last element was extended to @c range.
200 * This happens if @c tryExtend is true, @c range is equal
201 * to the range of the last element (according to @c m_compare),
202 * and the range can be extended according to @c extendRange.
203 * (This will generally mean that the start time of @c range
204 * matches the last range, and end time of @c range is after
205 * the end time of the last range.) In this case, again no
206 * new element is inserted and @c ptr is deleted.
207 * Returns OVERLAP if the range of the new element overlaps
208 * an existing element (the new element is still inserted).
211 typename CONCURRENTRANGEMAP::EmplaceResult
212 CONCURRENTRANGEMAP::emplace (const RANGE& range,
213 payload_unique_ptr ptr,
214 bool tryExtend /*= false*/,
215 const typename Updater_t::Context_t& ctx
216 /*= Updater_t::defaultContext()*/)
// Hold m_mutex for the whole operation.
218 lock_t lock (m_mutex);
220 value_type* last = m_last;
221 value_type* begin = m_begin;
223 // Check if the element to be inserted is greater than all existing entries.
224 bool pastEnd = (!last || m_compare (last->first, range));
226 // See if we want to extend the last entry. Also check for duplicates.
227 if (last && !pastEnd) {
// Work on a copy of the last range; extendRange may modify it.
228 RANGE extendedRange = last->first;
229 int flag = m_compare.extendRange (extendedRange, range);
// extendImpl is called with the lock already held.
230 if (tryExtend && flag > 0 && extendImpl (lock, extendedRange, ctx) > 0) {
231 return EmplaceResult::EXTENDED;
234 return EmplaceResult::DUPLICATE;
238 // Can we add this to the end?
239 // There has to be room for another, and either the container must be empty,
240 // or the new element must be greater than the current last one.
241 value_type* end = last ? last+1 : begin;
242 if (pastEnd && end < m_impl->data() + m_impl->capacity())
244 // Yes, we can add it to the end.
245 // Check for overlap with the previous range.
246 EmplaceResult ret = EmplaceResult::SUCCESS;
// overlap() is given a mutable copy of the range.
247 RANGE newRange = range;
249 int flag = m_compare.overlap (ctx, (end-1)->first, newRange);
251 return EmplaceResult::DUPLICATE;
254 ret = EmplaceResult::OVERLAP;
258 // Copy the data to the container.
259 end->first = newRange;
260 end->second = ptr.release();
// Fence so that the element's contents are stored before the last
// pointer is advanced.
262 std::atomic_thread_fence (std::memory_order_seq_cst);
264 // Update the last pointer.
266 // Now the new element is visible to other threads.
268 m_maxSize = std::max (m_maxSize, static_cast<size_t> (end+1 - begin));
273 // No --- need to make a new implementation object and copy.
274 // Make the new one bigger, if needed.
// Grow to twice the current element count once more than half of the
// capacity is in use (an empty map is counted as one element).
275 int new_capacity = m_impl->capacity();
276 int old_size = end-begin;
277 if (old_size == 0) old_size = 1;
278 if (old_size > new_capacity/2) {
279 new_capacity = old_size*2;
282 EmplaceResult ret = EmplaceResult::SUCCESS;
284 // Allocate the new object.
285 auto new_impl = std::make_unique<Impl> (new_capacity);
286 value_type* new_begin = new_impl->data();
287 value_type* new_end = new_begin;
289 // Copy the data, adding the new item at the proper place.
290 // Separate out the case where the new item goes at the end,
291 // since we can do that faster.
293 // Check for overlap with the previous range.
294 RANGE newRange = range;
297 flag = m_compare.overlap (ctx, (end-1)->first, newRange);
300 ret = EmplaceResult::DUPLICATE;
304 ret = EmplaceResult::OVERLAP;
307 new_end = std::copy (begin, end, new_end);
308 new_end->first = newRange;
309 new_end->second = ptr.release();
315 // Otherwise we need to search for the proper place to insert the new entry.
316 RANGE newRange = range;
// The loop's increment expression copies one old element per iteration.
317 for (; begin < end; *new_end++ = *begin++) {
// ptr stays non-null only until the payload has been released into
// the new array, so the insertion is attempted at most once.
318 if (ptr && m_compare (newRange, begin->first)) {
319 // Check for overlap / duplicate
320 if (begin > m_begin) {
321 int flag = m_compare.overlap (ctx, (begin-1)->first, newRange);
323 ret = EmplaceResult::OVERLAP;
324 // The range start may have been adjusted forward. Make sure
325 // that we're still at the insertion point.
326 if (!m_compare (newRange, begin->first)) {
331 // Duplicate entry; fail. Everything allocated is held by unique_ptr,
332 // so should be cleaned up properly.
333 return EmplaceResult::DUPLICATE;
337 int flag = m_compare.overlap (ctx, begin->first, newRange);
339 ret = EmplaceResult::OVERLAP;
340 // The range start may have been adjusted forward. Make sure
341 // that we're still at the insertion point.
342 if (!m_compare (newRange, begin->first)) {
347 // Duplicate entry; fail. Everything allocated is held by unique_ptr,
348 // so should be cleaned up properly.
349 return EmplaceResult::DUPLICATE;
352 new_end->first = newRange;
353 new_end->second = ptr.release();
359 // Possible to get here if overlap() moved the start of the range
360 // forward past all existing ranges.
361 new_end->first = newRange;
362 new_end->second = ptr.release();
367 // Install the new implementation.
368 installImpl (std::move (new_impl), new_begin, new_end, ctx);
371 m_maxSize = std::max (m_maxSize, static_cast<size_t> (new_end - new_begin));
377 * @brief Erase the first item less than or equal to KEY.
378 * @param key The key to search for.
379 * @param ctx Execution context.
383 CONCURRENTRANGEMAP::erase (const key_query_type& key,
384 const typename Updater_t::Context_t& ctx
385 /*= Updater_t::defaultContext()*/)
// Hold m_mutex for the whole operation.
387 lock_t lock (m_mutex);
389 // Return if the container's empty.
390 value_type* last = m_last;
391 if (last == nullptr) return;
393 value_type* begin = m_begin;
394 value_type* end = last+1;
396 // Don't do anything if key is before the first element.
397 if (m_compare (key, begin->first)) return;
399 // Is the first element the one we're deleting?
400 if (begin == last || m_compare (key, (begin+1)->first)) {
401 // Yes --- remember the payload for deletion.
402 // (Don't actually queue it for deletion until the pointers have
404 mapped_type todel = begin->second;
407 // Make the update visible to other threads.
408 // If we need to update both pointers, then clear m_begin first.
414 m_payloadDeleter->discard (todel);
418 // Need to make a new implementation object and copy data.
// The replacement keeps the current capacity.
419 size_t capacity = m_impl->capacity();
420 auto new_impl = std::make_unique<Impl> (capacity);
421 value_type* new_begin = new_impl->data();
422 value_type* new_end = new_begin;
424 // Copy the data, skipping the object to be deleted.
// Copy entries for as long as KEY does not sort before the following
// entry; the loop stops with `begin` at the entry to drop.
425 while (begin < end-1 && !m_compare (key, (begin+1)->first)) {
426 *new_end++ = *begin++;
428 mapped_type todel = begin->second;
430 while (begin < end) {
431 *new_end++ = *begin++;
434 // Install the new implementation.
435 installImpl (std::move (new_impl), new_begin, new_end, ctx);
// Hand the payload to the deleter only after the new implementation
// has been installed.
436 m_payloadDeleter->discard (todel);
441 * @brief Extend the range of the last entry of the map.
442 * @param newRange New range to use for the last entry.
443 * @param ctx Execution context.
445 * The range of the last entry in the map is updated to @c newRange.
446 * Does nothing if the map is empty.
447 * This will make a new version of implementation data.
449 * The semantics of what it means to extend a range are given by the
450 * @c extendRange method of the @c COMPARE object (see above).
452 * Returns -1 if there was an error; 1 if the last range was extended;
453 * and 0 if nothing was changed.
457 CONCURRENTRANGEMAP::extendLastRange (const RANGE& newRange,
458 const typename Updater_t::Context_t& ctx
459 /*= Updater_t::defaultContext()*/)
// Take the lock here; extendImpl expects it to be held already.
461 lock_t lock (m_mutex);
462 value_type* last = m_last;
// Work on a copy of the last range; extendRange may modify it.
466 RANGE extendedRange = last->first;
467 int flag = m_compare.extendRange (extendedRange, newRange);
469 return extendImpl (lock, extendedRange, ctx);
479 * @brief Update all range objects.
480 * @param rangeUpdater Functional to call on each range object.
481 * @param ctx Execution context.
483 * This will iterate through the list of entries and call @c rangeUpdater
484 * on the @c RANGE part of each. Be careful: rangeUpdater must not
485 * change any part of the range which might affect the sorting
490 CONCURRENTRANGEMAP::updateRanges (std::function<void (RANGE&)> rangeUpdater,
491 const typename Updater_t::Context_t& ctx
492 /*= Updater_t::defaultContext()*/)
// Hold m_mutex for the whole operation.
494 lock_t lock (m_mutex);
496 // Return if the container's empty.
497 value_type* last = m_last;
498 if (last == nullptr) return;
500 // Make a new implementation object and copy data.
// The replacement keeps the current capacity.
501 size_t capacity = m_impl->capacity();
502 auto new_impl = std::make_unique<Impl> (capacity);
503 value_type* new_begin = new_impl->data();
504 value_type* new_end = new_begin;
505 value_type* begin = m_begin;
506 value_type* end = m_last+1;
507 new_end = std::copy (begin, end, new_end);
509 // Update ranges in the new copy.
// Only the copy is modified; it is published by installImpl below.
510 for (value_type* v = new_begin; v != new_end; ++v) {
511 rangeUpdater (v->first);
514 // Install the new implementation.
515 installImpl (std::move (new_impl), new_begin, new_end, ctx);
520 * @brief Extend the range of the last entry of the map.
521 * @param lock Lock object.
522 * @param extendedRange New range to use for the last entry.
523 * @param ctx Execution context.
525 * Implementation of @c extendLastRange; see there for details.
526 * Must be called with the lock held.
// The lock parameter is unnamed and unused in the visible code; presumably
// it exists only to show that the caller already holds the lock.
530 CONCURRENTRANGEMAP::extendImpl (lock_t& /*lock*/,
531 const RANGE& extendedRange,
532 const typename Updater_t::Context_t& ctx)
534 // Return if the container's empty.
535 value_type* last = m_last;
536 if (last == nullptr) return -1;
538 // Make a new implementation object and copy data.
// The replacement keeps the current capacity.
539 size_t capacity = m_impl->capacity();
540 auto new_impl = std::make_unique<Impl> (capacity);
541 value_type* new_begin = new_impl->data();
542 value_type* new_end = new_begin;
543 value_type* begin = m_begin;
544 value_type* end = m_last+1;
545 new_end = std::copy (begin, end, new_end);
547 // Update the range of the last entry.
// Only the copy is modified; the entries currently installed are untouched.
548 (new_end-1)->first = extendedRange;
550 // Install the new implementation.
551 installImpl (std::move (new_impl), new_begin, new_end, ctx);
558 * @brief Remove unused entries from the front of the list.
559 * @param keys List of keys that may still be in use.
561 * @param trimall If true, then allow removing all elements in the container.
562 * Otherwise, stop when there's one left.
564 * We examine the objects in the container, starting with the earliest one.
565 * If none of the keys in @c keys match the range for this object, then
566 * it is removed from the container. We stop when we either find
567 * an object with a range matching a key in @c keys or (if trimall is false)
568 * when there is only one object left.
570 * Removed objects are queued for deletion once all slots have been
571 * marked as quiescent.
573 * The list @c keys MUST be sorted.
575 * Returns the number of objects that were removed.
579 CONCURRENTRANGEMAP::trim (const std::vector<key_query_type>& keys,
580 bool trimall /*= false*/)
// Hold m_mutex while entries are removed.
584 lock_t lock (m_mutex);
586 // Return immediately if the container is empty.
587 value_type* last = m_last;
// ndel is the removal count returned to the caller (its declaration is
// not visible in this excerpt).
588 if (last == nullptr) return ndel;
590 value_type* pos = m_begin;
592 // If trimall is set, then we want to compare trimall against end=last+1.
593 // Otherwise, we compare against last in order to skip considering
597 // FIXME: Can use the position where we found the last one as a hint?
598 if (anyInRange (pos->first, keys)) {
599 // One of the keys matched the range of this object. Stop here.
603 // We're deleting the object now.
605 // Discard it. Be sure to adjust m_begin first.
606 mapped_type todel = pos->second;
609 // Removing the last entry. Be sure to clear m_begin first.
// The payload goes to the deleter only after the pointer adjustment
// described above.
614 m_payloadDeleter->discard (todel);
623 * @brief Remove all entries in the container.
624 * Mostly for testing --- should not normally be used.
627 void CONCURRENTRANGEMAP::clear()
// Hold m_mutex for the whole operation.
629 lock_t lock (m_mutex);
631 // Don't do anything if the container's already empty.
632 value_type* last = m_last;
633 if (last == nullptr) return;
635 value_type* begin = m_begin;
// Remove entries from the front until only the last one remains.
637 while (begin != last) {
638 mapped_type todel = begin->second;
641 m_payloadDeleter->discard (todel);
644 // Now have one element left. Be sure to clear m_begin first.
645 mapped_type todel = begin->second;
650 m_payloadDeleter->discard (todel);
655 * @brief Return the current number of elements in the map.
659 size_t CONCURRENTRANGEMAP::size() const
661 const_iterator last = m_last;
// getBegin takes `last` by reference and may update it.
663 const_iterator begin = getBegin (last);
// `last` points at the final element, not one past it, hence the +1.
665 return last+1 - begin;
670 * @brief Test if the map is empty.
674 bool CONCURRENTRANGEMAP::empty() const
// A null m_last is the empty-map marker (find, size and range test it too).
676 return m_last == nullptr;
681 * @brief Return the current capacity of the map.
685 size_t CONCURRENTRANGEMAP::capacity() const
// Asks the object returned by m_updater.get() — presumably the currently
// installed Impl; confirm against the updater's interface.
687 return m_updater.get().capacity();
692 * @brief Return the number of times an item was inserted into the map.
696 size_t CONCURRENTRANGEMAP::nInserts() const
703 * @brief Return the maximum size of the map.
707 size_t CONCURRENTRANGEMAP::maxSize() const
714 * @brief Return a range that can be used to iterate over the container.
718 typename CONCURRENTRANGEMAP::const_iterator_range
719 CONCURRENTRANGEMAP::range() const
721 const_iterator last = m_last;
// Empty map: return an empty (null, null) range.
722 if (!last) return const_iterator_range (nullptr, nullptr);
// getBegin takes `last` by reference and may update it, so the empty
// check is repeated afterwards.
723 const_iterator begin = getBegin (last);
724 if (!last) return const_iterator_range (nullptr, nullptr);
// `last` points at the final element; the returned range is half-open.
725 return const_iterator_range (begin, last+1);
730 * @brief Called when this thread is no longer referencing anything
731 * from this container.
732 * @param ctx Execution context.
736 CONCURRENTRANGEMAP::quiescent (const typename Updater_t::Context_t& ctx /*= Updater_t::defaultContext()*/)
// Forward the notification to both the updater and the payload deleter.
738 m_updater.quiescent (ctx);
739 m_payloadDeleter->quiescent (ctx);
744 * @brief Return the begin/last pointers.
745 * @param [inout] last Current value of last.
747 * Retrieve consistent values of the begin and last pointers.
748 * The last pointer should have already been fetched, and may be updated.
749 * Usage should be like this:
752 * const_iterator last = m_last;
753 * if (!last) return; // Container empty.
754 * const_iterator begin = getBegin (last);
755 * if (!last) return; // Container empty.
760 typename CONCURRENTRANGEMAP::const_iterator
761 CONCURRENTRANGEMAP::getBegin (const_iterator& last) const
763 // First fetch the begin pointer, then check both that there is not
764 // an update in progress and that the last pointer
765 // hasn't changed. If either check fails, we need to refetch both pointers.
766 // ABA isn't an issue here since the pointers go only in one direction,
767 // and if we allocate a new chunk, it will be in a disjoint
769 const_iterator begin;
// Accept the pair only if begin is non-null and m_last still equals
// the value the caller holds.
772 if (begin && last == m_last) break;
781 * @brief Consistently update both the begin and last pointers.
782 * @param new_begin New begin pointer.
783 * @param new_end New end pointer.
788 CONCURRENTRANGEMAP::updatePointers (value_type* new_begin, value_type* new_end)
790 // Mark that there's an update in progress.
792 // Then update the last pointer.
// new_begin == new_end denotes an empty range.
793 if (new_begin == new_end) {
799 // Then set the begin pointer.
805 * @brief Test to see if any keys within @c keys match @c r.
806 * @param r Range to test.
807 * @param keys List of keys to test. MUST be sorted.
811 CONCURRENTRANGEMAP::anyInRange (const key_type& r,
812 const std::vector<key_query_type>& keys) const
// Binary search (hence the sortedness requirement on keys) for the first
// key that does not sort before r according to m_compare.
814 auto pos = std::lower_bound (keys.begin(), keys.end(), r, m_compare);
// Only that one candidate key is tested against the range.
815 return pos != keys.end() && m_compare.inRange (*pos, r);
820 * @brief Install a new implementation instance and make it visible.
821 * @param new_impl The new instance.
822 * @param new_begin Begin pointer for the new instance.
823 * @param new_end End pointer for the new instance.
824 * (Usual STL meaning of end. If the instance is empty,
825 * then new_end should match new_begin.)
826 * @param ctx Execution context.
831 CONCURRENTRANGEMAP::installImpl (std::unique_ptr<Impl> new_impl,
832 value_type* new_begin,
834 const typename Updater_t::Context_t& ctx)
836 // Install the new implementation.
// m_impl receives the raw pointer only; ownership of the object is
// handed to m_updater below.
837 m_impl = new_impl.get();
839 // Make the update visible to other threads.
840 // Make sure not to add the old version to the garbage list before
841 // we've updated the pointers.
842 updatePointers (new_begin, new_end);
843 m_updater.update (std::move (new_impl), ctx);
847 #undef T_CONCURRENTRANGEMAP
848 #undef CONCURRENTRANGEMAP
851 } // namespace CxxUtils