diff --git a/fsm/frontend.rl b/fsm/frontend.rl index 25e262f..aac6709 100644 --- a/fsm/frontend.rl +++ b/fsm/frontend.rl @@ -5,8 +5,10 @@ #include #include #include +#include static int cs; +static pthread_mutext_t s_mutex; %%{ machine frontend; @@ -32,7 +34,7 @@ static int cs; xnpth_write(forwarded_fd, p, pe - p) != pe - p); } - foreign = OK @Process; + foreign = NOK | (OK @Process); internal = IDEMPOTENT ACK | IDEMPOTENT TIMEOUT{1,3} ACK; main := (foreign | internal)*; diff --git a/fsm/wireprotocol.rl b/fsm/wireprotocol.rl index a086b76..56eb108 100644 --- a/fsm/wireprotocol.rl +++ b/fsm/wireprotocol.rl @@ -19,6 +19,7 @@ start = fpc; } action Frame_Accept { + assert(fpc - 1 >= start); xsend_message(a->to_fronted, (lrc != fc ? start : fpc - 1), fpc); } diff --git a/main.c b/main.c index 18741c3..27a1a17 100644 --- a/main.c +++ b/main.c @@ -1,27 +1,40 @@ #include "log.h" #include "utils.h" #include "frob.h" -#include "npthfix.h" +#include "tasks.h" #include #include #include #include +union iopair { + int fd[2]; + struct { + int r, w; + }; +}; + +static union iopair get_main(void) { + union iopair ret = { .r = STDIN_FILENO, .w = STDOUT_FILENO }; + ucsp_info_and_adjust_fds(&ret.w, &ret.r); + return ret; +} + +static union iopair make_pipe(void) { + union iopair ret; + xpipe(ret.fd); + return ret; +} + int main(const int ac, const char* av[static const ac]) { if (ac != 3) return 1; - int fd_fo_main = STDOUT_FILENO, fd_fi_main = STDIN_FILENO; - ucsp_info_and_adjust_fds(&fd_fo_main, &fd_fi_main); - int frontend_pipe[2]; - xpipe(frontend_pipe); - struct ThreadBag thr[] = { - npth_define(fsm_wireformat, "wp", .from_wire = fd_fi_main, .to_wire = fd_fo_main, .to_fronted = STDOUT_FILENO) + const union iopair foreign = get_main(), internal[] = { make_pipe() }; + struct task* t[] = { + create_task("wp", fsm_wireformat, .from_wire = foreign.r, .to_wire = foreign.w, .to_fronted = internal[0].w) }; - for (size_t i = 0; i < lengthof(thr); i++) { - pthread_create(&thr[i].handle, NULL, thr[i].entry, thr[i].arg); - pthread_setname_np(thr[i].handle, thr[i].name); - } - for (size_t i = 0; i < lengthof(thr); i++) - pthread_join(thr[i].handle, NULL); - return EXIT_SUCCESS; + int error = 0; + for (size_t i = 0; i < lengthof(t); i++) + error |= teardown_task(t[i]); + return error ? EXIT_FAILURE : EXIT_SUCCESS; } diff --git a/npthfix.h b/npthfix.h deleted file mode 100644 index 9bb83cf..0000000 --- a/npthfix.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once -#include -#include -#include "utils.h" - -#define npth_define(Entry, Name, ...) \ - { .entry = (npth_entry_t)Entry, .name = (Name), .arg = &(struct Entry ## _args){ __VA_ARGS__ } } - -typedef void* (* const npth_entry_t)(void*); -typedef void (* const cleanup_t)(void*); - -struct ThreadBag { - npth_t handle; - const char* const name; - npth_entry_t entry; - void* const arg; -}; - -size_t xsend_message(int fd, const input_t* p, const input_t* pe) __attribute__((nonnull(2,3))); -size_t xrecv_message(int fd, size_t size, input_t p[static size], const input_t** pe) __attribute__((nonnull(3,4))); - -inline size_t xsend_message_buf(const int fd, const size_t size, const input_t buf[static const size]) { - return xsend_message(fd, buf, buf + size); -} diff --git a/npthfix.c b/tasks.c similarity index 52% rename from npthfix.c rename to tasks.c index b713b11..e8766b1 100644 --- a/npthfix.c +++ b/tasks.c @@ -4,9 +4,27 @@ #include #include -extern size_t xsend_message_buf(int fd, size_t size, const input_t buf[static size]); +struct task { + pthread_t handle; +}; -size_t xsend_message(const int fd, const input_t* const p, const input_t* const pe) { + +extern size_t send_message_buf(int fd, size_t size, const input_t buf[static size]); + +struct task* create_task_(const char* const name, entry_t entry, const void* const arg) { + struct task* const ret = xmalloc(sizeof struct task); + xpthread_create(&ret->handle, NULL, entry, arg); + xpthread_setname_np(ret->handle, name); + return ret; +} + +int teardown_task(struct task*) { + union retval ret; + xpthread_join(task->handle, &ret.ptr); + return ret.num; +} + +size_t send_message(const int fd, const input_t* const p, const input_t* const pe) { assert(p); assert(pe); assert(pe >= p); @@ -18,7 +36,7 @@ size_t xsend_message(const int fd, const input_t* const p, const input_t* const return written; } -size_t xrecv_message(const int fd, const size_t size, input_t p[static const size], const input_t** const pe) { +size_t recv_message(const int fd, const size_t size, input_t p[static const size], const input_t** const pe) { assert(p); assert(pe); assert(*pe); diff --git a/tasks.h b/tasks.h new file mode 100644 index 0000000..221df8e --- /dev/null +++ b/tasks.h @@ -0,0 +1,21 @@ +#pragma once +#include "utils.h" + +typedef void* (* const entry_t)(void*); + +struct task; +union retval { + void* ptr; + int num; +}; + +#define create_task(Name, Entry, ...) create_task_(Name, Entry, &(struct Entry ## _args){ __VA_ARGS__ }); +struct task* create_task_(const char* name, entry_t entry, const void* arg); +int teardown_task(struct task*); + +size_t send_message(int fd, const input_t* p, const input_t* pe) __attribute__((nonnull(2,3))); +size_t recv_message(int fd, size_t size, input_t p[static size], const input_t** pe) __attribute__((nonnull(3,4))); + +inline size_t send_message_buf(const int fd, const size_t size, const input_t buf[static const size]) { + return send_message(fd, buf, buf + size); +} diff --git a/utils.h b/utils.h index 0253740..474332e 100644 --- a/utils.h +++ b/utils.h @@ -64,13 +64,9 @@ v;\ }) -#define xnpth_setname_np(...) XCALL(npth_setname_np, __VA_ARGS__) -#define xnpth_create(...) XCALL(npth_create, __VA_ARGS__) -#define xnpth_sigwait(...) XCALL(npth_sigwait, __VA_ARGS__) -#define xnpth_join(...) XCALL(npth_join, __VA_ARGS__) -#define xnpth_init(...) XCALL(npth_init, __VA_ARGS__) -#define xnpth_read(...) XCALL(npth_read, __VA_ARGS__) -#define xnpth_write(...) XCALL(npth_write, __VA_ARGS__) +#define xpthread_create(...) XCALL(pthread_create, __VA_ARGS__) +#define xpthread_join(...) XCALL(pthread_join, __VA_ARGS__) +#define xpthread_setname_np(...) XCALL(pthread_setname_np, __VA_ARGS__) #define xfprintf(...) XCALL(fprintf, __VA_ARGS__) #define xfputs(...) XCALL(fputs, __VA_ARGS__) #define xfflush(...) XCALL(fflush, __VA_ARGS__)