Skip to content

Commit

Permalink
Add the rest
Browse files Browse the repository at this point in the history
  • Loading branch information
hurufu committed Mar 9, 2024
1 parent d9a7cac commit 1bb551c
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 40 deletions.
9 changes: 9 additions & 0 deletions co/.gdbinit
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
#source gdb.scm
set env CK_FORK=no
file ./demo
break main
break starter
break sus_select
break sus_read
break sus_write
break sus_lend
break sus_borrow
break sus_return
49 changes: 35 additions & 14 deletions co/comultitask.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "comultitask.h"
#include "contextring.h"
#include "../coro/coro.h"
#include "../log.h"
#include "../utils.h"
#include <assert.h>
#include <string.h>
Expand All @@ -14,18 +13,31 @@ struct coro_stuff {

static struct coro_context s_end;
static struct coro_context_ring* s_current;
static int s_current_fd[3] = { -1, -1, -1 };

static struct shared_data {
int fd[3];
struct shared_buf {
int id;
ssize_t size;
void* data;
} buf;
} s_shared = { .fd = { -1, -1, -1 }, .buf = { .id = -1, .size = 0, .data = NULL } };

static void suspend(void) {
coro_transfer(s_current->ctx, &s_end);
}

static void suspend_until_fd(const enum fdt first, const enum fdt last, const int fd) {
for (enum fdt set = first; set <= last; set++)
while (fd != s_current_fd[set])
while (fd != s_shared.fd[set])
suspend();
if (first == last)
s_current_fd[first] = -1;
s_shared.fd[first] = -1;
}

static void suspend_until_id(const int id) {
while (s_shared.buf.id != id)
suspend();
}

ssize_t sus_write(const int fd, const void* const data, const size_t size) {
Expand All @@ -43,26 +55,35 @@ int sus_select(const int n, fd_set* restrict r, fd_set* restrict w, fd_set* rest
return xselect(n, r, w, e, t);
}

ssize_t sus_lend(const int id, void* const data, const size_t size) {
(void)id, (void)data;
return size;
void sus_lend(const int id, void* const data, const size_t size) {
suspend_until_id(-1);
assert(s_shared.buf.id == -1 && s_shared.buf.size < 0 && s_shared.buf.data == NULL);
s_shared.buf = (struct shared_buf){ .id = id, .data = data, .size = size };
suspend_until_id(-1);
}

ssize_t sus_borrow(const int id, void** const data) {
(void)id, (void)data;
return -1;
suspend_until_id(id);
assert(s_shared.buf.id == id && (s_shared.buf.size == 0 || (s_shared.buf.size > 0 && s_shared.buf.data != NULL)));
*data = s_shared.buf.data;
return s_shared.buf.size;
}

void sus_return(const int id) {
assert(s_shared.buf.id == id && s_shared.buf.size > 0 && s_shared.buf.data != NULL);
s_shared.buf = (struct shared_buf){ .id = -1, .size = -1, .data = NULL };
suspend();
}

int sus_resume(const enum fdt set, const int fd) {
LOGDX("%d %d", set, fd);
s_current_fd[set] = fd;
int sus_notify(const enum fdt set, const int fd) {
s_shared.fd[set] = fd;
suspend();
return 0;
}

// Transfer to scheduler and forget about current coroutine
__attribute__((noreturn))
static inline void sus_return(void) {
static inline void sus_exit(void) {
coro_context* const origin = s_current->ctx;
shrink(&s_current);
coro_transfer(origin, &s_end);
Expand All @@ -72,7 +93,7 @@ static inline void sus_return(void) {
__attribute__((noreturn))
static void starter(struct sus_coroutine_reg* const reg) {
reg->result = reg->entry(&reg->ca, reg->args);
sus_return();
sus_exit();
}

int sus_runall(const size_t length, struct sus_coroutine_reg (* const h)[length]) {
Expand Down
4 changes: 2 additions & 2 deletions co/comultitask.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ struct sus_coroutine_reg {
int sus_select(int n, fd_set* restrict r, fd_set* restrict w, fd_set* restrict e, struct timeval* restrict t);
ssize_t sus_write(int fd, const void* data, size_t size);
ssize_t sus_read(int fd, void* data, size_t size);
ssize_t sus_lend(int id, void* data, size_t size);
void sus_lend(int id, void* data, size_t size);
ssize_t sus_borrow(int id, void** value);
int sus_resume(enum fdt set, int fd);
int sus_notify(enum fdt set, int fd);
int sus_runall(size_t s, struct sus_coroutine_reg (* c)[s]);
int sus_wait(void);
ssize_t sus_sendmsg(int mux, int priority, int type, size_t length, char value[static length]);
Expand Down
29 changes: 14 additions & 15 deletions co/frontend.rl
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
#include <stdbool.h>
#include <sys/timerfd.h>

static int cs;

%%{
machine frontend;
alphtype char;
include frob_common "../common.rl";
variable cs *cs;

# Foreign:
OK = stx;
Expand All @@ -22,7 +23,7 @@

action Confirm { acknak = 0x06; }
action Reject { acknak = 0x15; }
action Send { sus_write(fdout, &acknak, 1); }
action Send { sus_write(2, &acknak, 1); }

foreign = (OK @Confirm | NAK @Reject) @Send;
internal = IDEMPOTENT ACK | IDEMPOTENT TIMEOUT{1,3} ACK;
Expand All @@ -41,31 +42,30 @@ static bool is_idempotent(const char* const msg) {
}
return false;
}
#endif

static int fsm_exec(int* cs, const char* p, const char* const pe) {
static int fsm_exec(const char* p, const char* const pe) {
char acknak;
int fdout = 5;
%% write exec;
return -1;
}
#endif

void fsm_frontend_init(int* const cs) {
__attribute__((constructor))
void fsm_frontend_init() {
(void)frontend_en_main, (void)frontend_error, (void)frontend_first_final;
%% write init;
}

int fsm_frontend_foreign(struct args_frontend_foreign* const a) {
(void)a;
# if 0
char acknak = 0x00;
//char acknak = 0x00;
ssize_t bytes;
const char* p;
while ((bytes = sus_lend(a->idfrom, &p, 1)) > 0) {
while ((bytes = sus_borrow(0, (void**)&p)) >= 0) {
const char* const pe = p + 1;
fsm_exec(&a->cs, p, pe);
fsm_exec(p, pe);
sus_return(0);
}
# endif
return -1;
}

Expand All @@ -76,7 +76,7 @@ int fsm_frontend_internal(struct args_frontend_internal* const a) {
const char* msg;
while ((bytes = sus_lend(1, &msg, 0)) > 0) {
const char* p = (char[]){is_idempotent(msg) ? 0x0A : 0x0D}, * const pe = p + 1;
fsm_exec(&a->cs, p, pe);
fsm_exec(p, pe);
}
# endif
return -1;
Expand All @@ -90,21 +90,20 @@ int fsm_frontend_timer(struct args_frontend_timer* const a) {
unsigned char buf[8];
while ((bytes = sus_read(fd, buf, sizeof buf)) > 0) {
const char* p = (char[]){0}, * const pe = p + 1;
fsm_exec(&a->cs, p, pe);
fsm_exec(p, pe);
}
# endif
return -1;
}

int n_fsm_frontend_timer() {
# if 0
int cs;
void coro(void* a) {
ssize_t bytes;
unsigned char buf[8];
while ((bytes = sus_read(fd, buf, sizeof buf)) > 0) {
const char* p = (char[]){0}, * const pe = p + 1;
fsm_exec(&a->cs, p, pe);
fsm_exec(p, pe);
}
return -1;
}
Expand Down
19 changes: 14 additions & 5 deletions co/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ int co_io_loop(const struct coro_args* const ca, const struct args_io_loop* cons
for (size_t i = 0; i < lengthof(iop.set); i++)
for (int j = 0; j < iop.maxfd; j++)
if (FD_ISSET(j, &iop.set[i]))
sus_resume(i, j);
sus_notify(i, j);
// Remove closed or if need add a new file descriptor
}
return 0;
}

int main() {
struct sus_coroutine_reg coroutines[] = {
struct sus_coroutine_reg tasks[] = {
{
.stack_size = 0,
.ca = {
Expand All @@ -48,13 +48,22 @@ int main() {
},
.entry = (sus_entry)co_io_loop,
.args = &(struct args_io_loop){ .timeout = -1, .s6_notification_fd = -1 }
},
{
.stack_size = 0,
.ca = {
.fd = NULL,
.idx = (int[]){ 0 }
},
.entry = (sus_entry)fsm_frontend_foreign,
.args = &(struct args_frontend_foreign){}
}
};

if (sus_runall(lengthof(coroutines), &coroutines) != 0)
if (sus_runall(lengthof(tasks), &tasks) != 0)
EXITF("Can't start");
for (size_t i = 0; i < lengthof(coroutines); i++)
if (coroutines[i].result)
for (size_t i = 0; i < lengthof(tasks); i++)
if (tasks[i].result)
return 100;
return 0;
}
5 changes: 5 additions & 0 deletions co/ut.t
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ int g_log_level = LOG_DEBUG;
shrink(&ring);
ck_assert_ptr_null(ring);
}
#suite coroutines
#test coroutine1
5 changes: 1 addition & 4 deletions co/wireprotocol.rl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "frob.h"
#include "../log.h"
#include <unistd.h>
#include <stdio.h>

%%{
machine wireformat;
Expand All @@ -16,7 +15,7 @@
fbreak;
}
action Frame_Start { start = fpc; }
action Send { printf("%*s", (int)bytes, buf); }
action Send { sus_lend(0, buf, bytes); }

frame = stx @LRC_Init ((any-etx) @LRC_Byte >Frame_Start) ((any-etx) @LRC_Byte )* (etx @LRC_Byte) any @LRC_Check @Send;
main := ((any-stx)* frame any)*;
Expand All @@ -34,10 +33,8 @@ int fsm_wireformat(const struct coro_args* const ca, void*) {
char* p = buf, * pe = p;
%% write init;
while ((bytes = sus_read(ca->fd[0], buf, sizeof buf)) > 0) {
LOGDX("%*s", (int)bytes, buf);
%% write exec;
}
LOGDX("close");
close(ca->fd[0]);
return 0;
}

0 comments on commit 1bb551c

Please sign in to comment.