////////////////////////////////////////////////////////////////////////////// // // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost // Software License, Version 1.0. (See accompanying file // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // See http://www.boost.org/libs/interprocess for documentation. // ////////////////////////////////////////////////////////////////////////////// #include #include #include #include #include #include #include #include // intrusive/detail #include #include #include #include #include #include #include #include #include #include "get_process_id_name.hpp" //////////////////////////////////////////////////////////////////////////////// // // // This example tests the process shared message queue. // // // //////////////////////////////////////////////////////////////////////////////// using namespace boost::interprocess; //This test inserts messages with different priority and marks them with a //time-stamp to check if receiver obtains highest priority messages first and //messages with same priority are received in fifo order bool test_priority_order() { message_queue::remove(test::get_process_id_name()); { message_queue mq1 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)), mq2 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)); //We test that the queue is ordered by priority and in the //same priority, is a FIFO message_queue::size_type recvd = 0; unsigned int priority = 0; std::size_t tstamp; unsigned int priority_prev; std::size_t tstamp_prev; //We will send 100 message with priority 0-9 //The message will contain the timestamp of the message for(std::size_t i = 0; i < 100; ++i){ tstamp = i; mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10)); } priority_prev = (std::numeric_limits::max)(); tstamp_prev = 0; //Receive all messages and test those are ordered //by priority and by FIFO in the same priority for(std::size_t i = 0; i < 100; ++i){ mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); if(priority > priority_prev) return false; if(priority == priority_prev && tstamp <= tstamp_prev){ return false; } priority_prev = priority; tstamp_prev = tstamp; } //Now retry it with different priority order for(std::size_t i = 0; i < 100; ++i){ tstamp = i; mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10)); } priority_prev = (std::numeric_limits::max)(); tstamp_prev = 0; //Receive all messages and test those are ordered //by priority and by FIFO in the same priority for(std::size_t i = 0; i < 100; ++i){ mq1.receive(&tstamp, sizeof(tstamp), recvd, priority); if(priority > priority_prev) return false; if(priority == priority_prev && tstamp <= tstamp_prev){ return false; } priority_prev = priority; tstamp_prev = tstamp; } } message_queue::remove(test::get_process_id_name()); return true; } //[message_queue_test_test_serialize_db //This test creates a in memory data-base using Interprocess machinery and //serializes it through a message queue. Then rebuilds the data-base in //another buffer and checks it against the original data-base bool test_serialize_db() { //Typedef data to create a Interprocess map typedef std::pair MyPair; typedef std::less MyLess; typedef node_allocator node_allocator_t; typedef map, node_allocator_t> MyMap; //Some constants const std::size_t BufferSize = 65536; const std::size_t MaxMsgSize = 100; //Allocate a memory buffer to hold the destiny database using vector std::vector buffer_destiny(BufferSize, 0); message_queue::remove(test::get_process_id_name()); { //Create the message-queues message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize); //Open previously created message-queue simulating other process message_queue mq2(open_only, test::get_process_id_name()); //A managed heap memory to create the origin database managed_heap_memory db_origin(buffer_destiny.size()); //Construct the map in the first buffer MyMap *map1 = db_origin.construct("MyMap") (MyLess(), db_origin.get_segment_manager()); if(!map1) return false; //Fill map1 until is full try{ std::size_t i = 0; while(1){ (*map1)[i] = i; ++i; } } catch(boost::interprocess::bad_alloc &){} //Data control data sending through the message queue std::size_t sent = 0; message_queue::size_type recvd = 0; message_queue::size_type total_recvd = 0; unsigned int priority; //Send whole first buffer through the mq1, read it //through mq2 to the second buffer while(1){ //Send a fragment of buffer1 through mq1 std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? MaxMsgSize : (db_origin.get_size() - sent); mq1.send( &static_cast(db_origin.get_address())[sent] , bytes_to_send , 0); sent += bytes_to_send; //Receive the fragment through mq2 to buffer_destiny mq2.receive( &buffer_destiny[total_recvd] , BufferSize - recvd , recvd , priority); total_recvd += recvd; //Check if we have received all the buffer if(total_recvd == BufferSize){ break; } } //The buffer will contain a copy of the original database //so let's interpret the buffer with managed_external_buffer managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize); //Let's find the map std::pair ret = db_destiny.find("MyMap"); MyMap *map2 = ret.first; //Check if we have found it if(!map2){ return false; } //Check if it is a single variable (not an array) if(ret.second != 1){ return false; } //Now let's compare size if(map1->size() != map2->size()){ return false; } //Now let's compare all db values MyMap::size_type num_elements = map1->size(); for(std::size_t i = 0; i < num_elements; ++i){ if((*map1)[i] != (*map2)[i]){ return false; } } //Destroy maps from db-s db_origin.destroy_ptr(map1); db_destiny.destroy_ptr(map2); } message_queue::remove(test::get_process_id_name()); return true; } //] static const int MsgSize = 10; static const int NumMsg = 1000; static char msgsend [10]; static char msgrecv [10]; static boost::interprocess::message_queue *pmessage_queue; void receiver() { boost::interprocess::message_queue::size_type recvd_size; unsigned int priority; int nummsg = NumMsg; while(nummsg--){ pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority); } } bool test_buffer_overflow() { boost::interprocess::message_queue::remove(test::get_process_id_name()); { boost::movelib::unique_ptr ptr(new boost::interprocess::message_queue (create_only, test::get_process_id_name(), 10, 10)); pmessage_queue = ptr.get(); //Launch the receiver thread boost::interprocess::ipcdetail::OS_thread_t thread; boost::interprocess::ipcdetail::thread_launch(thread, &receiver); boost::interprocess::ipcdetail::thread_yield(); int nummsg = NumMsg; while(nummsg--){ pmessage_queue->send(msgsend, MsgSize, 0); } boost::interprocess::ipcdetail::thread_join(thread); } boost::interprocess::message_queue::remove(test::get_process_id_name()); return true; } ////////////////////////////////////////////////////////////////////////////// // // test_multi_sender_receiver is based on Alexander (aalutov's) // testcase for ticket #9221. Many thanks. // ////////////////////////////////////////////////////////////////////////////// static boost::interprocess::message_queue *global_queue = 0; //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender static const int MULTI_NUM_MSG_PER_SENDER = 10000; //Message queue message capacity static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1; //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers static const int MULTI_THREAD_COUNT = 10; static void multisend() { char buff; for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) { global_queue->send(&buff, 1, 0); } global_queue->send(&buff, 0, 0); //std::cout<<"writer thread complete"<receive(&buff, 1, size, priority); ++received_msgs; } while (size > 0); --received_msgs; //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl; } bool test_multi_sender_receiver() { bool ret = true; //std::cout << "Testing multi-sender / multi-receiver " << std::endl; try { boost::interprocess::message_queue::remove(test::get_process_id_name()); boost::interprocess::message_queue mq (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1); global_queue = &mq; std::vector threads(MULTI_THREAD_COUNT*2); //Launch senders receiver thread for (int i = 0; i < MULTI_THREAD_COUNT; i++) { boost::interprocess::ipcdetail::thread_launch (threads[i], &multisend); } for (int i = 0; i < MULTI_THREAD_COUNT; i++) { boost::interprocess::ipcdetail::thread_launch (threads[MULTI_THREAD_COUNT+i], &multireceive); } for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) { boost::interprocess::ipcdetail::thread_join(threads[i]); //std::cout << "Joined thread " << i << std::endl; } } catch (std::exception &e) { std::cout << "error " << e.what() << std::endl; ret = false; } boost::interprocess::message_queue::remove(test::get_process_id_name()); return ret; } int main () { if(!test_priority_order()){ return 1; } if(!test_serialize_db()){ return 1; } if(!test_buffer_overflow()){ return 1; } if(!test_multi_sender_receiver()){ return 1; } return 0; } #include