Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #204 #205 #207 #206

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
94 changes: 94 additions & 0 deletions src/tests/test_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <boost/test/unit_test.hpp>
#include <thread>
#include <exception>
#include <unistd.h>

#define private public

#include "zmqpp/context.hpp"
#include "zmqpp/message.hpp"
Expand Down Expand Up @@ -62,13 +65,48 @@ BOOST_AUTO_TEST_CASE(socket_removed_in_timer)
loop.add(output, [&socket_called]() -> bool { socket_called = true; return false; });
loop.add(std::chrono::milliseconds(0), 1, [&loop, &output]() -> bool {
loop.remove(output);
//output.close(); // Simple way fails. See socket_closed_after_remove_at_timer.
loop.add(std::chrono::milliseconds(10), 1, []() -> bool { return false; });
return true;
});

input.send("PING");

BOOST_CHECK_NO_THROW(loop.start());
BOOST_CHECK(loop.items_.size() == 0);
BOOST_CHECK(socket_called == false);
}

BOOST_AUTO_TEST_CASE(socket_closed_after_remove_at_timer)
{
zmqpp::context context;

zmqpp::socket output(context, zmqpp::socket_type::pair);
output.bind("inproc://test");
zmqpp::socket input(context, zmqpp::socket_type::pair);
input.connect("inproc://test");

zmqpp::loop loop;

bool socket_called = false;

loop.add(output, [&socket_called]() -> bool { socket_called = true; return false; },
zmqpp::poller::poll_in, [&output]()
{
output.close();
return true;
});
loop.add(std::chrono::milliseconds(0), 1, [&loop, &output]() -> bool {
loop.remove(output);
//output.close(); moved to loop.add(output,,cb2);
loop.add(std::chrono::milliseconds(10), 1, []() -> bool { return false; });
return true;
});

input.send("PING");

BOOST_CHECK_NO_THROW(loop.start());
BOOST_CHECK(loop.items_.size() == 0);
BOOST_CHECK(socket_called == false);
}

Expand Down Expand Up @@ -219,5 +257,61 @@ BOOST_AUTO_TEST_CASE(remove_socket_in_handler)
BOOST_CHECK_EQUAL(2, test2);
}

BOOST_AUTO_TEST_CASE(remove_fd_in_handler)
{
zmqpp::context context;

zmqpp::loop loop;
auto end_loop = []() -> bool { return false; };

int pipefd[2];
BOOST_CHECK_EQUAL(0, pipe(pipefd));

int test1 = 0;
loop.add(pipefd[0], [&](){
char buffer[10];
BOOST_CHECK_EQUAL(4, read(pipefd[0],buffer,10));
test1 = 1;
loop.remove(pipefd[0]);
return true;
});

BOOST_CHECK_EQUAL(4, write(pipefd[1],"haha",4));
//BOOST_CHECK_EQUAL(0, close(pipefd[1]));

loop.add(std::chrono::milliseconds(100), 1, end_loop);
BOOST_CHECK_NO_THROW(loop.start());

BOOST_CHECK_EQUAL(1, test1);
}

BOOST_AUTO_TEST_CASE(remove_invalid_fd_in_handler)
{
zmqpp::context context;

zmqpp::loop loop;
auto end_loop = []() -> bool { return false; };

int pipefd[2];
BOOST_CHECK_EQUAL(0, pipe(pipefd));

int test1 = 0;
loop.add(pipefd[0], [&](){
char buffer[10];
BOOST_CHECK_EQUAL(4, read(pipefd[0],buffer,10));
test1 = 1;
loop.remove(STDIN_FILENO);
return true;
});

BOOST_CHECK_EQUAL(4, write(pipefd[1],"haha",4));
//BOOST_CHECK_EQUAL(0, close(pipefd[1]));

loop.add(std::chrono::milliseconds(100), 1, end_loop);
BOOST_CHECK_NO_THROW(loop.start());

BOOST_CHECK_EQUAL(1, test1);
}


BOOST_AUTO_TEST_SUITE_END()
75 changes: 75 additions & 0 deletions src/tests/test_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,52 @@ BOOST_AUTO_TEST_CASE( copyable )
BOOST_CHECK_EQUAL("string", second.get(0));
}

BOOST_AUTO_TEST_CASE( append_all )
{
zmqpp::message second;
second.add("second");
BOOST_CHECK_EQUAL(1, second.parts());

{
zmqpp::message first;
first.add("string");
first.add("string2");
first.add("string3");
BOOST_CHECK_EQUAL(3, first.parts());

second.append(first);
}

BOOST_REQUIRE_EQUAL(4, second.parts());
BOOST_CHECK_EQUAL(strlen("second"), second.size(0));
BOOST_CHECK_EQUAL("second", second.get(0));
BOOST_CHECK_EQUAL("string", second.get(1));
BOOST_CHECK_EQUAL("string2", second.get(2));
BOOST_CHECK_EQUAL("string3", second.get(3));
}

