123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- //////////////////////////////////////////////////////////////////////////////
- //
- // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
- // Software License, Version 1.0. (See accompanying file
- // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- // See http://www.boost.org/libs/interprocess for documentation.
- //
- //////////////////////////////////////////////////////////////////////////////
- #include <boost/interprocess/detail/config_begin.hpp>
- #include <boost/interprocess/ipc/message_queue.hpp>
- #include <boost/interprocess/managed_external_buffer.hpp>
- #include <boost/interprocess/managed_heap_memory.hpp>
- #include <boost/interprocess/containers/map.hpp>
- #include <boost/interprocess/containers/set.hpp>
- #include <boost/interprocess/allocators/node_allocator.hpp>
- #include <boost/interprocess/detail/os_thread_functions.hpp>
- // intrusive/detail
- #include <boost/intrusive/detail/minimal_pair_header.hpp>
- #include <boost/intrusive/detail/minimal_less_equal_header.hpp>
- #include <boost/move/unique_ptr.hpp>
- #include <cstddef>
- #include <memory>
- #include <iostream>
- #include <vector>
- #include <stdexcept>
- #include <limits>
- #include "get_process_id_name.hpp"
- ////////////////////////////////////////////////////////////////////////////////
- // //
- // This example tests the process shared message queue. //
- // //
- ////////////////////////////////////////////////////////////////////////////////
- using namespace boost::interprocess;
- //This test inserts messages with different priority and marks them with a
- //time-stamp to check if receiver obtains highest priority messages first and
- //messages with same priority are received in fifo order
- bool test_priority_order()
- {
- message_queue::remove(test::get_process_id_name());
- {
- message_queue mq1
- (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
- mq2
- (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
- //We test that the queue is ordered by priority and in the
- //same priority, is a FIFO
- message_queue::size_type recvd = 0;
- unsigned int priority = 0;
- std::size_t tstamp;
- unsigned int priority_prev;
- std::size_t tstamp_prev;
- //We will send 100 message with priority 0-9
- //The message will contain the timestamp of the message
- for(std::size_t i = 0; i < 100; ++i){
- tstamp = i;
- mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
- }
- priority_prev = (std::numeric_limits<unsigned int>::max)();
- tstamp_prev = 0;
- //Receive all messages and test those are ordered
- //by priority and by FIFO in the same priority
- for(std::size_t i = 0; i < 100; ++i){
- mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
- if(priority > priority_prev)
- return false;
- if(priority == priority_prev &&
- tstamp <= tstamp_prev){
- return false;
- }
- priority_prev = priority;
- tstamp_prev = tstamp;
- }
- //Now retry it with different priority order
- for(std::size_t i = 0; i < 100; ++i){
- tstamp = i;
- mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
- }
- priority_prev = (std::numeric_limits<unsigned int>::max)();
- tstamp_prev = 0;
- //Receive all messages and test those are ordered
- //by priority and by FIFO in the same priority
- for(std::size_t i = 0; i < 100; ++i){
- mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
- if(priority > priority_prev)
- return false;
- if(priority == priority_prev &&
- tstamp <= tstamp_prev){
- return false;
- }
- priority_prev = priority;
- tstamp_prev = tstamp;
- }
- }
- message_queue::remove(test::get_process_id_name());
- return true;
- }
- //[message_queue_test_test_serialize_db
- //This test creates a in memory data-base using Interprocess machinery and
- //serializes it through a message queue. Then rebuilds the data-base in
- //another buffer and checks it against the original data-base
- bool test_serialize_db()
- {
- //Typedef data to create a Interprocess map
- typedef std::pair<const std::size_t, std::size_t> MyPair;
- typedef std::less<std::size_t> MyLess;
- typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
- node_allocator_t;
- typedef map<std::size_t,
- std::size_t,
- std::less<std::size_t>,
- node_allocator_t>
- MyMap;
- //Some constants
- const std::size_t BufferSize = 65536;
- const std::size_t MaxMsgSize = 100;
- //Allocate a memory buffer to hold the destiny database using vector<char>
- std::vector<char> buffer_destiny(BufferSize, 0);
- message_queue::remove(test::get_process_id_name());
- {
- //Create the message-queues
- message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
- //Open previously created message-queue simulating other process
- message_queue mq2(open_only, test::get_process_id_name());
- //A managed heap memory to create the origin database
- managed_heap_memory db_origin(buffer_destiny.size());
- //Construct the map in the first buffer
- MyMap *map1 = db_origin.construct<MyMap>("MyMap")
- (MyLess(),
- db_origin.get_segment_manager());
- if(!map1)
- return false;
- //Fill map1 until is full
- try{
- std::size_t i = 0;
- while(1){
- (*map1)[i] = i;
- ++i;
- }
- }
- catch(boost::interprocess::bad_alloc &){}
- //Data control data sending through the message queue
- std::size_t sent = 0;
- message_queue::size_type recvd = 0;
- message_queue::size_type total_recvd = 0;
- unsigned int priority;
- //Send whole first buffer through the mq1, read it
- //through mq2 to the second buffer
- while(1){
- //Send a fragment of buffer1 through mq1
- std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
- MaxMsgSize : (db_origin.get_size() - sent);
- mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
- , bytes_to_send
- , 0);
- sent += bytes_to_send;
- //Receive the fragment through mq2 to buffer_destiny
- mq2.receive( &buffer_destiny[total_recvd]
- , BufferSize - recvd
- , recvd
- , priority);
- total_recvd += recvd;
- //Check if we have received all the buffer
- if(total_recvd == BufferSize){
- break;
- }
- }
- //The buffer will contain a copy of the original database
- //so let's interpret the buffer with managed_external_buffer
- managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
- //Let's find the map
- std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
- MyMap *map2 = ret.first;
- //Check if we have found it
- if(!map2){
- return false;
- }
- //Check if it is a single variable (not an array)
- if(ret.second != 1){
- return false;
- }
- //Now let's compare size
- if(map1->size() != map2->size()){
- return false;
- }
- //Now let's compare all db values
- MyMap::size_type num_elements = map1->size();
- for(std::size_t i = 0; i < num_elements; ++i){
- if((*map1)[i] != (*map2)[i]){
- return false;
- }
- }
- //Destroy maps from db-s
- db_origin.destroy_ptr(map1);
- db_destiny.destroy_ptr(map2);
- }
- message_queue::remove(test::get_process_id_name());
- return true;
- }
- //]
- static const int MsgSize = 10;
- static const int NumMsg = 1000;
- static char msgsend [10];
- static char msgrecv [10];
- static boost::interprocess::message_queue *pmessage_queue;
- void receiver()
- {
- boost::interprocess::message_queue::size_type recvd_size;
- unsigned int priority;
- int nummsg = NumMsg;
- while(nummsg--){
- pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
- }
- }
- bool test_buffer_overflow()
- {
- boost::interprocess::message_queue::remove(test::get_process_id_name());
- {
- boost::movelib::unique_ptr<boost::interprocess::message_queue>
- ptr(new boost::interprocess::message_queue
- (create_only, test::get_process_id_name(), 10, 10));
- pmessage_queue = ptr.get();
- //Launch the receiver thread
- boost::interprocess::ipcdetail::OS_thread_t thread;
- boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
- boost::interprocess::ipcdetail::thread_yield();
- int nummsg = NumMsg;
- while(nummsg--){
- pmessage_queue->send(msgsend, MsgSize, 0);
- }
- boost::interprocess::ipcdetail::thread_join(thread);
- }
- boost::interprocess::message_queue::remove(test::get_process_id_name());
- return true;
- }
- //////////////////////////////////////////////////////////////////////////////
- //
- // test_multi_sender_receiver is based on Alexander (aalutov's)
- // testcase for ticket #9221. Many thanks.
- //
- //////////////////////////////////////////////////////////////////////////////
- static boost::interprocess::message_queue *global_queue = 0;
- //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
- static const int MULTI_NUM_MSG_PER_SENDER = 10000;
- //Message queue message capacity
- static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
- //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
- static const int MULTI_THREAD_COUNT = 10;
- static void multisend()
- {
- char buff;
- for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
- global_queue->send(&buff, 1, 0);
- }
- global_queue->send(&buff, 0, 0);
- //std::cout<<"writer thread complete"<<std::endl;
- }
- static void multireceive()
- {
- char buff;
- size_t size;
- int received_msgs = 0;
- unsigned int priority;
- do {
- global_queue->receive(&buff, 1, size, priority);
- ++received_msgs;
- } while (size > 0);
- --received_msgs;
- //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
- }
- bool test_multi_sender_receiver()
- {
- bool ret = true;
- //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
- try {
- boost::interprocess::message_queue::remove(test::get_process_id_name());
- boost::interprocess::message_queue mq
- (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
- global_queue = &mq;
- std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
- //Launch senders receiver thread
- for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
- boost::interprocess::ipcdetail::thread_launch
- (threads[i], &multisend);
- }
- for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
- boost::interprocess::ipcdetail::thread_launch
- (threads[MULTI_THREAD_COUNT+i], &multireceive);
- }
- for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
- boost::interprocess::ipcdetail::thread_join(threads[i]);
- //std::cout << "Joined thread " << i << std::endl;
- }
- }
- catch (std::exception &e) {
- std::cout << "error " << e.what() << std::endl;
- ret = false;
- }
- boost::interprocess::message_queue::remove(test::get_process_id_name());
- return ret;
- }
- int main ()
- {
- if(!test_priority_order()){
- return 1;
- }
- if(!test_serialize_db()){
- return 1;
- }
- if(!test_buffer_overflow()){
- return 1;
- }
- if(!test_multi_sender_receiver()){
- return 1;
- }
- return 0;
- }
- #include <boost/interprocess/detail/config_end.hpp>
|