pipeline.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. #include <boost/asio/associated_executor.hpp>
  2. #include <boost/asio/bind_executor.hpp>
  3. #include <boost/asio/execution_context.hpp>
  4. #include <boost/asio/post.hpp>
  5. #include <boost/asio/system_executor.hpp>
  6. #include <boost/asio/use_future.hpp>
  7. #include <condition_variable>
  8. #include <future>
  9. #include <memory>
  10. #include <mutex>
  11. #include <queue>
  12. #include <thread>
  13. #include <vector>
  14. #include <cctype>
  15. using boost::asio::execution_context;
  16. using boost::asio::executor_binder;
  17. using boost::asio::get_associated_executor;
  18. using boost::asio::post;
  19. using boost::asio::system_executor;
  20. using boost::asio::use_future;
  21. using boost::asio::use_service;
  22. // An executor that launches a new thread for each function submitted to it.
  23. // This class satisfies the Executor requirements.
  24. class thread_executor
  25. {
  26. private:
  27. // Service to track all threads started through a thread_executor.
  28. class thread_bag : public execution_context::service
  29. {
  30. public:
  31. typedef thread_bag key_type;
  32. explicit thread_bag(execution_context& ctx)
  33. : execution_context::service(ctx)
  34. {
  35. }
  36. void add_thread(std::thread&& t)
  37. {
  38. std::unique_lock<std::mutex> lock(mutex_);
  39. threads_.push_back(std::move(t));
  40. }
  41. private:
  42. virtual void shutdown()
  43. {
  44. for (auto& t : threads_)
  45. t.join();
  46. }
  47. std::mutex mutex_;
  48. std::vector<std::thread> threads_;
  49. };
  50. public:
  51. execution_context& context() const noexcept
  52. {
  53. return system_executor().context();
  54. }
  55. void on_work_started() const noexcept
  56. {
  57. // This executor doesn't count work.
  58. }
  59. void on_work_finished() const noexcept
  60. {
  61. // This executor doesn't count work.
  62. }
  63. template <class Func, class Alloc>
  64. void dispatch(Func&& f, const Alloc& a) const
  65. {
  66. post(std::forward<Func>(f), a);
  67. }
  68. template <class Func, class Alloc>
  69. void post(Func f, const Alloc&) const
  70. {
  71. thread_bag& bag = use_service<thread_bag>(context());
  72. bag.add_thread(std::thread(std::move(f)));
  73. }
  74. template <class Func, class Alloc>
  75. void defer(Func&& f, const Alloc& a) const
  76. {
  77. post(std::forward<Func>(f), a);
  78. }
  79. friend bool operator==(const thread_executor&,
  80. const thread_executor&) noexcept
  81. {
  82. return true;
  83. }
  84. friend bool operator!=(const thread_executor&,
  85. const thread_executor&) noexcept
  86. {
  87. return false;
  88. }
  89. };
  90. // Base class for all thread-safe queue implementations.
  91. class queue_impl_base
  92. {
  93. template <class> friend class queue_front;
  94. template <class> friend class queue_back;
  95. std::mutex mutex_;
  96. std::condition_variable condition_;
  97. bool stop_ = false;
  98. };
  99. // Underlying implementation of a thread-safe queue, shared between the
  100. // queue_front and queue_back classes.
  101. template <class T>
  102. class queue_impl : public queue_impl_base
  103. {
  104. template <class> friend class queue_front;
  105. template <class> friend class queue_back;
  106. std::queue<T> queue_;
  107. };
  108. // The front end of a queue between consecutive pipeline stages.
  109. template <class T>
  110. class queue_front
  111. {
  112. public:
  113. typedef T value_type;
  114. explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
  115. : impl_(impl)
  116. {
  117. }
  118. void push(T t)
  119. {
  120. std::unique_lock<std::mutex> lock(impl_->mutex_);
  121. impl_->queue_.push(std::move(t));
  122. impl_->condition_.notify_one();
  123. }
  124. void stop()
  125. {
  126. std::unique_lock<std::mutex> lock(impl_->mutex_);
  127. impl_->stop_ = true;
  128. impl_->condition_.notify_one();
  129. }
  130. private:
  131. std::shared_ptr<queue_impl<T>> impl_;
  132. };
  133. // The back end of a queue between consecutive pipeline stages.
  134. template <class T>
  135. class queue_back
  136. {
  137. public:
  138. typedef T value_type;
  139. explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
  140. : impl_(impl)
  141. {
  142. }
  143. bool pop(T& t)
  144. {
  145. std::unique_lock<std::mutex> lock(impl_->mutex_);
  146. while (impl_->queue_.empty() && !impl_->stop_)
  147. impl_->condition_.wait(lock);
  148. if (!impl_->queue_.empty())
  149. {
  150. t = impl_->queue_.front();
  151. impl_->queue_.pop();
  152. return true;
  153. }
  154. return false;
  155. }
  156. private:
  157. std::shared_ptr<queue_impl<T>> impl_;
  158. };
  159. // Launch the last stage in a pipeline.
  160. template <class T, class F>
  161. std::future<void> pipeline(queue_back<T> in, F f)
  162. {
  163. // Get the function's associated executor, defaulting to thread_executor.
  164. auto ex = get_associated_executor(f, thread_executor());
  165. // Run the function, and as we're the last stage return a future so that the
  166. // caller can wait for the pipeline to finish.
  167. return post(ex, use_future([in, f]() mutable { f(in); }));
  168. }
  169. // Launch an intermediate stage in a pipeline.
  170. template <class T, class F, class... Tail>
  171. std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
  172. {
  173. // Determine the output queue type.
  174. typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
  175. // Create the output queue and its implementation.
  176. auto out_impl = std::make_shared<queue_impl<output_value_type>>();
  177. queue_front<output_value_type> out(out_impl);
  178. queue_back<output_value_type> next_in(out_impl);
  179. // Get the function's associated executor, defaulting to thread_executor.
  180. auto ex = get_associated_executor(f, thread_executor());
  181. // Run the function.
  182. post(ex, [in, out, f]() mutable
  183. {
  184. f(in, out);
  185. out.stop();
  186. });
  187. // Launch the rest of the pipeline.
  188. return pipeline(next_in, std::move(t)...);
  189. }
  190. // Launch the first stage in a pipeline.
  191. template <class F, class... Tail>
  192. std::future<void> pipeline(F f, Tail... t)
  193. {
  194. // Determine the output queue type.
  195. typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
  196. // Create the output queue and its implementation.
  197. auto out_impl = std::make_shared<queue_impl<output_value_type>>();
  198. queue_front<output_value_type> out(out_impl);
  199. queue_back<output_value_type> next_in(out_impl);
  200. // Get the function's associated executor, defaulting to thread_executor.
  201. auto ex = get_associated_executor(f, thread_executor());
  202. // Run the function.
  203. post(ex, [out, f]() mutable
  204. {
  205. f(out);
  206. out.stop();
  207. });
  208. // Launch the rest of the pipeline.
  209. return pipeline(next_in, std::move(t)...);
  210. }
  211. //------------------------------------------------------------------------------
  212. #include <boost/asio/thread_pool.hpp>
  213. #include <iostream>
  214. #include <string>
  215. using boost::asio::bind_executor;
  216. using boost::asio::thread_pool;
  217. void reader(queue_front<std::string> out)
  218. {
  219. std::string line;
  220. while (std::getline(std::cin, line))
  221. out.push(line);
  222. }
  223. void filter(queue_back<std::string> in, queue_front<std::string> out)
  224. {
  225. std::string line;
  226. while (in.pop(line))
  227. if (line.length() > 5)
  228. out.push(line);
  229. }
  230. void upper(queue_back<std::string> in, queue_front<std::string> out)
  231. {
  232. std::string line;
  233. while (in.pop(line))
  234. {
  235. std::string new_line;
  236. for (char c : line)
  237. new_line.push_back(std::toupper(c));
  238. out.push(new_line);
  239. }
  240. }
  241. void writer(queue_back<std::string> in)
  242. {
  243. std::size_t count = 0;
  244. std::string line;
  245. while (in.pop(line))
  246. std::cout << count++ << ": " << line << std::endl;
  247. }
  248. int main()
  249. {
  250. thread_pool pool;
  251. auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
  252. f.wait();
  253. }