BOOST_AUTO_TEST_CASE( append_partial )
{
zmqpp::message second;
second.add("second");
BOOST_CHECK_EQUAL(1, second.parts());

{
zmqpp::message first;
first.add("string");
first.add("string2");
first.add("string3");
BOOST_CHECK_EQUAL(3, first.parts());

second.append(first,1,2);
}

BOOST_REQUIRE_EQUAL(2, second.parts());
BOOST_CHECK_EQUAL(strlen("second"), second.size(0));
BOOST_CHECK_EQUAL("second", second.get(0));
BOOST_CHECK_EQUAL("string2", second.get(1));
}

#ifndef ZMQPP_IGNORE_LAMBDA_FUNCTION_TESTS
BOOST_AUTO_TEST_CASE( move_part )
{
Expand Down Expand Up @@ -365,6 +411,35 @@ BOOST_AUTO_TEST_CASE( stream_copy_input_string )
BOOST_CHECK_EQUAL("test part", message.get(0));
}

BOOST_AUTO_TEST_CASE( stream_copy_input_message )
{
zmqpp::message message("test msg1", "test msg1.2");
zmqpp::message message2("test msg2", "test msg2.1");

message << message2;

BOOST_REQUIRE_EQUAL(4, message.parts());
BOOST_CHECK_EQUAL(strlen("test msg1"), message.size(0));
BOOST_CHECK_EQUAL("test msg1", message.get(0));
BOOST_CHECK_EQUAL("test msg1.2", message.get(1));
BOOST_CHECK_EQUAL("test msg2", message.get(2));
BOOST_CHECK_EQUAL("test msg2.1", message.get(3));
}

BOOST_AUTO_TEST_CASE( stream_copy_input_frame )
{
zmqpp::message message("test msg1", "test msg1.2");
zmqpp::frame aframe("test msg2.1",strlen("test msg2.1"));

message << aframe;

BOOST_REQUIRE_EQUAL(3, message.parts());
BOOST_CHECK_EQUAL(strlen("test msg1"), message.size(0));
BOOST_CHECK_EQUAL("test msg1", message.get(0));
BOOST_CHECK_EQUAL("test msg1.2", message.get(1));
BOOST_CHECK_EQUAL("test msg2.1", message.get(2));
}

BOOST_AUTO_TEST_CASE( stream_multiple_parts )
{
zmqpp::message message;
Expand Down
30 changes: 19 additions & 11 deletions src/zmqpp/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ namespace zmqpp
when += delay;
}

void loop::add(socket& socket, Callable callable, short const event /* = POLL_IN */)
void loop::add(socket& socket, Callable callable, short const event /* = POLL_IN */, Callable after_remove_cb /* = Callable(nullptr)*/)
{
zmq_pollitem_t item{static_cast<void *> (socket), 0, event, 0};
add(item, callable);
add(item, callable, after_remove_cb);
}

void loop::add(raw_socket_t const descriptor, Callable callable, short const event /* = POLL_IN */)
Expand All @@ -59,11 +59,11 @@ namespace zmqpp
add(item, callable);
}

void loop::add(const zmq_pollitem_t& item, Callable callable)
void loop::add(const zmq_pollitem_t& item, Callable callable, Callable after_remove_cb)
{
poller_.add(item);
rebuild_poller_ = true;
items_.push_back(std::make_pair(item, callable));
items_.push_back(std::make_tuple(item, callable, after_remove_cb));
}

