producer_consumer2.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // (C) Copyright 2014 Vicente Botet
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. // adapted from the example given by Howard Hinnant in
  6. #include <boost/config.hpp>
  7. #define BOOST_THREAD_VERSION 4
  8. //#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
  9. #if ! defined BOOST_NO_CXX11_DECLTYPE
  10. #define BOOST_RESULT_OF_USE_DECLTYPE
  11. #endif
  12. #include <iostream>
  13. #include <boost/thread/scoped_thread.hpp>
  14. #ifdef XXXX
  15. #include <boost/thread/externally_locked_stream.hpp>
  16. typedef boost::externally_locked_stream<std::ostream> the_ostream;
  17. #else
  18. typedef std::ostream the_ostream;
  19. typedef std::istream the_istream;
  20. #endif
  21. #include <boost/thread/concurrent_queues/sync_queue.hpp>
  22. #include <boost/thread/concurrent_queues/queue_adaptor.hpp>
  23. #include <boost/thread/concurrent_queues/queue_views.hpp>
  24. #include <boost/static_assert.hpp>
  25. #include <boost/type_traits.hpp>
  26. void producer(the_ostream &/*mos*/, boost::queue_back<int> sbq)
  27. {
  28. using namespace boost;
  29. try {
  30. for(int i=0; ;++i)
  31. {
  32. sbq.push(i);
  33. //sbq << i;
  34. //mos << "push(" << i << ") " << sbq.size() <<"\n";
  35. this_thread::sleep_for(chrono::milliseconds(200));
  36. }
  37. }
  38. catch(sync_queue_is_closed&)
  39. {
  40. //mos << "closed !!!\n";
  41. }
  42. catch(...)
  43. {
  44. //mos << "exception !!!\n";
  45. }
  46. }
  47. void consumer(
  48. the_ostream &/*mos*/,
  49. boost::queue_front<int> sbq)
  50. {
  51. using namespace boost;
  52. try {
  53. for(int i=0; ;++i)
  54. {
  55. int r;
  56. sbq.pull(r);
  57. //sbq >> r;
  58. //mos << i << " pull(" << r << ") " << sbq.size() <<"\n";
  59. this_thread::sleep_for(chrono::milliseconds(250));
  60. }
  61. }
  62. catch(sync_queue_is_closed&)
  63. {
  64. //mos << "closed !!!\n";
  65. }
  66. catch(...)
  67. {
  68. //mos << "exception !!!\n";
  69. }
  70. }
  71. void consumer2(the_ostream &/*mos*/, boost::queue_front<int> sbq)
  72. {
  73. using namespace boost;
  74. try {
  75. for(int i=0; ;++i)
  76. {
  77. int r;
  78. queue_op_status st = sbq.try_pull(r);
  79. if (queue_op_status::closed == st) break;
  80. if (queue_op_status::success == st) {
  81. //mos << i << " try_pull(" << r << ")\n";
  82. }
  83. this_thread::sleep_for(chrono::milliseconds(250));
  84. }
  85. }
  86. catch(...)
  87. {
  88. //mos << "exception !!!\n";
  89. }
  90. }
  91. void consumer3(the_ostream &/*mos*/, boost::queue_front<int> sbq)
  92. {
  93. using namespace boost;
  94. try {
  95. for(int i=0; ;++i)
  96. {
  97. int r;
  98. queue_op_status res = sbq.wait_pull(r);
  99. if (res==queue_op_status::closed) break;
  100. //mos << i << " wait_pull(" << r << ")\n";
  101. this_thread::sleep_for(chrono::milliseconds(250));
  102. }
  103. }
  104. catch(...)
  105. {
  106. //mos << "exception !!!\n";
  107. }
  108. }
  109. int main()
  110. {
  111. using namespace boost;
  112. #ifdef XXXX
  113. recursive_mutex terminal_mutex;
  114. externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
  115. externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
  116. externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
  117. #else
  118. the_ostream &mcerr = std::cout;
  119. the_ostream &mcout = std::cerr;
  120. //the_istream &mcin = std::cin;
  121. #endif
  122. queue_adaptor<sync_queue<int> > sbq;
  123. {
  124. mcout << "begin of main" << std::endl;
  125. scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), concurrent::queue_back<int>(sbq)));
  126. scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), concurrent::queue_back<int>(sbq)));
  127. scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), concurrent::queue_front<int>(sbq)));
  128. this_thread::sleep_for(chrono::seconds(1));
  129. mcout << "closed()" << std::endl;
  130. sbq.close();
  131. mcout << "closed()" << std::endl;
  132. } // all threads joined here.
  133. mcout << "end of main" << std::endl;
  134. return 0;
  135. }