123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- [/
- / Copyright (c) 2009 Helge Bahmann
- /
- / Distributed under the Boost Software License, Version 1.0. (See accompanying
- / file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- /]
- [section:example_reference_counters Reference counting]
- The purpose of a ['reference counter] is to count the number
- of pointers to an object. The object can be destroyed as
- soon as the reference counter reaches zero.
- [section Implementation]
- [c++]
- #include <boost/intrusive_ptr.hpp>
- #include <boost/atomic.hpp>
- // Example class whose lifetime is controlled by an intrusive, atomic reference count.
- class X {
- public:
- typedef boost::intrusive_ptr<X> pointer;
- // A freshly constructed object starts with a count of zero.
- X() : refcount_(0) {}
- private:
- // mutable: the hooks below receive a const X * and still have to modify the count.
- mutable boost::atomic<int> refcount_;
- // Adds one reference; the increment itself uses relaxed ordering.
- friend void intrusive_ptr_add_ref(const X * x)
- {
- x->refcount_.fetch_add(1, boost::memory_order_relaxed);
- }
- // Drops one reference and deletes the object when this was the last one.
- friend void intrusive_ptr_release(const X * x)
- {
- // fetch_sub yields the value before the decrement, so 1 means the count just reached zero
- if (x->refcount_.fetch_sub(1, boost::memory_order_release) == 1) {
- // the acquire fence is only paid for on the path that actually deletes
- boost::atomic_thread_fence(boost::memory_order_acquire);
- delete x;
- }
- }
- };
- [endsect]
- [section Usage]
- [c++]
- // X::pointer is the boost::intrusive_ptr<X> typedef declared inside X
- X::pointer x = new X;
- [endsect]
- [section Discussion]
- Increasing the reference counter can always be done with
- [^memory_order_relaxed]: New references to an object can only
- be formed from an existing reference, and passing an existing
- reference from one thread to another must already provide any
- required synchronization.
- It is important to enforce any possible access to the object in
- one thread (through an existing reference) to ['happen before]
- deleting the object in a different thread. This is achieved
- by a "release" operation after dropping a reference (any
- access to the object through this reference must obviously
- have happened before), and an "acquire" operation before
- deleting the object.
- It would be possible to use [^memory_order_acq_rel] for the
- [^fetch_sub] operation, but this results in unneeded "acquire"
- operations when the reference counter does not yet reach zero
- and may impose a performance penalty.
- [endsect]
- [endsect]
- [section:example_spinlock Spinlock]
- The purpose of a ['spin lock] is to prevent multiple threads
- from concurrently accessing a shared data structure. In contrast
- to a mutex, threads will busy-wait and waste CPU cycles instead
- of yielding the CPU to another thread. ['Do not use spinlocks
- unless you are certain that you understand the consequences.]
- [section Implementation]
- [c++]
- #include <boost/atomic.hpp>
- // Busy-waiting lock built on a single atomic state flag.
- class spinlock {
- public:
- // The lock starts out free.
- spinlock() : state_(Unlocked) {}
- // Spins until this thread is the one that switched the state to Locked.
- void lock()
- {
- for (;;) {
- LockState previous = state_.exchange(Locked, boost::memory_order_acquire);
- if (previous != Locked)
- break;
- /* busy-wait */
- }
- }
- // Marks the lock as free again, using "release" ordering.
- void unlock()
- {
- state_.store(Unlocked, boost::memory_order_release);
- }
- private:
- enum LockState {Locked, Unlocked};
- boost::atomic<LockState> state_;
- };
- [endsect]
- [section Usage]
- [c++]
- spinlock s;
- // lock() spins and returns only once this thread holds the lock
- s.lock();
- // access data structure here
- s.unlock();
- [endsect]
- [section Discussion]
- The purpose of the spinlock is to make sure that one access
- to the shared data structure always strictly "happens before"
- another. The usage of acquire/release in lock/unlock is required
- and sufficient to guarantee this ordering.
- It would be correct to write the "lock" operation in the following
- way:
- [c++]
- // Alternative formulation of spinlock::lock(): spin with relaxed exchanges and
- // issue one acquire fence after the lock has been obtained.
- void lock()
- {
- while (state_.exchange(Locked, boost::memory_order_relaxed) == Locked) {
- /* busy-wait */
- }
- // orders the accesses that follow after the successful exchange above
- boost::atomic_thread_fence(boost::memory_order_acquire);
- }
- This "optimization" is however a) useless and b) may in fact hurt:
- a) Since the thread will be busily spinning on a blocked spinlock,
- it does not matter if it will waste the CPU cycles with just
- "exchange" operations or with both useless "exchange" and "acquire"
- operations. b) A tight "exchange" loop without any
- memory-synchronizing instruction introduced through an "acquire"
- operation will on some systems monopolize the memory subsystem
- and degrade the performance of other system components.
- [endsect]
- [endsect]
- [section:singleton Singleton with double-checked locking pattern]
- The purpose of the ['Singleton with double-checked locking pattern] is to ensure
- that at most one instance of a particular object is created.
- If one instance has been created already, access to the existing
- object should be as light-weight as possible.
- [section Implementation]
- [c++]
- #include <boost/atomic.hpp>
- #include <boost/thread/mutex.hpp>
- // Lazily created singleton: a lock-free check first, and a mutex only while
- // the instance may still have to be created.
- class X {
- public:
- // Returns the single shared instance, creating it on first use.
- static X * instance()
- {
- // fast path: no mutex is taken once the instance has been published
- X * tmp = instance_.load(boost::memory_order_consume);
- if (!tmp) {
- boost::mutex::scoped_lock guard(instantiation_mutex);
- // check again under the mutex: another thread may have created it meanwhile
- tmp = instance_.load(boost::memory_order_consume);
- if (!tmp) {
- tmp = new X;
- // publish the pointer only after the object has been constructed
- instance_.store(tmp, boost::memory_order_release);
- }
- }
- return tmp;
- }
- private:
- static boost::atomic<X *> instance_;
- static boost::mutex instantiation_mutex;
- };
- // Static data members need an out-of-class definition; without the one for
- // the mutex the example fails to link.
- boost::atomic<X *> X::instance_(0);
- boost::mutex X::instantiation_mutex;
- [endsect]
- [section Usage]
- [c++]
- // instance() creates the object on the first call and returns the stored pointer afterwards
- X * x = X::instance();
- // dereference x
- [endsect]
- [section Discussion]
- The mutex makes sure that only one instance of the object is
- ever created. The [^instance] method must make sure that any
- dereference of the object strictly "happens after" creating
- the instance in another thread. The use of [^memory_order_release]
- after creating and initializing the object and [^memory_order_consume]
- before dereferencing the object provides this guarantee.
- It would be permissible to use [^memory_order_acquire] instead of
- [^memory_order_consume], but this provides a stronger guarantee
- than is required since only operations depending on the value of
- the pointer need to be ordered.
- [endsect]
- [endsect]
- [section:example_ringbuffer Wait-free ring buffer]
- A ['wait-free ring buffer] provides a mechanism for relaying objects
- from one single "producer" thread to one single "consumer" thread without
- any locks. The operations on this data structure are "wait-free" which
- means that each operation finishes within a constant number of steps.
- This makes this data structure suitable for use in hard real-time systems
- or for communication with interrupt/signal handlers.
- [section Implementation]
- [c++]
- #include <boost/atomic.hpp>
- // Fixed-size ring buffer for one producer (push) and one consumer (pop).
- // head_ is the next slot to write, tail_ the next slot to read. push refuses
- // to advance head_ onto tail_, so at most Size - 1 elements are stored at once.
- template<typename T, size_t Size>
- class ringbuffer {
- public:
- // Both indices start at slot 0, i.e. the buffer is empty.
- ringbuffer() : head_(0), tail_(0) {}
- // Copies value into the buffer. Returns false (storing nothing) when the buffer is full.
- bool push(const T & value)
- {
- // head_ is only ever stored by push, hence the relaxed load
- size_t head = head_.load(boost::memory_order_relaxed);
- size_t next_head = next(head);
- // full: advancing head_ would make it equal to tail_
- if (next_head == tail_.load(boost::memory_order_acquire))
- return false;
- ring_[head] = value;
- // publish the slot only after the element has been written
- head_.store(next_head, boost::memory_order_release);
- return true;
- }
- // Copies the oldest element into value. Returns false (leaving value untouched) when the buffer is empty.
- bool pop(T & value)
- {
- // tail_ is only ever stored by pop, hence the relaxed load
- size_t tail = tail_.load(boost::memory_order_relaxed);
- // empty: nothing has been published past tail_
- if (tail == head_.load(boost::memory_order_acquire))
- return false;
- value = ring_[tail];
- // hand the slot back only after the element has been copied out
- tail_.store(next(tail), boost::memory_order_release);
- return true;
- }
- private:
- // Index following current, wrapping around at Size.
- size_t next(size_t current)
- {
- return (current + 1) % Size;
- }
- T ring_[Size];
- boost::atomic<size_t> head_, tail_;
- };
- [endsect]
- [section Usage]
- [c++]
- // ring buffer of int with 32 slots; push and pop report failure through their return value
- ringbuffer<int, 32> r;
- // try to insert an element
- if (r.push(42)) { /* succeeded */ }
- else { /* buffer full */ }
- // try to retrieve an element
- int value;
- if (r.pop(value)) { /* succeeded */ }
- else { /* buffer empty */ }
- [endsect]
- [section Discussion]
- The implementation makes sure that the ring indices do
- not "lap-around" each other to ensure that no elements
- are either lost or read twice.
- Furthermore it must guarantee that read-access to a
- particular object in [^pop] "happens after" it has been
- written in [^push]. This is achieved by writing [^head_]
- with "release" and reading it with "acquire". Conversely
- the implementation also ensures that read access to
- a particular ring element "happens before"
- rewriting this element with a new value by accessing [^tail_]
- with appropriate ordering constraints.
- [endsect]
- [endsect]
- [section:mp_queue Wait-free multi-producer queue]
- The purpose of the ['wait-free multi-producer queue] is to allow
- an arbitrary number of producers to enqueue objects which are
- retrieved and processed in FIFO order by a single consumer.
- [section Implementation]
- [c++]
- // Queue that any number of producers may push to while a single consumer
- // detaches the complete contents at once. Internally a singly-linked list
- // whose head is the most recently pushed node.
- template<typename T>
- class waitfree_queue {
- public:
- // List node handed out to the consumer. Nodes are allocated with new in
- // push() and are never freed by the queue itself.
- struct node {
- T data;
- node * next;
- };
- // Allocates a node holding a copy of data and links it in as the new head.
- void push(const T &data)
- {
- node * n = new node;
- n->data = data;
- node * stale_head = head_.load(boost::memory_order_relaxed);
- do {
- // on failure compare_exchange_weak refreshes stale_head, so relink and retry
- n->next = stale_head;
- } while (!head_.compare_exchange_weak(stale_head, n, boost::memory_order_release));
- }
- // Detaches all nodes and returns them in the order they were pushed
- // (0 if the queue was empty).
- node * pop_all(void)
- {
- // pop_all_reverse() yields newest-first; reverse the links in place
- node * last = pop_all_reverse(), * first = 0;
- while(last) {
- node * tmp = last;
- last = last->next;
- tmp->next = first;
- first = tmp;
- }
- return first;
- }
- // Creates an empty queue.
- waitfree_queue() : head_(0) {}
- // alternative interface if ordering is of no importance:
- // detaches all nodes and returns them newest-first
- node * pop_all_reverse(void)
- {
- return head_.exchange(0, boost::memory_order_consume);
- }
- private:
- boost::atomic<node *> head_;
- };
- [endsect]
- [section Usage]
- [c++]
- waitfree_queue<int> q;
- // insert elements
- q.push(42);
- q.push(2);
- // pop elements
- waitfree_queue<int>::node * x = q.pop_all();
- while(x) {
- // remember the current node and advance before it is deleted
- waitfree_queue<int>::node * tmp = x;
- x = x->next;
- // process tmp->data, probably delete it afterwards
- delete tmp;
- }
- [endsect]
- [section Discussion]
- The implementation guarantees that all objects enqueued are
- processed in the order they were enqueued by building a singly-linked
- list of objects in reverse processing order. The queue is atomically
- emptied by the consumer and brought into correct order.
- It must be guaranteed that any access to an object to be enqueued
- by the producer "happens before" any access by the consumer. This
- is assured by inserting objects into the list with ['release] and
- dequeuing them with ['consume] memory order. It is not
- necessary to use ['acquire] memory order in [^waitfree_queue::pop_all]
- because all operations involved depend on the value of
- the atomic pointer through dereference.
- [endsect]
- [endsect]
|