2   Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
 
    5  * @file CxxUtils/ConcurrentRangeMap.icc
 
    6  * @author scott snyder <snyder@bnl.gov>
 
    8  * @brief Map from range to payload object, allowing concurrent, lockless reads.
 
   17  * @param delfcn Function to delete a payload object immediately.
 
   19 template <class T, class CONTEXT>
 
   20 IRangeMapPayloadDeleter<T, CONTEXT>::IRangeMapPayloadDeleter
 
   21   (delete_function* delfcn)
 
   24   // cppcheck-suppress missingReturn; false positive
 
   29  * @brief Return a function to delete a payload object immediately.
 
   31 template <class T, class CONTEXT>
 
   33 typename IRangeMapPayloadDeleter<T, CONTEXT>::delete_function*
 
   34 IRangeMapPayloadDeleter<T, CONTEXT>::delfcn() const
 
   40 //*****************************************************************************
 
   43 #define T_CONCURRENTRANGEMAP template <class RANGE, class KEY, class T, class COMPARE, template <class> class UPDATER> \
 
   44   requires (detail::IsUpdater<UPDATER> &&                           \
 
   45             detail::IsConcurrentRangeCompare<COMPARE, RANGE, KEY, typename UPDATER<int>::Context_t>)
 
   47 #define CONCURRENTRANGEMAP ConcurrentRangeMap<RANGE, KEY, T, COMPARE, UPDATER>
 
   52  * @param capacity Size of the data vector to allocate.
 
   55 CONCURRENTRANGEMAP::Impl::Impl (size_t capacity /*= 10*/)
 
   62  * @brief Return a pointer to the start of the data vector.
 
   65 typename CONCURRENTRANGEMAP::value_type*
 
   66 CONCURRENTRANGEMAP::Impl::data()
 
   73  * @brief Return the size of the current data vector.
 
   77 CONCURRENTRANGEMAP::Impl::capacity() const
 
   79   return m_data.capacity();
 
   85  * @param updater Object used to manage memory
 
   86  *                (see comments at the start of the class).
 
   87  * @param payloadDeleter Object for deleting payload objects.
 
   88  *                       This is owned via a @c shared_ptr.
 
   89  * @param capacity Initial capacity of the map.
 
   90  * @param compare Comparison object.
 
   93 CONCURRENTRANGEMAP::ConcurrentRangeMap (Updater_t&& updater,
 
   94                                         std::shared_ptr<IPayloadDeleter> payloadDeleter,
 
   95                                         size_t capacity /*= 16*/,
 
   96                                         const COMPARE& compare /*= COMPARE()*/)
 
   98   : m_updater (std::move (updater)),
 
  100     m_payloadDeleter (std::move (payloadDeleter)),
 
  104   auto impl = std::make_unique<Impl> (capacity);
 
  105   value_type* data = impl->data();
 
  106   installImpl (std::move (impl),
 
  108                Updater_t::defaultContext());
 
  115  * Clean up any remaining payload objects.
 
  118 CONCURRENTRANGEMAP::~ConcurrentRangeMap()
 
  120   value_type* last = m_last;
 
  122     delete_function* delfcn = m_payloadDeleter->delfcn();
 
  123     for (value_type* p = m_begin; p <= m_last; ++p) {
 
  132  * @brief Return a reference to the payload deleter object.
 
  136 typename CONCURRENTRANGEMAP::IPayloadDeleter& CONCURRENTRANGEMAP::deleter()
 
  138   return *m_payloadDeleter;
 
  143  * @brief Return a reference to the payload deleter object.
 
  147 const typename CONCURRENTRANGEMAP::IPayloadDeleter&
 
  148 CONCURRENTRANGEMAP::deleter() const
 
  150   return *m_payloadDeleter;
 
  155  * @brief Search for the first item less than or equal to KEY.
 
  156  * @param key The key to search for.
 
  157  * @returns The value, or nullptr if not found.
 
  161 typename CONCURRENTRANGEMAP::const_iterator
 
  162 CONCURRENTRANGEMAP::find (const key_query_type& key) const
 
  164   // Return right away if the map's empty;
 
  165   const_iterator last = m_last;
 
  166   if (!last) return nullptr;
 
  168   // Check the last value.
 
  169   if (!m_compare (key, last->first)) {
 
  173   // Do a binary search to find the proper position.
 
  174   const_iterator begin = getBegin (last);
 
  175   if (!last) return nullptr;
 
  176   const_iterator pos = std::upper_bound (begin, last+1, key,
 
  177                                          [this](const key_query_type& key2,
 
  179                                          { return m_compare (key2,v.first); } );
 
  181   // Fail if it would be before the first value.
 
  182   if (pos == begin) return nullptr;
 
  190  * @brief Add a new element to the map.
 
  191  * @param range Validity range for this element.
 
  192  * @param ptr Payload for this element.
 
  193  * @param tryExtend If true, then allow an existing range to be extended
 
  195  * @param ctx Execution context.
 
  197  * Returns SUCCESS if the new element was successfully inserted.
 
  198  * Returns DUPLICATE if the range compared equal to an existing one.
 
  199  *         In that case, no new element is inserted (and @c ptr gets deleted).
 
  200  * Returns EXTEND if the range of the last element was extended to @c range.
 
  201  *         This happens if @c tryExtend is true, @c range is equal
 
  202  *         to the range of the last element (according to @c m_compare),
 
  203  *         and the range can be extended according to @c extendRange.
 
  204  *         (This will generally mean that the start time of @c range
 
  205  *         matches the last range, and end time of @c range is after
 
  206  *         the end time of the last range.)  In this case, again no
 
  207  *         new element is inserted and @c ptr is deleted.
 
  208  * Returns OVERLAP if the range of the new element overlaps
 
  209  *         an existing element (the new element is still inserted).
 
  212 typename CONCURRENTRANGEMAP::EmplaceResult
 
  213 CONCURRENTRANGEMAP::emplace (const RANGE& range,
 
  214                              payload_unique_ptr ptr,
 
  215                              bool tryExtend /*= false*/,
 
  216                              const typename Updater_t::Context_t& ctx
 
  217                                /*= Updater_t::defaultContext()*/)
 
  219   lock_t lock (m_mutex);
 
  221   value_type* last = m_last;
 
  222   value_type* begin = m_begin;
 
  224   // Check if the element to be inserted is greater than all existing entries.
 
  225   bool pastEnd = (!last || m_compare (last->first, range));
 
  227   // See if we want to extend the last entry.  Also check for duplicates.
 
  228   if (last && !pastEnd) {
 
  229     RANGE extendedRange = last->first;
 
  230     int flag = m_compare.extendRange (extendedRange, range);
 
  231     if (tryExtend && flag > 0 && extendImpl (lock, extendedRange, ctx) > 0) {
 
  232       return EmplaceResult::EXTENDED;
 
  235       return EmplaceResult::DUPLICATE;
 
  239   // Can we add this to the end?
 
  240   // There has to be room for another, and either the container must be empty,
 
  241   // or the new element must be greater than the current last one.
 
  242   value_type* end = last ? last+1 : begin;
 
  243   if (pastEnd && end < m_impl->data() + m_impl->capacity())
 
  245     // Yes, we can add it to the end.
 
  246     // Check for overlap with the previous range.
 
  247     EmplaceResult ret = EmplaceResult::SUCCESS;
 
  248     RANGE newRange = range;
 
  250       int flag = m_compare.overlap (ctx, (end-1)->first, newRange);
 
  252         return EmplaceResult::DUPLICATE;
 
  255         ret = EmplaceResult::OVERLAP;
 
  259     // Copy the data to the container.
 
  260     end->first = newRange;
 
  261     end->second = ptr.release();
 
  263     std::atomic_thread_fence (std::memory_order_seq_cst);
 
  265     // Update the last pointer.
 
  267     // Now the new element is visible to other threads.
 
  269     m_maxSize = std::max (m_maxSize, static_cast<size_t> (end+1 - begin));
 
  274   // No --- need to make a new implementation object and copy.
 
  275   // Make the new one bigger, if needed.
 
  276   int new_capacity = m_impl->capacity();
 
  277   int old_size = end-begin;
 
  278   if (old_size == 0) old_size = 1;
 
  279   if (old_size > new_capacity/2) {
 
  280     new_capacity = old_size*2;
 
  283   EmplaceResult ret = EmplaceResult::SUCCESS;
 
  285   // Allocate the new object.
 
  286   auto new_impl = std::make_unique<Impl> (new_capacity);
 
  287   value_type* new_begin = new_impl->data();
 
  288   value_type* new_end = new_begin;
 
  290   // Copy the data, adding the new item at the proper place.
 
  291   // Separate out the case where the new item goes at the end,
 
  292   // since we can do that faster.
 
  294     // Check for overlap with the previous range.
 
  295     RANGE newRange = range;
 
  298       flag = m_compare.overlap (ctx, (end-1)->first, newRange);
 
  301       ret = EmplaceResult::DUPLICATE;
 
  305         ret = EmplaceResult::OVERLAP;
 
  308       new_end = std::copy (begin, end, new_end);
 
  309       new_end->first = newRange;
 
  310       new_end->second = ptr.release();
 
  316     // Otherwise we need to search for the proper place to insert the new entry.
 
  317     RANGE newRange = range;
 
  318     for (; begin < end; *new_end++ = *begin++) {
 
  319       if (ptr && m_compare (newRange, begin->first)) {
 
  320         // Check for overlap / duplicate
 
  321         if (begin > m_begin) {
 
  322           int flag = m_compare.overlap (ctx, (begin-1)->first, newRange);
 
  324             ret = EmplaceResult::OVERLAP;
 
  325             // The range start may have been adjusted forward.  Make sure
 
  326             // that we're still at the insertion point.
 
  327             if (!m_compare (newRange, begin->first)) {
 
  332             // Duplicate entry; fail.  Everything allocated is held by unique_ptr,
 
  333             // so should be cleaned up properly.
 
  334             return EmplaceResult::DUPLICATE;
 
  338         int flag = m_compare.overlap (ctx, begin->first, newRange);
 
  340           ret = EmplaceResult::OVERLAP;
 
  341           // The range start may have been adjusted forward.  Make sure
 
  342           // that we're still at the insertion point.
 
  343           if (!m_compare (newRange, begin->first)) {
 
  348           // Duplicate entry; fail.  Everything allocated is held by unique_ptr,
 
  349           // so should be cleaned up properly.
 
  350           return EmplaceResult::DUPLICATE;
 
  353         new_end->first = newRange;
 
  354         new_end->second = ptr.release();
 
  360       // Possible to get here if overlap() moved the start of the range
 
  361       // forward past all existing ranges.
 
  362       new_end->first = std::move(newRange);
 
  363       new_end->second = ptr.release();
 
  368   // Install the new implementation.
 
  369   installImpl (std::move (new_impl), new_begin, new_end, ctx);
 
  372   m_maxSize = std::max (m_maxSize, static_cast<size_t> (new_end - new_begin));
 
  378  * @brief Erase the first item less than or equal to KEY.
 
  379  * @param key The key to search erase.
 
  380  * @param ctx Execution context.
 
  384 CONCURRENTRANGEMAP::erase (const key_query_type& key,
 
  385                            const typename Updater_t::Context_t& ctx
 
  386                              /*= Updater_t::defaultContext()*/)
 
  388   lock_t lock (m_mutex);
 
  390   // Return if the container's empty.
 
  391   value_type* last = m_last;
 
  392   if (last == nullptr) return;
 
  394   value_type* begin = m_begin;
 
  395   value_type* end = last+1;
 
  397   // Don't do anything if key is before the first element.
 
  398   if (m_compare (key, begin->first)) return;
 
  400   // Is the first element the one we're deleting?
 
  401   if (begin == last || m_compare (key, (begin+1)->first)) {
 
  402     // Yes --- remember the payload for deletion.
 
  403     // (Don't actually queue it for deletion until the pointers have
 
  405     mapped_type todel = begin->second;
 
  408     // Make the update visible to other threads.
 
  409     // If we need to update both pointers, then clear m_begin first.
 
  415     m_payloadDeleter->discard (todel);
 
  419   // Need to make a new implementation object and copy data.
 
  420   size_t capacity = m_impl->capacity();
 
  421   auto new_impl = std::make_unique<Impl> (capacity);
 
  422   value_type* new_begin = new_impl->data();
 
  423   value_type* new_end = new_begin;
 
  425   // Copy the data, skipping the object to be deleted.
 
  426   while (begin < end-1 && !m_compare (key, (begin+1)->first)) {
 
  427     *new_end++ = *begin++;
 
  429   mapped_type todel = begin->second;
 
  431   while (begin < end) {
 
  432     *new_end++ = *begin++;
 
  435   // Install the new implementation.
 
  436   installImpl (std::move (new_impl), new_begin, new_end, ctx);
 
  437   m_payloadDeleter->discard (todel);
 
  442  * @brief Extend the range of the last entry of the map.
 
  443  * @param newRange New range to use for the last entry.
 
  444  * @param ctx Execution context.
 
  446  * The range of the last entry in the map is updated to @c newRange.
 
  447  * Does nothing if the map is empty.
 
  448  * This will make a new version of implementation data.
 
  450  * The semantics of what it means to extend a range are given by the
 
  451  * @c extendRange method of the @c COMPARE object (see above).
 
  453  * Returns -1 if there was an error; 1 if the last range was extended;
 
  454  * and 0 if nothing was changed.
 
  458 CONCURRENTRANGEMAP::extendLastRange (const RANGE& newRange,
 
  459                                      const typename Updater_t::Context_t& ctx
 
  460                                      /*= Updater_t::defaultContext()*/)
 
  462   lock_t lock (m_mutex);
 
  463   value_type* last = m_last;
 
  467   RANGE extendedRange = last->first;
 
  468   int flag = m_compare.extendRange (extendedRange, newRange);
 
  470     return extendImpl (lock, extendedRange, ctx);
 
  480  * @brief Update all range objects.
 
  481  * @param rangeUpater Functional to call on each range object.
 
  482  * @param ctx Execution context.
 
  484  * This will iterate through the list of entries and call @c rangeUpdater
 
  485  * on the @c RANGE part of each.  Be careful: rangeUpdater must not
 
  486  * change any part of the range which might affect the sorting
 
  491 CONCURRENTRANGEMAP::updateRanges (std::function<void (RANGE&)> rangeUpdater,
 
  492                                   const typename Updater_t::Context_t& ctx
 
  493                                   /*= Updater_t::defaultContext()*/)
 
  495   lock_t lock (m_mutex);
 
  497   // Return if the container's empty.
 
  498   value_type* last = m_last;
 
  499   if (last == nullptr) return;
 
  501   // Make a new implementation object and copy data.
 
  502   size_t capacity = m_impl->capacity();
 
  503   auto new_impl = std::make_unique<Impl> (capacity);
 
  504   value_type* new_begin = new_impl->data();
 
  505   value_type* new_end = new_begin;
 
  506   value_type* begin = m_begin;
 
  507   value_type* end = m_last+1;
 
  508   new_end = std::copy (begin, end, new_end);
 
  510   // Update ranges in the new copy.
 
  511   for (value_type* v = new_begin; v != new_end; ++v) {
 
  512     rangeUpdater (v->first);
 
  515   // Install the new implementation.
 
  516   installImpl (std::move (new_impl), new_begin, new_end, ctx);
 
  521  * @brief Extend the range of the last entry of the map.
 
  522  * @param Lock object.
 
  523  * @param extendedRange New range to use for the last entry.
 
  524  * @param ctx Execution context.
 
  526  * Implementation of @c extendLastRange; see there for details.
 
  527  * Must be called with the lock held.
 
  531 CONCURRENTRANGEMAP::extendImpl (lock_t& /*lock*/,
 
  532                                 const RANGE& extendedRange,
 
  533                                 const typename Updater_t::Context_t& ctx)
 
  535   // Return if the container's empty.
 
  536   value_type* last = m_last;
 
  537   if (last == nullptr) return -1;
 
  539   // Make a new implementation object and copy data.
 
  540   size_t capacity = m_impl->capacity();
 
  541   auto new_impl = std::make_unique<Impl> (capacity);
 
  542   value_type* new_begin = new_impl->data();
 
  543   value_type* new_end = new_begin;
 
  544   value_type* begin = m_begin;
 
  545   value_type* end = m_last+1;
 
  546   new_end = std::copy (begin, end, new_end);
 
  548   // Update the range of the last entry.
 
  549   (new_end-1)->first = extendedRange;
 
  551   // Install the new implementation.
 
  552   installImpl (std::move (new_impl), new_begin, new_end, ctx);
 
  559  * @brief Remove unused entries from the front of the list.
 
  560  * @param keys List of keys that may still be in use.
 
  562  * @param trimall If true, then allow removing all elements in the container.
 
  563  *                Otherwise, stop when there's one left.
 
  565  * We examine the objects in the container, starting with the earliest one.
 
  566  * If none of the keys in @c keys match the range for this object, then
 
  567  * it is removed from the container.  We stop when we either find
 
  568  * an object with a range matching a key in @c keys or (if trimall is false)
 
  569  * when there is only one object left.
 
  571  * Removed objects are queued for deletion once all slots have been
 
  572  * marked as quiescent.
 
  574  * The list @c keys MUST be sorted.
 
  576  * Returns the number of objects that were removed.
 
  580 CONCURRENTRANGEMAP::trim (const std::vector<key_query_type>& keys,
 
  581                           bool trimall /*= false*/)
 
  585   lock_t lock (m_mutex);
 
  587   // Return immediately if the container is empty.
 
  588   value_type* last = m_last;
 
  589   if (last == nullptr) return ndel;
 
  591   value_type* pos = m_begin;
 
  593   // If trimall is set, then we want to compare trimall against end=last+1.
 
  594   // Otherwise, we compare against last in order to skip considering
 
  598     // FIXME: Can use the position where we found the last one as a hint?
 
  599     if (anyInRange (pos->first, keys)) {
 
  600       // One of the keys matched the range of this object.  Stop here.
 
  604     // We're deleting the object now.
 
  606     // Discard it.  Be sure to adjust m_begin first.
 
  607     mapped_type todel = pos->second;
 
  610       // Removing the last entry.  Be sure to clear m_begin first.
 
  615     m_payloadDeleter->discard (todel);
 
  624  * @brief Remove all entries in the container.
 
  625  *        Mostly for testing --- should not normally be used.
 
  628 void CONCURRENTRANGEMAP::clear()
 
  630   lock_t lock (m_mutex);
 
  632   // Don't do anything if the container's already empty.
 
  633   value_type* last = m_last;
 
  634   if (last == nullptr) return;
 
  636   value_type* begin = m_begin; 
 
  638   while (begin != last) {
 
  639     mapped_type todel = begin->second;
 
  642     m_payloadDeleter->discard (todel);
 
  645   // Now have one element left.  Be sure to clear m_begin first.
 
  646   mapped_type todel = begin->second;
 
  651   m_payloadDeleter->discard (todel);
 
  656  * @brief Return the current number of elements in the map.
 
  660 size_t CONCURRENTRANGEMAP::size() const
 
  662   const_iterator last = m_last;
 
  664   const_iterator begin = getBegin (last);
 
  666   return last+1 - begin;
 
  671  * @brief Test if the map is empty.
 
  675 bool CONCURRENTRANGEMAP::empty() const
 
  677   return m_last == nullptr;
 
  682  * @brief Return the current capacity of the map.
 
  686 size_t CONCURRENTRANGEMAP::capacity() const
 
  688   return m_updater.get().capacity();
 
  693  * @brief Return the number times an item was inserted into the map.
 
  697 size_t CONCURRENTRANGEMAP::nInserts() const
 
  704  * @brief Return the maximum size of the map.
 
  708 size_t CONCURRENTRANGEMAP::maxSize() const
 
  715  * @brief Return a range that can be used to iterate over the container.
 
  719 typename CONCURRENTRANGEMAP::const_iterator_range
 
  720 CONCURRENTRANGEMAP::range() const
 
  722   const_iterator last = m_last;
 
  723   if (!last) return const_iterator_range (nullptr, nullptr);
 
  724   const_iterator begin = getBegin (last);
 
  725   if (!last) return const_iterator_range (nullptr, nullptr);
 
  726   return const_iterator_range (begin, last+1);
 
  731  * @brief Called when this thread is no longer referencing anything
 
  732  *        from this container.
 
  733  * @param ctx Execution context.
 
  737 CONCURRENTRANGEMAP::quiescent (const typename Updater_t::Context_t& ctx /*= Updater_t::defaultContext()*/)
 
  739   m_updater.quiescent (ctx);
 
  740   m_payloadDeleter->quiescent (ctx);
 
  745  * @brief Return the begin/last pointers.
 
  746  * @param [inout] last Current value of last.
 
  748  * Retrieve consistent values of the begin and last pointers.
 
  749  * The last pointer should have already been fetched, and may be updated.
 
  750  * Usage should be like this:
 
  753  *  const_iterator last = m_last;
 
  754  *  if (!last) return;  // Container empty.
 
  755  *  const_iterator begin = getBegin (last);
 
  756  *  if (!last) return;  // Container empty.
 
  761 typename CONCURRENTRANGEMAP::const_iterator
 
  762 CONCURRENTRANGEMAP::getBegin (const_iterator& last) const
 
  764   // First fetch the begin pointer, then check that both there is not
 
  765   // an update in progress and that the last pointer
 
  766   // hasn't changed.  In either case, we need to refetch both pointers.
 
  767   // ABA isn't an issue here since the pointers go only in one direction,
 
  768   // and if we allocate a new chunk, it will be in a disjoint
 
  770   const_iterator begin;
 
  773     if (begin && last == m_last) break;
 
  782  * @brief Consistently update both the begin and last pointers.
 
  783  * @param begin New begin pointer.
 
  784  * @param end New end pointer.
 
  789 CONCURRENTRANGEMAP::updatePointers (value_type* new_begin, value_type* new_end)
 
  791   // Mark that there's an update in progress.
 
  793   // Then update the last pointer.
 
  794   if (new_begin == new_end) {
 
  800   // Then set the begin pointer.
 
  806  * @brief Test to see if any keys within @c keys match @c r.
 
  807  * @brief r Range to test.
 
  808  * @break keys List of keys to test.  MUST be sorted.
 
  812 CONCURRENTRANGEMAP::anyInRange (const key_type& r,
 
  813                                 const std::vector<key_query_type>& keys) const
 
  815   auto pos = std::lower_bound (keys.begin(), keys.end(), r, m_compare);
 
  816   return pos != keys.end() && m_compare.inRange (*pos, r);
 
  821  * @brief Install a new implementation instance and make it visible.
 
  822  * @param new_impl The new instance.
 
  823  * @param new_begin Begin pointer for the new instance.
 
  824  * @param new_end End pointer for the new instance.
 
  825  *                (Usual STL meaning of end.  If the instance is empty,
 
  826  *                then new_end should match new_begin.)
 
  827  * @param ctx Execution context.
 
  832 CONCURRENTRANGEMAP::installImpl (std::unique_ptr<Impl> new_impl,
 
  833                                  value_type* new_begin,
 
  835                                  const typename Updater_t::Context_t& ctx)
 
  837   // Install the new implementation.
 
  838   m_impl = new_impl.get();
 
  840   // Make the update visible to other threads.
 
  841   // Make sure not to add the old version to the garbage list before
 
  842   // we've updated the pointers.
 
  843   updatePointers (new_begin, new_end);
 
  844   m_updater.update (std::move (new_impl), ctx);
 
  848 #undef T_CONCURRENTRANGEMAP
 
  849 #undef CONCURRENTRANGEMAP
 
  852 } // namespace CxxUtils