Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing connection problems when the peer has performed an shutdown and enhancement for issue #4 #22

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions lib/src/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ int Client::sendRequest(Request *request)

unsigned char *buffer = request->toWireFormat();
int numBytesSent = this->connection->write(request->size(), buffer);
if (numBytesSent == Connection::WRITE_ERROR) { E("Client::sendRequest():write error:" << strerror(errno) << "\n"); return numBytesSent; }
if (numBytesSent == Connection::WRITE_ERROR) {
E("Client::sendRequest():write error:" << strerror(errno) << "\n");
delete this->connection; this->connection = NULL;
return numBytesSent;
}
D(cout.flush() << "Client::sendRequest():request sent:numBytes:" << numBytesSent << "\n";)
return numBytesSent;
}
Expand All @@ -126,13 +130,23 @@ ResponseClass *Client::receiveResponse()

int netValueSize = -1;
int numBytesReceived = this->connection->read(sizeof(int), (unsigned char *)(&netValueSize));
if (numBytesReceived == Connection::READ_ERROR) { E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n"); return NULL; }
if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR)
{
E("Client::receiveResponse():read error on size:" << strerror(errno) << "\n");
delete this->connection; this->connection = NULL;
return NULL;
}
int hostValueSize = ntohl(netValueSize);
D(cout.flush() << "Client::receiveResponse():incoming response:size:" << hostValueSize << "\n";)
unsigned char *buffer = new unsigned char[hostValueSize+sizeof(int)]; // add space for int32 size
memcpy(buffer, &netValueSize, sizeof(int));
numBytesReceived = this->connection->read(hostValueSize, buffer + sizeof(int));
if (numBytesReceived == Connection::READ_ERROR) { E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n"); return NULL; }
if (numBytesReceived == Connection::READ_ERROR || numBytesReceived == Connection::END_OF_CONNECTION_ERROR)
{
E("Client::receiveResponse():read error on body:" << strerror(errno) << "\n");
delete this->connection; this->connection = NULL;
return NULL;
}
return new ResponseClass(buffer, true); // true specfies delete buffer on ~Response()
}

Expand Down
8 changes: 6 additions & 2 deletions lib/src/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const int Connection::DEFAULT_BUFFER_SIZE;
const int Connection::SOCKET_UNINITIALIZED;
const int Connection::OPEN_CONNECTION_ERROR;
const int Connection::READ_ERROR;
const int Connection::END_OF_CONNECTION_ERROR;
const int Connection::WRITE_ERROR;