loop::timer_id_t loop::add(std::chrono::milliseconds delay, size_t times, Callable callable)
Expand Down Expand Up @@ -110,16 +110,24 @@ namespace zmqpp
sockRemoveLater_.push_back(&socket);
return;
}
items_.erase(std::remove_if(items_.begin(), items_.end(), [&socket](const PollItemCallablePair & pair) -> bool

std::vector<PollItemCallableTuple> cb_after_remove;

items_.erase(std::remove_if(items_.begin(), items_.end(),
[&socket, &cb_after_remove](const PollItemCallableTuple & tuple) -> bool
{
const zmq_pollitem_t &item = pair.first;
const zmq_pollitem_t &item = std::get<0>(tuple);
if (nullptr != item.socket && item.socket == static_cast<void *> (socket))
{
if(std::get<2>(tuple))
cb_after_remove.push_back(tuple);
return true;
}
return false;
}), items_.end());
poller_.remove(socket);
for (const PollItemCallableTuple& item : cb_after_remove)
std::get<2>(item)();
}

void loop::remove(raw_socket_t const descriptor)
Expand All @@ -130,9 +138,9 @@ namespace zmqpp
fdRemoveLater_.push_back(descriptor);
return;
}
items_.erase(std::remove_if(items_.begin(), items_.end(), [descriptor](const PollItemCallablePair & pair) -> bool
items_.erase(std::remove_if(items_.begin(), items_.end(), [descriptor](const PollItemCallableTuple & tuple) -> bool
{
const zmq_pollitem_t &item = pair.first;
const zmq_pollitem_t &item = std::get<0>(tuple);
if (nullptr == item.socket && item.fd == descriptor)
{
return true;
Expand Down Expand Up @@ -194,12 +202,12 @@ namespace zmqpp

bool loop::start_handle_poller()
{
for (const PollItemCallablePair &pair : items_)
for (const PollItemCallableTuple &tuple : items_)
{
const zmq_pollitem_t &pollitem = pair.first;
const zmq_pollitem_t &pollitem = std::get<0>(tuple);

if (poller_.has_input(pollitem) || poller_.has_error(pollitem) || poller_.has_output(pollitem))
if(!pair.second())
if(!std::get<1>(tuple)())
return false;
}
return true;
Expand Down
10 changes: 6 additions & 4 deletions src/zmqpp/loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ namespace zmqpp
* \param socket the socket to monitor.
* \param callable the function that will be called by the loop when a registered event occurs on socket.
* \param event the event flags to monitor on the socket.
* \param after_remove_cb will be called by loop after remove() completion.
* See tests/test_loop.cpp: socket_closed_in_timer and socket_closed_after_remove_at_timer
*/
void add(socket_t& socket, Callable callable, short const event = poller::poll_in);
void add(socket_t& socket, Callable callable, short const event = poller::poll_in, Callable after_remove_cb = Callable(nullptr));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this break ABI?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Method's name will be mangled differently. Clients of libzmqpp need to be recompiled. But no changes at source code of clients.


/*!
* Add a standard socket to the loop, providing a handler that will be called when the monitored events occur.
Expand Down Expand Up @@ -125,18 +127,18 @@ namespace zmqpp
void update();
};

typedef std::pair<zmq_pollitem_t, Callable> PollItemCallablePair;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does changing these types break api backward compatibility?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no. it's private type. But size of class will be changed, so recompile of clients is required.

bluca, please say me what is better:

  1. register onAfterRemove callback at add() time
  2. register onAfterRemove callback at remove() time

First method is implemented now. But second seems more clean for user and more simple to implement.

typedef std::tuple<zmq_pollitem_t, Callable, Callable> PollItemCallableTuple;
typedef std::pair<std::unique_ptr<timer_t>, Callable> TimerItemCallablePair;
static bool TimerItemCallablePairComp(const TimerItemCallablePair &lhs, const TimerItemCallablePair &rhs);

std::vector<PollItemCallablePair> items_;
std::vector<PollItemCallableTuple> items_;
std::list<TimerItemCallablePair> timers_;
std::vector<const socket_t *> sockRemoveLater_;
std::vector<raw_socket_t> fdRemoveLater_;
std::vector<timer_id_t> timerRemoveLater_;


void add(const zmq_pollitem_t &item, Callable callable);
void add(const zmq_pollitem_t &item, Callable callable, Callable after_remove_cb = Callable(nullptr));
void add(std::unique_ptr<timer_t>, Callable callable);

bool start_handle_timers();
Expand Down
18 changes: 13 additions & 5 deletions src/zmqpp/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ void message::get(int64_t& integer, size_t const part) const

void message::get(signal &sig, size_t const part) const
{
assert(sizeof(signal) == size(part));
int64_t v;
get(v, part);
sig = static_cast<signal>(v);
assert(sizeof(signal) == size(part));
int64_t v;
get(v, part);

sig = static_cast<signal>(v);
}

void message::get(uint8_t& unsigned_integer, size_t const part) const
Expand Down Expand Up @@ -466,6 +466,14 @@ void message::copy(message const& source)
//_strings = source._strings
}

void message::append(message const& source, size_t from, size_t to)
{
if(to == 0) to = source.parts();
_parts.reserve( _parts.size() + to - from );
for(size_t part=from; part<to; ++part)
push_back(source.raw_data(part), source.size(part));
}

// Used for internal tracking
void message::sent(size_t const part)
{
Expand Down
21 changes: 21 additions & 0 deletions src/zmqpp/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ class ZMQPP_EXPORT message
_parts.push_back( frame( part, data_size ) );
}

//specialization for common add_raw. Add copy
void add_raw(frame const *a_frame)
{
_parts.push_back( a_frame->copy() );
}

// Use exact data past, neither zmqpp nor 0mq will copy, alter or delete
// this data. It must remain as valid for at least the lifetime of the
// 0mq message, recommended only with const data.
Expand Down Expand Up @@ -266,6 +272,17 @@ class ZMQPP_EXPORT message
message& operator<<(char const* c_string);
message& operator<<(std::string const& string);

message& operator<<(frame const& c_frame)
{
add_raw(c_frame.data(), c_frame.size());
return *this;
}
message& operator<<(message const& c_message)
{
append(c_message);
return *this;
}

// Queue manipulation
void push_front(void const* part, size_t const size);

Expand Down Expand Up @@ -313,6 +330,10 @@ class ZMQPP_EXPORT message
message copy() const;
void copy(message const& source);

// Append (like copy) frames[from..to-1] of source to self
// to == 0 -- append till end
void append(message const& source, size_t from = 0, size_t to = 0);

// Used for internal tracking
void sent(size_t const part);

Expand Down
Loading