tq_multi_thread_pass.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright (C) 2019 Austin Beer
  2. // Copyright (C) 2019 Vicente J. Botet Escriba
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. #include <boost/config.hpp>
  8. #if ! defined BOOST_NO_CXX11_DECLTYPE
  9. #define BOOST_RESULT_OF_USE_DECLTYPE
  10. #endif
  11. #define BOOST_THREAD_VERSION 4
  12. #define BOOST_THREAD_PROVIDES_EXECUTORS
  13. #include <boost/thread.hpp>
  14. #include <boost/chrono.hpp>
  15. #include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
  16. #include <boost/core/lightweight_test.hpp>
  17. #include "../../../timming.hpp"
  18. using namespace boost::chrono;
  19. typedef boost::concurrent::sync_timed_queue<int> sync_tq;
  20. const int cnt = 5;
  21. void call_push(sync_tq* q, const steady_clock::time_point start)
  22. {
  23. // push elements onto the queue every 500 milliseconds but with a decreasing delay each time
  24. for (int i = 0; i < cnt; ++i)
  25. {
  26. boost::this_thread::sleep_until(start + milliseconds(i * 500));
  27. const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
  28. q->push(i, expected);
  29. }
  30. }
  31. void call_pull(sync_tq* q, const steady_clock::time_point start)
  32. {
  33. // pull elements off of the queue (earliest element first)
  34. for (int i = cnt - 1; i >= 0; --i)
  35. {
  36. int j;
  37. q->pull(j);
  38. BOOST_TEST_EQ(i, j);
  39. const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
  40. BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
  41. BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
  42. }
  43. }
  44. void call_pull_until(sync_tq* q, const steady_clock::time_point start)
  45. {
  46. // pull elements off of the queue (earliest element first)
  47. for (int i = cnt - 1; i >= 0; --i)
  48. {
  49. int j;
  50. q->pull_until(steady_clock::now() + hours(1), j);
  51. BOOST_TEST_EQ(i, j);
  52. const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
  53. BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
  54. BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
  55. }
  56. }
  57. void call_pull_for(sync_tq* q, const steady_clock::time_point start)
  58. {
  59. // pull elements off of the queue (earliest element first)
  60. for (int i = cnt - 1; i >= 0; --i)
  61. {
  62. int j;
  63. q->pull_for(hours(1), j);
  64. BOOST_TEST_EQ(i, j);
  65. const steady_clock::time_point expected = start + milliseconds(i * 500) + seconds(cnt - i);
  66. BOOST_TEST_GE(steady_clock::now(), expected - milliseconds(BOOST_THREAD_TEST_TIME_MS));
  67. BOOST_TEST_LE(steady_clock::now(), expected + milliseconds(BOOST_THREAD_TEST_TIME_MS));
  68. }
  69. }
  70. void test_push_while_pull()
  71. {
  72. sync_tq tq;
  73. BOOST_TEST(tq.empty());
  74. boost::thread_group tg;
  75. const steady_clock::time_point start = steady_clock::now();
  76. tg.create_thread(boost::bind(call_push, &tq, start));
  77. tg.create_thread(boost::bind(call_pull, &tq, start));
  78. tg.join_all();
  79. BOOST_TEST(tq.empty());
  80. }
  81. void test_push_while_pull_until()
  82. {
  83. sync_tq tq;
  84. BOOST_TEST(tq.empty());
  85. boost::thread_group tg;
  86. const steady_clock::time_point start = steady_clock::now();
  87. tg.create_thread(boost::bind(call_push, &tq, start));
  88. tg.create_thread(boost::bind(call_pull_until, &tq, start));
  89. tg.join_all();
  90. BOOST_TEST(tq.empty());
  91. }
  92. void test_push_while_pull_for()
  93. {
  94. sync_tq tq;
  95. BOOST_TEST(tq.empty());
  96. boost::thread_group tg;
  97. const steady_clock::time_point start = steady_clock::now();
  98. tg.create_thread(boost::bind(call_push, &tq, start));
  99. tg.create_thread(boost::bind(call_pull_for, &tq, start));
  100. tg.join_all();
  101. BOOST_TEST(tq.empty());
  102. }
  103. int main()
  104. {
  105. test_push_while_pull();
  106. test_push_while_pull_until();
  107. test_push_while_pull_for();
  108. return boost::report_errors();
  109. }