message_queue_test.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. //////////////////////////////////////////////////////////////////////////////
  2. //
  3. // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
  4. // Software License, Version 1.0. (See accompanying file
  5. // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // See http://www.boost.org/libs/interprocess for documentation.
  8. //
  9. //////////////////////////////////////////////////////////////////////////////
  10. #include <boost/interprocess/detail/config_begin.hpp>
  11. #include <boost/interprocess/ipc/message_queue.hpp>
  12. #include <boost/interprocess/managed_external_buffer.hpp>
  13. #include <boost/interprocess/managed_heap_memory.hpp>
  14. #include <boost/interprocess/containers/map.hpp>
  15. #include <boost/interprocess/containers/set.hpp>
  16. #include <boost/interprocess/allocators/node_allocator.hpp>
  17. #include <boost/interprocess/detail/os_thread_functions.hpp>
  18. // intrusive/detail
  19. #include <boost/intrusive/detail/minimal_pair_header.hpp>
  20. #include <boost/intrusive/detail/minimal_less_equal_header.hpp>
  21. #include <boost/move/unique_ptr.hpp>
  22. #include <cstddef>
  23. #include <memory>
  24. #include <iostream>
  25. #include <vector>
  26. #include <stdexcept>
  27. #include <limits>
  28. #include "get_process_id_name.hpp"
  29. ////////////////////////////////////////////////////////////////////////////////
  30. // //
  31. // This example tests the process shared message queue. //
  32. // //
  33. ////////////////////////////////////////////////////////////////////////////////
  34. using namespace boost::interprocess;
  35. //This test inserts messages with different priority and marks them with a
  36. //time-stamp to check if receiver obtains highest priority messages first and
  37. //messages with same priority are received in fifo order
  38. bool test_priority_order()
  39. {
  40. message_queue::remove(test::get_process_id_name());
  41. {
  42. message_queue mq1
  43. (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
  44. mq2
  45. (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
  46. //We test that the queue is ordered by priority and in the
  47. //same priority, is a FIFO
  48. message_queue::size_type recvd = 0;
  49. unsigned int priority = 0;
  50. std::size_t tstamp;
  51. unsigned int priority_prev;
  52. std::size_t tstamp_prev;
  53. //We will send 100 message with priority 0-9
  54. //The message will contain the timestamp of the message
  55. for(std::size_t i = 0; i < 100; ++i){
  56. tstamp = i;
  57. mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
  58. }
  59. priority_prev = (std::numeric_limits<unsigned int>::max)();
  60. tstamp_prev = 0;
  61. //Receive all messages and test those are ordered
  62. //by priority and by FIFO in the same priority
  63. for(std::size_t i = 0; i < 100; ++i){
  64. mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
  65. if(priority > priority_prev)
  66. return false;
  67. if(priority == priority_prev &&
  68. tstamp <= tstamp_prev){
  69. return false;
  70. }
  71. priority_prev = priority;
  72. tstamp_prev = tstamp;
  73. }
  74. //Now retry it with different priority order
  75. for(std::size_t i = 0; i < 100; ++i){
  76. tstamp = i;
  77. mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
  78. }
  79. priority_prev = (std::numeric_limits<unsigned int>::max)();
  80. tstamp_prev = 0;
  81. //Receive all messages and test those are ordered
  82. //by priority and by FIFO in the same priority
  83. for(std::size_t i = 0; i < 100; ++i){
  84. mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
  85. if(priority > priority_prev)
  86. return false;
  87. if(priority == priority_prev &&
  88. tstamp <= tstamp_prev){
  89. return false;
  90. }
  91. priority_prev = priority;
  92. tstamp_prev = tstamp;
  93. }
  94. }
  95. message_queue::remove(test::get_process_id_name());
  96. return true;
  97. }
  98. //[message_queue_test_test_serialize_db
  99. //This test creates a in memory data-base using Interprocess machinery and
  100. //serializes it through a message queue. Then rebuilds the data-base in
  101. //another buffer and checks it against the original data-base
  102. bool test_serialize_db()
  103. {
  104. //Typedef data to create a Interprocess map
  105. typedef std::pair<const std::size_t, std::size_t> MyPair;
  106. typedef std::less<std::size_t> MyLess;
  107. typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
  108. node_allocator_t;
  109. typedef map<std::size_t,
  110. std::size_t,
  111. std::less<std::size_t>,
  112. node_allocator_t>
  113. MyMap;
  114. //Some constants
  115. const std::size_t BufferSize = 65536;
  116. const std::size_t MaxMsgSize = 100;
  117. //Allocate a memory buffer to hold the destiny database using vector<char>
  118. std::vector<char> buffer_destiny(BufferSize, 0);
  119. message_queue::remove(test::get_process_id_name());
  120. {
  121. //Create the message-queues
  122. message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
  123. //Open previously created message-queue simulating other process
  124. message_queue mq2(open_only, test::get_process_id_name());
  125. //A managed heap memory to create the origin database
  126. managed_heap_memory db_origin(buffer_destiny.size());
  127. //Construct the map in the first buffer
  128. MyMap *map1 = db_origin.construct<MyMap>("MyMap")
  129. (MyLess(),
  130. db_origin.get_segment_manager());
  131. if(!map1)
  132. return false;
  133. //Fill map1 until is full
  134. try{
  135. std::size_t i = 0;
  136. while(1){
  137. (*map1)[i] = i;
  138. ++i;
  139. }
  140. }
  141. catch(boost::interprocess::bad_alloc &){}
  142. //Data control data sending through the message queue
  143. std::size_t sent = 0;
  144. message_queue::size_type recvd = 0;
  145. message_queue::size_type total_recvd = 0;
  146. unsigned int priority;
  147. //Send whole first buffer through the mq1, read it
  148. //through mq2 to the second buffer
  149. while(1){
  150. //Send a fragment of buffer1 through mq1
  151. std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
  152. MaxMsgSize : (db_origin.get_size() - sent);
  153. mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
  154. , bytes_to_send
  155. , 0);
  156. sent += bytes_to_send;
  157. //Receive the fragment through mq2 to buffer_destiny
  158. mq2.receive( &buffer_destiny[total_recvd]
  159. , BufferSize - recvd
  160. , recvd
  161. , priority);
  162. total_recvd += recvd;
  163. //Check if we have received all the buffer
  164. if(total_recvd == BufferSize){
  165. break;
  166. }
  167. }
  168. //The buffer will contain a copy of the original database
  169. //so let's interpret the buffer with managed_external_buffer
  170. managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
  171. //Let's find the map
  172. std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
  173. MyMap *map2 = ret.first;
  174. //Check if we have found it
  175. if(!map2){
  176. return false;
  177. }
  178. //Check if it is a single variable (not an array)
  179. if(ret.second != 1){
  180. return false;
  181. }
  182. //Now let's compare size
  183. if(map1->size() != map2->size()){
  184. return false;
  185. }
  186. //Now let's compare all db values
  187. MyMap::size_type num_elements = map1->size();
  188. for(std::size_t i = 0; i < num_elements; ++i){
  189. if((*map1)[i] != (*map2)[i]){
  190. return false;
  191. }
  192. }
  193. //Destroy maps from db-s
  194. db_origin.destroy_ptr(map1);
  195. db_destiny.destroy_ptr(map2);
  196. }
  197. message_queue::remove(test::get_process_id_name());
  198. return true;
  199. }
  200. //]
  201. static const int MsgSize = 10;
  202. static const int NumMsg = 1000;
  203. static char msgsend [10];
  204. static char msgrecv [10];
  205. static boost::interprocess::message_queue *pmessage_queue;
  206. void receiver()
  207. {
  208. boost::interprocess::message_queue::size_type recvd_size;
  209. unsigned int priority;
  210. int nummsg = NumMsg;
  211. while(nummsg--){
  212. pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
  213. }
  214. }
  215. bool test_buffer_overflow()
  216. {
  217. boost::interprocess::message_queue::remove(test::get_process_id_name());
  218. {
  219. boost::movelib::unique_ptr<boost::interprocess::message_queue>
  220. ptr(new boost::interprocess::message_queue
  221. (create_only, test::get_process_id_name(), 10, 10));
  222. pmessage_queue = ptr.get();
  223. //Launch the receiver thread
  224. boost::interprocess::ipcdetail::OS_thread_t thread;
  225. boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
  226. boost::interprocess::ipcdetail::thread_yield();
  227. int nummsg = NumMsg;
  228. while(nummsg--){
  229. pmessage_queue->send(msgsend, MsgSize, 0);
  230. }
  231. boost::interprocess::ipcdetail::thread_join(thread);
  232. }
  233. boost::interprocess::message_queue::remove(test::get_process_id_name());
  234. return true;
  235. }
  236. //////////////////////////////////////////////////////////////////////////////
  237. //
  238. // test_multi_sender_receiver is based on Alexander (aalutov's)
  239. // testcase for ticket #9221. Many thanks.
  240. //
  241. //////////////////////////////////////////////////////////////////////////////
  242. static boost::interprocess::message_queue *global_queue = 0;
  243. //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
  244. static const int MULTI_NUM_MSG_PER_SENDER = 10000;
  245. //Message queue message capacity
  246. static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
  247. //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
  248. static const int MULTI_THREAD_COUNT = 10;
  249. static void multisend()
  250. {
  251. char buff;
  252. for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
  253. global_queue->send(&buff, 1, 0);
  254. }
  255. global_queue->send(&buff, 0, 0);
  256. //std::cout<<"writer thread complete"<<std::endl;
  257. }
  258. static void multireceive()
  259. {
  260. char buff;
  261. size_t size;
  262. int received_msgs = 0;
  263. unsigned int priority;
  264. do {
  265. global_queue->receive(&buff, 1, size, priority);
  266. ++received_msgs;
  267. } while (size > 0);
  268. --received_msgs;
  269. //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
  270. }
  271. bool test_multi_sender_receiver()
  272. {
  273. bool ret = true;
  274. //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
  275. try {
  276. boost::interprocess::message_queue::remove(test::get_process_id_name());
  277. boost::interprocess::message_queue mq
  278. (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
  279. global_queue = &mq;
  280. std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
  281. //Launch senders receiver thread
  282. for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
  283. boost::interprocess::ipcdetail::thread_launch
  284. (threads[i], &multisend);
  285. }
  286. for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
  287. boost::interprocess::ipcdetail::thread_launch
  288. (threads[MULTI_THREAD_COUNT+i], &multireceive);
  289. }
  290. for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
  291. boost::interprocess::ipcdetail::thread_join(threads[i]);
  292. //std::cout << "Joined thread " << i << std::endl;
  293. }
  294. }
  295. catch (std::exception &e) {
  296. std::cout << "error " << e.what() << std::endl;
  297. ret = false;
  298. }
  299. boost::interprocess::message_queue::remove(test::get_process_id_name());
  300. return ret;
  301. }
  302. int main ()
  303. {
  304. if(!test_priority_order()){
  305. return 1;
  306. }
  307. if(!test_serialize_db()){
  308. return 1;
  309. }
  310. if(!test_buffer_overflow()){
  311. return 1;
  312. }
  313. if(!test_multi_sender_receiver()){
  314. return 1;
  315. }
  316. return 0;
  317. }
  318. #include <boost/interprocess/detail/config_end.hpp>