-
Notifications
You must be signed in to change notification settings - Fork 153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
can't sent/receive between internal threads with libvma #1009
Comments
// EpollServerAndClient.c
#include <stdarg.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <netdb.h>
#include <assert.h>
extern int errno;
#define BUFFSIZE 30
#define MAXBTYE 10
#define OPEN_MAX 30
#define LISTENQ 20
#define SERV_PORT 10012
#define INFTIM 1000
#define TIMEOUT 500
#define MAX_CNT 5
struct task
{
epoll_data_t data;
struct task* next;
};
struct user_data
{
int fd;
unsigned int n_size;
char line[MAXBTYE];
};
void *readtask(void *args);
void *writetask(void *args);
int epfd;
struct epoll_event ev;
struct epoll_event events[LISTENQ];
pthread_mutex_t r_mutex;
pthread_cond_t r_condl;
pthread_mutex_t w_mutex;
pthread_cond_t w_condl;
struct task *readhead = NULL, *readtail = NULL;
struct task *writehead = NULL, *writetail = NULL;
// Print error information
int errexit(const char* format, ...)
{
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
exit(1);
}
// Print work information
void echo(const char* format, ...)
{
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
}
void setnonblocking(int sock)
{
int opts;
if ((opts = fcntl(sock, F_GETFL)) < 0)
errexit("GETFL %d failed", sock);
opts = opts | O_NONBLOCK;
if (fcntl(sock, F_SETFL, opts) < 0)
errexit("SETFL %d failed", sock);
printf("Set socket %d O_NONBLOCK. Succeed!\n", sock);
}
int recvFromServer(int sockfd, char* buf, int buf_size) {
int n, offset = 0;
errno = 0;
while (buf_size - offset > 0 &&
(n = recv(sockfd, buf + offset, buf_size - offset, MSG_DONTWAIT)) > 0) {
offset += n;
}
if (offset == 0 && errno == EAGAIN) {
echo("[CLIENT] no message received.\n");
return -1;
} else {
return offset;
}
}
void process(int sockfd)
{
char sendline[BUFFSIZE], recvline[BUFFSIZE];
int numbytes;
int cnt = 0;
while (cnt < MAX_CNT)
{
memset(sendline, 0, BUFFSIZE);
memset(recvline, 0, BUFFSIZE);
for (int j = 0; j < BUFFSIZE; j++)
sendline[j] = '0' + cnt;
send(sockfd, sendline, strlen(sendline), 0);
echo("[CLIENT] sent %s\n", sendline);
sleep(1);
if ((numbytes = recvFromServer(sockfd, recvline, BUFFSIZE)) == 0)
{
echo("[CLIENT] server terminated.\n");
return;
}
else if (numbytes > 0)
{
recvline[numbytes] = '\0';
echo("[CLIENT] Received: %s\n", recvline);
}
cnt++;
}
echo("[CLIENT] exit.\n");
}
void *client(void *server_local_addr) {
int sock;
struct hostent *hent;
struct sockaddr_in server;
unsigned short port = SERV_PORT;
char *host = server_local_addr;
int ret;
int retry = 1;
sleep(1);
// convert decimal IP to binary IP
if ((hent = gethostbyname(host)) == NULL)
errexit("gethostbyname failed.\n");
if ((sock = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) < 0)
errexit("create socket failed: %s\n", strerror(errno));
else
printf("[Client]: create socket %d\n", sock);
bzero(&server, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr = *((struct in_addr*)hent->h_addr);
echo("[CLIENT] server addr: %s, port: %u\n", inet_ntoa(server.sin_addr), ntohs(server.sin_port));
do_connect:
printf("[Client]: round %d connect() started\n", retry);
ret = connect(sock, (struct sockaddr*)&server, sizeof(server));
printf("[Client]: round %d connect() returned\n", retry);
while (ret < 0 && retry < 11) {
if (errno == EALREADY) {
printf("[Client]: connect to server failed: EALREADY [%d], %s\n", errno, strerror(errno));
sleep(1);
}
else if (errno == EINPROGRESS) {
printf("[Client]: connect to server failed: EINPROGRESS, %s\n", strerror(errno));
sleep(1);
} else {
errexit("[Client]: connect to server failed: [%d], %s\n", errno, strerror(errno));
}
retry += 1;
goto do_connect;
}
echo("[CLIENT] connected to server %s\n", inet_ntoa(server.sin_addr));
// Send request to server
process(sock);
close(sock);
return NULL;
}
int main(int argc,char* argv[])
{
int i, nfds;
int listenfd, connfd;
pthread_t tid_read;
pthread_t tid_write;
pthread_t tid_client;
int cnt = 0;
int ret;
// task node
struct task *new_task = NULL;
socklen_t clilen;
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
if (argc != 2) {
printf("Usage: %s IP_of_local_interface\n", argv[0]);
return -1;
}
pthread_mutex_init(&r_mutex, NULL);
pthread_cond_init(&r_condl, NULL);
pthread_mutex_init(&w_mutex, NULL);
pthread_cond_init(&w_condl, NULL);
pthread_create(&tid_read, NULL, readtask, NULL);
pthread_create(&tid_write, NULL, writetask, NULL);
pthread_create(&tid_client, NULL, client, (void*)argv[1]);
epfd = epoll_create(256);
listenfd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
printf("[SERVER]: open socket %d\n", listenfd);
setnonblocking(listenfd);
ev.data.fd = listenfd;
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
char *server_local_addr = argv[1];
inet_aton(server_local_addr, &(serveraddr.sin_addr));
serveraddr.sin_port = htons(SERV_PORT);
ret = bind(listenfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr));
if (ret != 0) {
printf("[SERVER]: bind failed\n");
exit(-2);
} else
printf("[SERVER]: bind DONE!\n");
ret = listen(listenfd, LISTENQ);
assert(ret == 0);
printf("[SERVER]: listening at socket %d\n", listenfd);
for(;cnt < MAX_CNT;)
{
nfds = epoll_wait(epfd, events, LISTENQ, TIMEOUT);
// In case of edge trigger, must go over each event
for (i = 0; i < nfds; ++i)
{
// Get new connection
if (events[i].data.fd == listenfd)
{
// accept the client connection
connfd = accept(listenfd, (struct sockaddr*)&clientaddr, &clilen);
if (connfd < 0)
errexit("connfd < 0");
printf("[SERVER]: accept socket %d\n", connfd);
setnonblocking(connfd);
echo("[SERVER] connect from %s \n", inet_ntoa(clientaddr.sin_addr));
ev.data.fd = connfd;
// monitor in message, edge trigger
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
// add fd to epoll queue
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
}
// Received data
else if (events[i].events & EPOLLIN)
{
if (events[i].data.fd < 0)
continue;
echo("[SERVER] put task %d to read queue\n", events[i].data.fd);
new_task = malloc(sizeof(struct task));
new_task->data.fd = events[i].data.fd;
new_task->next = NULL;
pthread_mutex_lock(&r_mutex);
if (readhead == NULL)
{
readhead = new_task;
readtail = new_task;
}
// queue is not empty
else
{
readtail->next = new_task;
readtail = new_task;
}
// trigger readtask thread
pthread_cond_broadcast(&r_condl);
pthread_mutex_unlock(&r_mutex);
cnt++;
}
// Have data to send
else if (events[i].events & EPOLLOUT)
{
if (events[i].data.ptr == NULL)
continue;
echo("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
new_task = malloc(sizeof(struct task));
new_task->data.ptr = (struct user_data*)events[i].data.ptr;
new_task->next = NULL;
pthread_mutex_lock(&w_mutex);
// the queue is empty
if (writehead == NULL)
{
writehead = new_task;
writetail = new_task;
}
// queue is not empty
else
{
writetail->next = new_task;
writetail = new_task;
}
// trigger writetask thread
pthread_cond_broadcast(&w_condl);
pthread_mutex_unlock(&w_mutex);
}
else
{
echo("[SERVER] Error: unknown epoll event");
}
}
}
return 0;
}
void *readtask(void *args)
{
int fd = -1;
int n, i;
struct user_data* data = NULL;
while(1)
{
pthread_mutex_lock(&r_mutex);
while(readhead == NULL) {
pthread_cond_wait(&r_condl, &r_mutex);
}
fd = readhead->data.fd;
struct task* tmp = readhead;
readhead = readhead->next;
free(tmp);
tmp = NULL;
pthread_mutex_unlock(&r_mutex);
echo("[SERVER] readtask %lu handling %d\n", pthread_self(), fd);
data = malloc(sizeof(struct user_data));
data->fd = fd;
if ((n = recv(fd, data->line, MAXBTYE, 0)) < 0)
{
if (errno == ECONNRESET)
close(fd);
echo("[SERVER] Error: readline failed: %s\n", strerror(errno));
if (data != NULL) {
free(data);
data = NULL;
}
}
else if (n == 0)
{
close(fd);
echo("[SERVER] Error: client closed connection.\n");
if (data != NULL) {
free(data);
data = NULL;
}
}
else
{
data->n_size = n;
for (i = 0; i < n; ++i)
{
if (data->line[i] == '\n')
{
data->line[i] = '\0';
data->n_size = i + 1;
}
}
echo("[SERVER] readtask %lu received %d : [%d] %s\n", pthread_self(), fd, data->n_size, data->line);
if (data->line[0] != '\0')
{
ev.data.ptr = data;
ev.events = EPOLLOUT | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
}
}
void *writetask(void *args)
{
unsigned int n;
struct user_data *rdata = NULL;
while(1)
{
pthread_mutex_lock(&w_mutex);
while(writehead == NULL)
pthread_cond_wait(&w_condl, &w_mutex);
rdata = (struct user_data*)writehead->data.ptr;
struct task* tmp = writehead;
writehead = writehead->next;
pthread_mutex_unlock(&w_mutex);
echo("[SERVER] writetask %lu sending %d : [%d] %s\n", pthread_self(), rdata->fd, rdata->n_size, rdata->line);
if ((n = send(rdata->fd, rdata->line, rdata->n_size, 0)) < 0)
{
if (errno == ECONNRESET)
close(rdata->fd);
echo("[SERVER] Error: send responce failed: %s\n", strerror(errno));
}
else if (n == 0)
{
close(rdata->fd);
echo("[SERVER] Error: client closed connection.");
}
else
{
ev.data.fd = rdata->fd;
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, rdata->fd, &ev);
}
free(rdata);
rdata = NULL;
free(tmp);
tmp = NULL;
}
} |
|
I think that described issue looks the same as https://github.com/Mellanox/libvma/blob/master/tests/gtest/tcp/tcp_event.cc#L129 |
Yes, you are right. Why VMA does not work as server/client from single process? |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
can't sent/receive between internal threads with libvma
Issue type
Configuration:
(I tested two versions of libvma)
OS
RHEL-8.2 x64
OFED
MLNX_OFED_LINUX-5.0-2.1.8.0-rhel8.2-x86_64.iso
Hardware
Actual behavior:
Expected behavior:
pthread threads can send/receive data with libvma, just like with kernel TCP/IP.
Steps to reproduce:
The text was updated successfully, but these errors were encountered: