diff --git a/co/.gdbinit b/co/.gdbinit index 9873e5d..694d153 100644 --- a/co/.gdbinit +++ b/co/.gdbinit @@ -1,2 +1,11 @@ #source gdb.scm set env CK_FORK=no +file ./demo +break main +break starter +break sus_select +break sus_read +break sus_write +break sus_lend +break sus_borrow +break sus_return diff --git a/co/comultitask.c b/co/comultitask.c index 79488f6..916812c 100644 --- a/co/comultitask.c +++ b/co/comultitask.c @@ -1,7 +1,6 @@ #include "comultitask.h" #include "contextring.h" #include "../coro/coro.h" -#include "../log.h" #include "../utils.h" #include #include @@ -14,7 +13,15 @@ struct coro_stuff { static struct coro_context s_end; static struct coro_context_ring* s_current; -static int s_current_fd[3] = { -1, -1, -1 }; + +static struct shared_data { + int fd[3]; + struct shared_buf { + int id; + ssize_t size; + void* data; + } buf; +} s_shared = { .fd = { -1, -1, -1 }, .buf = { .id = -1, .size = 0, .data = NULL } }; static void suspend(void) { coro_transfer(s_current->ctx, &s_end); @@ -22,10 +29,15 @@ static void suspend(void) { static void suspend_until_fd(const enum fdt first, const enum fdt last, const int fd) { for (enum fdt set = first; set <= last; set++) - while (fd != s_current_fd[set]) + while (fd != s_shared.fd[set]) suspend(); if (first == last) - s_current_fd[first] = -1; + s_shared.fd[first] = -1; +} + +static void suspend_until_id(const int id) { + while (s_shared.buf.id != id) + suspend(); } ssize_t sus_write(const int fd, const void* const data, const size_t size) { @@ -43,26 +55,35 @@ int sus_select(const int n, fd_set* restrict r, fd_set* restrict w, fd_set* rest return xselect(n, r, w, e, t); } -ssize_t sus_lend(const int id, void* const data, const size_t size) { - (void)id, (void)data; - return size; +void sus_lend(const int id, void* const data, const size_t size) { + suspend_until_id(-1); + assert(s_shared.buf.id == -1 && s_shared.buf.size < 0 && s_shared.buf.data == NULL); + s_shared.buf = (struct shared_buf){ .id = id, .data = data, .size = size }; + suspend_until_id(-1); } ssize_t sus_borrow(const int id, void** const data) { - (void)id, (void)data; - return -1; + suspend_until_id(id); + assert(s_shared.buf.id == id && (s_shared.buf.size == 0 || (s_shared.buf.size > 0 && s_shared.buf.data != NULL))); + *data = s_shared.buf.data; + return s_shared.buf.size; +} + +void sus_return(const int id) { + assert(s_shared.buf.id == id && s_shared.buf.size > 0 && s_shared.buf.data != NULL); + s_shared.buf = (struct shared_buf){ .id = -1, .size = -1, .data = NULL }; + suspend(); } -int sus_resume(const enum fdt set, const int fd) { - LOGDX("%d %d", set, fd); - s_current_fd[set] = fd; +int sus_notify(const enum fdt set, const int fd) { + s_shared.fd[set] = fd; suspend(); return 0; } // Transfer to scheduler and forget about current coroutine __attribute__((noreturn)) -static inline void sus_return(void) { +static inline void sus_exit(void) { coro_context* const origin = s_current->ctx; shrink(&s_current); coro_transfer(origin, &s_end); @@ -72,7 +93,7 @@ static inline void sus_return(void) { __attribute__((noreturn)) static void starter(struct sus_coroutine_reg* const reg) { reg->result = reg->entry(®->ca, reg->args); - sus_return(); + sus_exit(); } int sus_runall(const size_t length, struct sus_coroutine_reg (* const h)[length]) { diff --git a/co/comultitask.h b/co/comultitask.h index 64dc5ff..2381f3b 100644 --- a/co/comultitask.h +++ b/co/comultitask.h @@ -23,9 +23,9 @@ struct sus_coroutine_reg { int sus_select(int n, fd_set* restrict r, fd_set* restrict w, fd_set* restrict e, struct timeval* restrict t); ssize_t sus_write(int fd, const void* data, size_t size); ssize_t sus_read(int fd, void* data, size_t size); -ssize_t sus_lend(int id, void* data, size_t size); +void sus_lend(int id, void* data, size_t size); ssize_t sus_borrow(int id, void** value); -int sus_resume(enum fdt set, int fd); +int sus_notify(enum fdt set, int fd); int sus_runall(size_t s, struct sus_coroutine_reg (* c)[s]); int sus_wait(void); ssize_t sus_sendmsg(int mux, int priority, int type, size_t length, char value[static length]); diff --git a/co/frontend.rl b/co/frontend.rl index 5b87b36..94866cb 100644 --- a/co/frontend.rl +++ b/co/frontend.rl @@ -3,11 +3,12 @@ #include #include +static int cs; + %%{ machine frontend; alphtype char; include frob_common "../common.rl"; - variable cs *cs; # Foreign: OK = stx; @@ -22,7 +23,7 @@ action Confirm { acknak = 0x06; } action Reject { acknak = 0x15; } - action Send { sus_write(fdout, &acknak, 1); } + action Send { sus_write(2, &acknak, 1); } foreign = (OK @Confirm | NAK @Reject) @Send; internal = IDEMPOTENT ACK | IDEMPOTENT TIMEOUT{1,3} ACK; @@ -41,31 +42,30 @@ static bool is_idempotent(const char* const msg) { } return false; } +#endif -static int fsm_exec(int* cs, const char* p, const char* const pe) { +static int fsm_exec(const char* p, const char* const pe) { char acknak; - int fdout = 5; %% write exec; return -1; } -#endif -void fsm_frontend_init(int* const cs) { +__attribute__((constructor)) +void fsm_frontend_init() { (void)frontend_en_main, (void)frontend_error, (void)frontend_first_final; %% write init; } int fsm_frontend_foreign(struct args_frontend_foreign* const a) { (void)a; -# if 0 - char acknak = 0x00; + //char acknak = 0x00; ssize_t bytes; const char* p; - while ((bytes = sus_lend(a->idfrom, &p, 1)) > 0) { + while ((bytes = sus_borrow(0, (void**)&p)) >= 0) { const char* const pe = p + 1; - fsm_exec(&a->cs, p, pe); + fsm_exec(p, pe); + sus_return(0); } -# endif return -1; } @@ -76,7 +76,7 @@ int fsm_frontend_internal(struct args_frontend_internal* const a) { const char* msg; while ((bytes = sus_lend(1, &msg, 0)) > 0) { const char* p = (char[]){is_idempotent(msg) ? 0x0A : 0x0D}, * const pe = p + 1; - fsm_exec(&a->cs, p, pe); + fsm_exec(p, pe); } # endif return -1; @@ -90,7 +90,7 @@ int fsm_frontend_timer(struct args_frontend_timer* const a) { unsigned char buf[8]; while ((bytes = sus_read(fd, buf, sizeof buf)) > 0) { const char* p = (char[]){0}, * const pe = p + 1; - fsm_exec(&a->cs, p, pe); + fsm_exec(p, pe); } # endif return -1; @@ -98,13 +98,12 @@ int fsm_frontend_timer(struct args_frontend_timer* const a) { int n_fsm_frontend_timer() { # if 0 - int cs; void coro(void* a) { ssize_t bytes; unsigned char buf[8]; while ((bytes = sus_read(fd, buf, sizeof buf)) > 0) { const char* p = (char[]){0}, * const pe = p + 1; - fsm_exec(&a->cs, p, pe); + fsm_exec(p, pe); } return -1; } diff --git a/co/main.c b/co/main.c index d32cb55..ee31174 100644 --- a/co/main.c +++ b/co/main.c @@ -23,14 +23,14 @@ int co_io_loop(const struct coro_args* const ca, const struct args_io_loop* cons for (size_t i = 0; i < lengthof(iop.set); i++) for (int j = 0; j < iop.maxfd; j++) if (FD_ISSET(j, &iop.set[i])) - sus_resume(i, j); + sus_notify(i, j); // Remove closed or if need add a new file descriptor } return 0; } int main() { - struct sus_coroutine_reg coroutines[] = { + struct sus_coroutine_reg tasks[] = { { .stack_size = 0, .ca = { @@ -48,13 +48,22 @@ int main() { }, .entry = (sus_entry)co_io_loop, .args = &(struct args_io_loop){ .timeout = -1, .s6_notification_fd = -1 } + }, + { + .stack_size = 0, + .ca = { + .fd = NULL, + .idx = (int[]){ 0 } + }, + .entry = (sus_entry)fsm_frontend_foreign, + .args = &(struct args_frontend_foreign){} } }; - if (sus_runall(lengthof(coroutines), &coroutines) != 0) + if (sus_runall(lengthof(tasks), &tasks) != 0) EXITF("Can't start"); - for (size_t i = 0; i < lengthof(coroutines); i++) - if (coroutines[i].result) + for (size_t i = 0; i < lengthof(tasks); i++) + if (tasks[i].result) return 100; return 0; } diff --git a/co/ut.t b/co/ut.t index debdb7b..ee72eb6 100644 --- a/co/ut.t +++ b/co/ut.t @@ -33,3 +33,8 @@ int g_log_level = LOG_DEBUG; shrink(&ring); ck_assert_ptr_null(ring); } + +#suite coroutines + +#test coroutine1 + diff --git a/co/wireprotocol.rl b/co/wireprotocol.rl index 7646679..0898f39 100644 --- a/co/wireprotocol.rl +++ b/co/wireprotocol.rl @@ -2,7 +2,6 @@ #include "frob.h" #include "../log.h" #include -#include %%{ machine wireformat; @@ -16,7 +15,7 @@ fbreak; } action Frame_Start { start = fpc; } - action Send { printf("%*s", (int)bytes, buf); } + action Send { sus_lend(0, buf, bytes); } frame = stx @LRC_Init ((any-etx) @LRC_Byte >Frame_Start) ((any-etx) @LRC_Byte )* (etx @LRC_Byte) any @LRC_Check @Send; main := ((any-stx)* frame any)*; @@ -34,10 +33,8 @@ int fsm_wireformat(const struct coro_args* const ca, void*) { char* p = buf, * pe = p; %% write init; while ((bytes = sus_read(ca->fd[0], buf, sizeof buf)) > 0) { - LOGDX("%*s", (int)bytes, buf); %% write exec; } - LOGDX("close"); close(ca->fd[0]); return 0; }