producer_consumer_bounded.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // (C) Copyright 2012 Howard Hinnant
  2. // (C) Copyright 2012 Vicente Botet
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. // Adapted from an example given by Howard Hinnant.
  7. #include <boost/config.hpp>
  8. #define BOOST_THREAD_VERSION 4
  9. #define BOOST_THREAD_QUEUE_DEPRECATE_OLD
  10. #if ! defined BOOST_NO_CXX11_DECLTYPE
  11. #define BOOST_RESULT_OF_USE_DECLTYPE
  12. #endif
  13. //#define XXXX
  14. #include <iostream>
  15. #include <boost/thread/scoped_thread.hpp>
  16. #ifdef XXXX
  17. #include <boost/thread/externally_locked_stream.hpp>
  18. typedef boost::externally_locked_stream<std::ostream> the_ostream;
  19. #else
  20. typedef std::ostream the_ostream;
  21. typedef std::istream the_istream;
  22. #endif
  23. #include <boost/thread/sync_bounded_queue.hpp>
  24. void producer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
  25. {
  26. using namespace boost;
  27. try {
  28. for(int i=0; ;++i)
  29. {
  30. sbq.push_back(i);
  31. //sbq << i;
  32. //mos << "push_back(" << i << ") "<< sbq.size()<<"\n";
  33. this_thread::sleep_for(chrono::milliseconds(200));
  34. }
  35. }
  36. catch(sync_queue_is_closed&)
  37. {
  38. //mos << "closed !!!\n";
  39. }
  40. catch(...)
  41. {
  42. //mos << "exception !!!\n";
  43. }
  44. }
  45. void consumer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
  46. {
  47. using namespace boost;
  48. try {
  49. for(int i=0; ;++i)
  50. {
  51. int r;
  52. sbq.pull_front(r);
  53. //sbq >> r;
  54. //mos << i << " pull_front(" << r << ") "<< sbq.size()<<"\n";
  55. this_thread::sleep_for(chrono::milliseconds(250));
  56. }
  57. }
  58. catch(sync_queue_is_closed&)
  59. {
  60. //mos << "closed !!!\n";
  61. }
  62. catch(...)
  63. {
  64. //mos << "exception !!!\n";
  65. }
  66. }
  67. void consumer2(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
  68. {
  69. using namespace boost;
  70. try {
  71. for(int i=0; ;++i)
  72. {
  73. int r;
  74. queue_op_status st = sbq.try_pull_front(r);
  75. if (queue_op_status::closed == st) break;
  76. if (queue_op_status::success == st) {
  77. //mos << i << " pull(" << r << ")\n";
  78. }
  79. this_thread::sleep_for(chrono::milliseconds(250));
  80. }
  81. }
  82. catch(...)
  83. {
  84. //mos << "exception !!!\n";
  85. }
  86. }
  87. //void consumer3(the_ostream &mos, boost::sync_bounded_queue<int> & sbq)
  88. //{
  89. // using namespace boost;
  90. // bool closed=false;
  91. // try {
  92. // for(int i=0; ;++i)
  93. // {
  94. // int r;
  95. // queue_op_status res = sbq.wait_and_pull(r);
  96. // if (res==queue_op_status::closed) break;
  97. // //mos << i << " wait_and_pull(" << r << ")\n";
  98. // this_thread::sleep_for(chrono::milliseconds(250));
  99. // }
  100. // }
  101. // catch(...)
  102. // {
  103. // //mos << "exception !!!\n";
  104. // }
  105. //}
  106. int main()
  107. {
  108. using namespace boost;
  109. #ifdef XXXX
  110. recursive_mutex terminal_mutex;
  111. externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
  112. externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
  113. externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
  114. #else
  115. the_ostream &mcerr = std::cout;
  116. the_ostream &mcout = std::cerr;
  117. //the_istream &mcin = std::cin;
  118. #endif
  119. sync_bounded_queue<int> sbq(10);
  120. {
  121. mcout << "begin of main" << std::endl;
  122. scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
  123. scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
  124. scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), boost::ref(sbq)));
  125. this_thread::sleep_for(chrono::seconds(1));
  126. sbq.close();
  127. mcout << "closed()" << std::endl;
  128. } // all threads joined here.
  129. mcout << "end of main" << std::endl;
  130. return 0;
  131. }