// Copyright (C) 2011-2013 Tim Blechmann // // Distributed under the Boost Software License, Version 1.0. (See // accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include #include #define BOOST_TEST_MAIN #ifdef BOOST_LOCKFREE_INCLUDE_TESTS #include #else #include #endif #include #include #include "test_helpers.hpp" #include "test_common.hpp" using namespace boost; using namespace boost::lockfree; using namespace std; #ifndef BOOST_LOCKFREE_STRESS_TEST static const boost::uint32_t nodes_per_thread = 100000; #else static const boost::uint32_t nodes_per_thread = 100000000; #endif struct spsc_queue_tester { spsc_queue > sf; boost::lockfree::detail::atomic spsc_queue_cnt, received_nodes; // In VxWorks one RTP just supports 65535 objects #ifndef __VXWORKS__ static_hashed_set working_set; #else static_hashed_set working_set; #endif spsc_queue_tester(void): spsc_queue_cnt(0), received_nodes(0) {} void add(void) { for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) { int id = generate_id(); working_set.insert(id); while (sf.push(id) == false) {} ++spsc_queue_cnt; } running = false; } bool get_element(void) { int data; bool success = sf.pop(data); if (success) { ++received_nodes; --spsc_queue_cnt; bool erased = working_set.erase(data); assert(erased); return true; } else return false; } boost::lockfree::detail::atomic running; void get(void) { for(;;) { bool success = get_element(); if (!running && !success) break; } while ( get_element() ); } void run(void) { running = true; BOOST_REQUIRE(sf.empty()); boost::thread reader(boost::bind(&spsc_queue_tester::get, this)); boost::thread writer(boost::bind(&spsc_queue_tester::add, this)); cout << "reader and writer threads created" << endl; writer.join(); cout << "writer threads joined. waiting for readers to finish" << endl; reader.join(); BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread); BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0); BOOST_REQUIRE(sf.empty()); BOOST_REQUIRE(working_set.count_nodes() == 0); } }; BOOST_AUTO_TEST_CASE( spsc_queue_test_caching ) { boost::shared_ptr test1(new spsc_queue_tester); test1->run(); } struct spsc_queue_tester_buffering { spsc_queue > sf; boost::lockfree::detail::atomic spsc_queue_cnt; // In VxWorks one RTP just supports 65535 objects #ifndef __VXWORKS__ static_hashed_set working_set; #else static_hashed_set working_set; #endif boost::lockfree::detail::atomic received_nodes; spsc_queue_tester_buffering(void): spsc_queue_cnt(0), received_nodes(0) {} static const size_t buf_size = 5; void add(void) { boost::array input_buffer; for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) { for (size_t i = 0; i != buf_size; ++i) { int id = generate_id(); working_set.insert(id); input_buffer[i] = id; } size_t pushed = 0; do { pushed += sf.push(input_buffer.c_array() + pushed, input_buffer.size() - pushed); } while (pushed != buf_size); spsc_queue_cnt+=buf_size; } running = false; } bool get_elements(void) { boost::array output_buffer; size_t popd = sf.pop(output_buffer.c_array(), output_buffer.size()); if (popd) { received_nodes += popd; spsc_queue_cnt -= popd; for (size_t i = 0; i != popd; ++i) { bool erased = working_set.erase(output_buffer[i]); assert(erased); } return true; } else return false; } boost::lockfree::detail::atomic running; void get(void) { for(;;) { bool success = get_elements(); if (!running && !success) break; } while ( get_elements() ); } void run(void) { running = true; boost::thread reader(boost::bind(&spsc_queue_tester_buffering::get, this)); boost::thread writer(boost::bind(&spsc_queue_tester_buffering::add, this)); cout << "reader and writer threads created" << endl; writer.join(); cout << "writer threads joined. waiting for readers to finish" << endl; reader.join(); BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread); BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0); BOOST_REQUIRE(sf.empty()); BOOST_REQUIRE(working_set.count_nodes() == 0); } }; BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering ) { boost::shared_ptr test1(new spsc_queue_tester_buffering); test1->run(); }