condition.cpp 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. // Copyright (C) 2001-2003
  2. // William E. Kempf
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. #include <iostream>
  7. #include <vector>
  8. #include <boost/utility.hpp>
  9. #include <boost/thread/condition_variable.hpp>
  10. #include <boost/thread/thread_only.hpp>
  11. #include "../test/remove_error_code_unused_warning.hpp"
  12. class bounded_buffer : private boost::noncopyable
  13. {
  14. public:
  15. typedef boost::unique_lock<boost::mutex> lock;
  16. bounded_buffer(int n) : boost::noncopyable(), begin(0), end(0), buffered(0), circular_buf(n) { }
  17. void send (int m) {
  18. lock lk(monitor);
  19. while (buffered == circular_buf.size())
  20. buffer_not_full.wait(lk);
  21. circular_buf[end] = m;
  22. end = (end+1) % circular_buf.size();
  23. ++buffered;
  24. buffer_not_empty.notify_one();
  25. }
  26. int receive() {
  27. lock lk(monitor);
  28. while (buffered == 0)
  29. buffer_not_empty.wait(lk);
  30. int i = circular_buf[begin];
  31. begin = (begin+1) % circular_buf.size();
  32. --buffered;
  33. buffer_not_full.notify_one();
  34. return i;
  35. }
  36. private:
  37. int begin, end;
  38. std::vector<int>::size_type buffered;
  39. std::vector<int> circular_buf;
  40. boost::condition_variable_any buffer_not_full, buffer_not_empty;
  41. boost::mutex monitor;
  42. };
  43. bounded_buffer buf(2);
  44. boost::mutex io_mutex;
  45. void sender() {
  46. int n = 0;
  47. while (n < 1000000) {
  48. buf.send(n);
  49. if(!(n%10000))
  50. {
  51. boost::unique_lock<boost::mutex> io_lock(io_mutex);
  52. std::cout << "sent: " << n << std::endl;
  53. }
  54. ++n;
  55. }
  56. buf.send(-1);
  57. }
  58. void receiver() {
  59. int n;
  60. do {
  61. n = buf.receive();
  62. if(!(n%10000))
  63. {
  64. boost::unique_lock<boost::mutex> io_lock(io_mutex);
  65. std::cout << "received: " << n << std::endl;
  66. }
  67. } while (n != -1); // -1 indicates end of buffer
  68. buf.send(-1);
  69. }
  70. int main(int, char*[])
  71. {
  72. boost::thread thrd1(&sender);
  73. boost::thread thrd2(&receiver);
  74. boost::thread thrd3(&receiver);
  75. boost::thread thrd4(&receiver);
  76. thrd1.join();
  77. thrd2.join();
  78. thrd3.join();
  79. thrd4.join();
  80. return 0;
  81. }