Connection::Connection(string host, int port)
Expand Down Expand Up @@ -141,7 +142,7 @@ int Connection::read(int numBytes, unsigned char* buffer)
while (numBytesReceived < numBytes)
{
int rcvd = (int)::recv(this->socketFd, p, (size_t)(numBytes-numBytesReceived), flags);
if (rcvd == READ_ERROR) { E("Connection::read():error:" << strerror(errno) << "\n"); break; }
if (rcvd == READ_ERROR || rcvd == END_OF_CONNECTION_ERROR) {E("Connection::read():error:" << strerror(errno) << "\n"); break; }
p += rcvd;
numBytesReceived += rcvd;
D(cout.flush() << "--------------Connection::read(" << numBytes << "):read " << rcvd << " bytes\n";)
Expand All @@ -155,7 +156,10 @@ int Connection::write(int numBytes, unsigned char* buffer)
{
D(cout.flush() << "--------------Connection::write(" << numBytes << ")\n";)

int flags = 0;
// MSG_NOSIGNAL (since Linux 2.2)
// The local end has been "shut down" on a connection oriented socket. In this case the process will also receive a SIGPIPE unless MSG_NOSIGNAL is set.

int flags = MSG_NOSIGNAL;
int numBytesSent = (int)::send(this->socketFd, (const void*)buffer, (ssize_t)numBytes, flags);
if (numBytesSent == WRITE_ERROR) { E("Connection::write():error:" << strerror(errno) << "\n"); }
D(cout.flush() << "--------------Connection::write(" << numBytes << "):wrote " << numBytesSent << "bytes\n";)
Expand Down
1 change: 1 addition & 0 deletions lib/src/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Connection
static const int SOCKET_UNINITIALIZED = -1;
static const int OPEN_CONNECTION_ERROR = -1;
static const int READ_ERROR = -1;
static const int END_OF_CONNECTION_ERROR = 0;
static const int WRITE_ERROR = -1;

Connection(std::string host, int port);
Expand Down
11 changes: 11 additions & 0 deletions lib/src/Request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ Request::Request(short int apiKey, short int apiVersion, int correlationId, stri
this->clientId = clientId;
}

Request::Request(short int apiKey, short int apiVersion, int correlationId, string clientId, long bufferSize) : RequestOrResponse(bufferSize)
{
D(cout.flush() << "--------------Request(params)\n";)

this->apiKey = apiKey;
this->apiVersion = apiVersion;
this->correlationId = correlationId;
this->clientId = clientId;
}


unsigned char* Request::toWireFormat(bool updatePacketSize)
{
unsigned char* buffer = this->RequestOrResponse::toWireFormat(false);
Expand Down
1 change: 1 addition & 0 deletions lib/src/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Request : public RequestOrResponse

Request(unsigned char *buffer, bool releaseBuffer = false);
Request(short int apiKey, short int apiVersion, int correlationId, std::string clientId);
Request(short int apiKey, short int apiVersion, int correlationId, std::string clientId, long bufferSize);

unsigned char* toWireFormat(bool updatePacketSize = true);
int getWireFormatSize(bool includePacketSize = true);
Expand Down
7 changes: 7 additions & 0 deletions lib/src/RequestOrResponse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ RequestOrResponse::RequestOrResponse() : WireFormatter()
D(cout.flush() << "--------------RequestOrResponse(params)\n";)
}

RequestOrResponse::RequestOrResponse(long bufferSize) : WireFormatter()
{
this->packet = new Packet(bufferSize);

D(cout.flush() << "--------------RequestOrResponse(params)\n";)
}

RequestOrResponse::~RequestOrResponse()
{
delete this->packet;
Expand Down
1 change: 1 addition & 0 deletions lib/src/RequestOrResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RequestOrResponse : public WireFormatter, public PacketWriter
public:

RequestOrResponse();
RequestOrResponse(long bufferSize);
RequestOrResponse(unsigned char *buffer, bool releaseBuffer = false);
~RequestOrResponse();

Expand Down
12 changes: 12 additions & 0 deletions lib/src/produce/ProduceRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ namespace LibKafka {
this->produceTopicArray = produceTopicArray;
this->releaseArrays = releaseArrays;
}

ProduceRequest::ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock<ProduceMessageSet>** produceTopicArray, long bufferSize, bool releaseArrays) : Request(ApiConstants::PRODUCE_REQUEST_KEY, ApiConstants::API_VERSION, correlationId, clientId, bufferSize)
{
D(cout.flush() << "--------------ProduceRequest(params)\n";)

this->requiredAcks = requiredAcks;
this->timeout = timeout;
this->produceTopicArraySize = produceTopicArraySize;
this->produceTopicArray = produceTopicArray;
this->releaseArrays = releaseArrays;
}


ProduceRequest::~ProduceRequest()
{
Expand Down
1 change: 1 addition & 0 deletions lib/src/produce/ProduceRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ProduceRequest : public Request

ProduceRequest(unsigned char *buffer, bool releaseBuffer = false);
ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock<ProduceMessageSet> **produceTopicArray, bool releaseArrays = false);
ProduceRequest(int correlationId, std::string clientId, short int requiredAcks, int timeout, int produceTopicArraySize, TopicNameBlock<ProduceMessageSet> **produceTopicArray, long bufferSize, bool releaseArrays = false);
~ProduceRequest();

unsigned char* toWireFormat(bool updatePacketSize = true);
Expand Down