bounded_fifo_queue.hpp 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. * Copyright Andrey Semashev 2007 - 2015.
  3. * Distributed under the Boost Software License, Version 1.0.
  4. * (See accompanying file LICENSE_1_0.txt or copy at
  5. * http://www.boost.org/LICENSE_1_0.txt)
  6. */
  7. /*!
  8. * \file bounded_fifo_queue.hpp
  9. * \author Andrey Semashev
  10. * \date 04.01.2012
  11. *
  12. * The header contains implementation of bounded FIFO queueing strategy for
  13. * the asynchronous sink frontend.
  14. */
  15. #ifndef BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_
  16. #define BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_
  17. #include <boost/log/detail/config.hpp>
  18. #ifdef BOOST_HAS_PRAGMA_ONCE
  19. #pragma once
  20. #endif
  21. #if defined(BOOST_LOG_NO_THREADS)
  22. #error Boost.Log: This header content is only supported in multithreaded environment
  23. #endif
  24. #include <cstddef>
  25. #include <queue>
  26. #include <boost/thread/locks.hpp>
  27. #include <boost/thread/mutex.hpp>
  28. #include <boost/thread/condition_variable.hpp>
  29. #include <boost/log/core/record_view.hpp>
  30. #include <boost/log/detail/header.hpp>
  31. namespace boost {
  32. BOOST_LOG_OPEN_NAMESPACE
  33. namespace sinks {
  34. /*!
  35. * \brief Bounded FIFO log record queueing strategy
  36. *
  37. * The \c bounded_fifo_queue class is intended to be used with
  38. * the \c asynchronous_sink frontend as a log record queueing strategy.
  39. *
  40. * This strategy describes log record queueing logic.
  41. * The queue has a limited capacity, upon reaching which the enqueue operation will
  42. * invoke the overflow handling strategy specified in the \c OverflowStrategyT
  43. * template parameter to handle the situation. The library provides overflow handling
  44. * strategies for most common cases: \c drop_on_overflow will silently discard the log record,
  45. * and \c block_on_overflow will put the enqueueing thread to wait until there is space
  46. * in the queue.
  47. *
  48. * The log record queue imposes no ordering over the queued
  49. * elements aside from the order in which they are enqueued.
  50. */
  51. template< std::size_t MaxQueueSizeV, typename OverflowStrategyT >
  52. class bounded_fifo_queue :
  53. private OverflowStrategyT
  54. {
  55. private:
  56. typedef OverflowStrategyT overflow_strategy;
  57. typedef std::queue< record_view > queue_type;
  58. typedef boost::mutex mutex_type;
  59. private:
  60. //! Synchronization primitive
  61. mutex_type m_mutex;
  62. //! Condition to block the consuming thread on
  63. condition_variable m_cond;
  64. //! Log record queue
  65. queue_type m_queue;
  66. //! Interruption flag
  67. bool m_interruption_requested;
  68. protected:
  69. //! Default constructor
  70. bounded_fifo_queue() : m_interruption_requested(false)
  71. {
  72. }
  73. //! Initializing constructor
  74. template< typename ArgsT >
  75. explicit bounded_fifo_queue(ArgsT const&) : m_interruption_requested(false)
  76. {
  77. }
  78. //! Enqueues log record to the queue
  79. void enqueue(record_view const& rec)
  80. {
  81. unique_lock< mutex_type > lock(m_mutex);
  82. std::size_t size = m_queue.size();
  83. for (; size >= MaxQueueSizeV; size = m_queue.size())
  84. {
  85. if (!overflow_strategy::on_overflow(rec, lock))
  86. return;
  87. }
  88. m_queue.push(rec);
  89. if (size == 0)
  90. m_cond.notify_one();
  91. }
  92. //! Attempts to enqueue log record to the queue
  93. bool try_enqueue(record_view const& rec)
  94. {
  95. unique_lock< mutex_type > lock(m_mutex, try_to_lock);
  96. if (lock.owns_lock())
  97. {
  98. const std::size_t size = m_queue.size();
  99. // Do not invoke the bounding strategy in case of overflow as it may block
  100. if (size < MaxQueueSizeV)
  101. {
  102. m_queue.push(rec);
  103. if (size == 0)
  104. m_cond.notify_one();
  105. return true;
  106. }
  107. }
  108. return false;
  109. }
  110. //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
  111. bool try_dequeue_ready(record_view& rec)
  112. {
  113. return try_dequeue(rec);
  114. }
  115. //! Attempts to dequeue log record from the queue, does not block if the queue is empty
  116. bool try_dequeue(record_view& rec)
  117. {
  118. lock_guard< mutex_type > lock(m_mutex);
  119. const std::size_t size = m_queue.size();
  120. if (size > 0)
  121. {
  122. rec.swap(m_queue.front());
  123. m_queue.pop();
  124. overflow_strategy::on_queue_space_available();
  125. return true;
  126. }
  127. return false;
  128. }
  129. //! Dequeues log record from the queue, blocks if the queue is empty
  130. bool dequeue_ready(record_view& rec)
  131. {
  132. unique_lock< mutex_type > lock(m_mutex);
  133. while (!m_interruption_requested)
  134. {
  135. const std::size_t size = m_queue.size();
  136. if (size > 0)
  137. {
  138. rec.swap(m_queue.front());
  139. m_queue.pop();
  140. overflow_strategy::on_queue_space_available();
  141. return true;
  142. }
  143. else
  144. {
  145. m_cond.wait(lock);
  146. }
  147. }
  148. m_interruption_requested = false;
  149. return false;
  150. }
  151. //! Wakes a thread possibly blocked in the \c dequeue method
  152. void interrupt_dequeue()
  153. {
  154. lock_guard< mutex_type > lock(m_mutex);
  155. m_interruption_requested = true;
  156. overflow_strategy::interrupt();
  157. m_cond.notify_one();
  158. }
  159. };
  160. } // namespace sinks
  161. BOOST_LOG_CLOSE_NAMESPACE // namespace log
  162. } // namespace boost
  163. #include <boost/log/detail/footer.hpp>
  164. #endif // BOOST_LOG_SINKS_BOUNDED_FIFO_QUEUE_HPP_INCLUDED_