diff --git a/frob.h b/frob.h index 281311d..c5cc091 100644 --- a/frob.h +++ b/frob.h @@ -288,8 +288,13 @@ static_assert(sizeof (struct frob_msg) % 16 == 0, "Message shall fit into 16-byt #pragma once struct fsm_frontend_foreign_args { - int in, forwarded_fd; int cs; + union { + struct { + int from_wp, to_low_level_forwarding; + }; + const int fd[2]; + } }; struct fsm_frontend_internal_args { @@ -301,7 +306,12 @@ struct fsm_frontend_timer_args { }; struct fsm_wireformat_args { - const int in, out; + union { + struct { + const int from_wire, to_wire, to_fronted; + }; + const int fd[3]; + }; }; struct autoresponder_args { diff --git a/fsm/frontend.rl b/fsm/frontend.rl index d007496..25e262f 100644 --- a/fsm/frontend.rl +++ b/fsm/frontend.rl @@ -10,7 +10,7 @@ static int cs; %%{ machine frontend; - alphtype char; + alphtype unsigned char; include frob_common "common.rl"; # Foreign: @@ -24,31 +24,15 @@ static int cs; IDEMPOTENT = 0x0A; EFFECTFUL = 0x0D; - action Confirm { - acknak = 0x06; - } - action Reject { - acknak = 0x15; - } - action Send { - LOGDXP(char tmp[4*1], "← % 4d %s", 1, PRETTY(&acknak, &acknak + 1, tmp)); - if (npth_write(6, &acknak, 1) != 1) { - LOGE("write"); - fbreak; - } - } action Process { - LOGDXP(char tmp[4*(pe-p)], "Not lending %zd bytes: %s", pe - p, PRETTY((unsigned char*)p, (unsigned char*)pe, tmp)); + LOGDXP(char tmp[4*(pe-p)], "Not lending %zd bytes: %s", pe - p, PRETTY(p, pe, tmp)); //npth_write(-1, pe - p, (char*)p);// TODO: Remove this cast } action Forward { - if (npth_write(forwarded_fd, p, pe - p) != pe - p) { - LOGE("write"); - fbreak; - } + xnpth_write(forwarded_fd, p, pe - p) != pe - p); } - foreign = OK @Confirm @Send @Process | NAK @Reject @Send; + foreign = OK @Process; internal = IDEMPOTENT ACK | IDEMPOTENT TIMEOUT{1,3} ACK; main := (foreign | internal)*; @@ -68,7 +52,6 @@ static bool is_idempotent(const char* const msg) { */ static int fsm_exec(const char* p, const char* const pe) { - unsigned char acknak; %% write exec; return -1; } @@ -79,18 +62,10 @@ void fsm_frontend_init() { %% write init; } -void* fsm_frontend_foreign(struct fsm_frontend_foreign_args* const a) { - (void)a; - ssize_t bytes; - char buf[1024]; - const char* p; - while ((bytes = npth_read(a->in, buf, sizeof buf)) >= 0) { - LOGDX("Received %zd bytes", bytes); - const char* const pe = (p = buf) + bytes; +void* fsm_frontend_foreign(const struct fsm_frontend_foreign_args* const a) { + unsigned char buf[1024], * pe, * p; + while ((pe = (p = buf) + xnpth_read_fancy(a->from_wp, sizeof buf, buf)) != buf) fsm_exec(p, pe); - } - if (bytes < 0) - LOGE("Closing fronted"); return NULL; } diff --git a/fsm/wireprotocol.rl b/fsm/wireprotocol.rl index d9fdb81..8fd85be 100644 --- a/fsm/wireprotocol.rl +++ b/fsm/wireprotocol.rl @@ -1,9 +1,8 @@ #include "frob.h" #include "utils.h" #include "log.h" +#include "npthfix.h" #include -#include -#include %%{ machine wireformat; @@ -17,11 +16,9 @@ lrc ^= fc; } action LRC_Check { - if (lrc != fc) { - npth_write(args->out, "\x03", 1); - } else { - npth_write(args->out, buf, fpc - start); - } + xnpth_write_fancy(a->to_wire, 1, (unsigned char[]){lrc == fc ? ACK : NAK}); + if (lrc == fc) + xnpth_write_fancy(a->to_fronted, fpc - start, buf); } action Frame_Start { start = fpc; @@ -33,23 +30,16 @@ %% write data; -void* fsm_wireformat(const struct fsm_wireformat_args* const args) { - unsigned char* start = NULL; - char lrc; - ssize_t bytes; - unsigned char buf[1024]; +void* fsm_wireformat(const struct fsm_wireformat_args* const a) { int cs; - unsigned char* p = buf, * pe = p; + ssize_t bytes; + unsigned char buf[1024], * start, * p, * pe, lrc = 0; %% write init; - while ((bytes = npth_read(args->in, buf, sizeof buf)) > 0) { - pe = (p = buf) + bytes; - LOGDXP(char tmp[4*bytes], "→ % 4zd %s", bytes, PRETTY(p, pe, tmp)); + while ((pe = (p = buf) + xnpth_read_fancy(a->from_wire, sizeof buf, buf)) != buf) { %% write exec; } - if (bytes < 0) - LOGE("read"); - close(7); + for (size_t i = 0; i < lengthof(a->fd); i++) + xclose(a->fd[i]); LOGIX("FSM state: current/entry/error/final %d/%d/%d/%d", cs, wireformat_en_main, wireformat_error, wireformat_first_final); - //sus_disable(0); - return cs == wireformat_error ? NULL + 1 : NULL; + return NULL; } diff --git a/log.h b/log.h index 1fb3472..adca525 100644 --- a/log.h +++ b/log.h @@ -4,6 +4,7 @@ #include #include #include +#include // Reimplement using error_at_line(3) @@ -31,8 +32,8 @@ #define LOGW(...) LOG( ,"W" ,WARNING,warn , , ,##__VA_ARGS__) #define LOGEX(...) LOG( ,"E" ,ERR ,warnx , , ,##__VA_ARGS__) #define LOGE(...) LOG( ,"E" ,ERR ,warn , , ,##__VA_ARGS__) -#define EXITFX(...) LOG(exit(ERR_UNSPEC),"A" ,ALERT ,ERRX , , ,##__VA_ARGS__) -#define EXITF(...) LOG(exit(ERR_UNSPEC),"A" ,ALERT ,ERR , , ,##__VA_ARGS__) +#define EXITFX(...) LOG(npth_exit(NULL) ,"A" ,ALERT ,ERRX , , ,##__VA_ARGS__) +#define EXITF(...) LOG(npth_exit(NULL) ,"A" ,ALERT ,ERR , , ,##__VA_ARGS__) #define ABORTFX(...) LOG(abort() ,"F" ,EMERG ,warnx , , ,##__VA_ARGS__) #define ABORTF(...) LOG(abort() ,"F" ,EMERG ,warn , , ,##__VA_ARGS__) @@ -55,7 +56,7 @@ # define LOG_STORY(Prefix, Level, Method, Prologue, Epilogue, Fmt, ...) \ if (LOG_##Level <= g_log_level) {\ Prologue;\ - Method("\x1f" Prefix "\x1f %32s\x1f% 4d\x1f %24s\x1f " Fmt "\x1e", __FILE__, __LINE__, __func__, ##__VA_ARGS__);\ + Method("\x1f" Prefix "\x1f %32s\x1f% 4d\x1f " Fmt "\x1e", __FILE__, __LINE__, ##__VA_ARGS__);\ Epilogue;\ } extern int g_log_level; diff --git a/main.c b/main.c index febdeb6..3dd2071 100644 --- a/main.c +++ b/main.c @@ -26,8 +26,8 @@ int main(const int ac, const char* av[static const ac]) { int frontend_pipe[2]; xpipe(frontend_pipe); struct ThreadBag thr[] = { - npth_define(fsm_wireformat, "wp", .in = fd_fi_main, .out = frontend_pipe[1]), - npth_define(fsm_frontend_foreign, "ff", .in = frontend_pipe[0], .forwarded_fd = -1) + npth_define(fsm_wireformat, "wp", .from_wire = fd_fi_main, .to_wire = fd_fo_main, .to_fronted = frontend_pipe[1]), + npth_define(fsm_frontend_foreign, "ff", .from_wp = frontend_pipe[0], .to_low_level_forwarding = -1) }; xnpth_init(); npth_sigev_init(); diff --git a/npthfix.c b/npthfix.c index c2240af..6667ae5 100644 --- a/npthfix.c +++ b/npthfix.c @@ -1,4 +1,6 @@ #include "npthfix.h" +#include "utils.h" +#include "log.h" int npth_sigwait(const sigset_t *set, int *sig) { npth_unprotect(); @@ -6,3 +8,14 @@ int npth_sigwait(const sigset_t *set, int *sig) { npth_protect(); return res; } + +size_t xnpth_write_fancy(const int fd, const size_t size, const unsigned char buf[static const size]) { + LOGDXP(char tmp[4*size], "[%02d] ← % 4zu %s", fd, size, PRETTY(buf, buf + size, tmp)); + return xnpth_write(fd, buf, size); +} + +size_t xnpth_read_fancy(const int fd, const size_t size, unsigned char buf[static const size]) { + const size_t bytes = xnpth_read(fd, buf, size); + LOGDXP(char tmp[4*bytes], "[%02d] → % 4zu %s", fd, bytes, PRETTY(buf, buf + bytes, tmp)); + return bytes; +} diff --git a/npthfix.h b/npthfix.h index a68bc9e..f2b99d0 100644 --- a/npthfix.h +++ b/npthfix.h @@ -5,6 +5,7 @@ { .entry = (npth_entry_t)Entry, .name = (Name), .arg = &(struct Entry ## _args){ __VA_ARGS__ } } typedef void* (* const npth_entry_t)(void*); +typedef void (* const cleanup_t)(void*); struct ThreadBag { npth_t handle; @@ -12,3 +13,6 @@ struct ThreadBag { npth_entry_t entry; void* const arg; }; + +size_t xnpth_write_fancy(int fd, size_t size, const unsigned char buf[static size]); +size_t xnpth_read_fancy(int fd, size_t size, unsigned char buf[static size]); diff --git a/utils.h b/utils.h index edcd7dc..d753c23 100644 --- a/utils.h +++ b/utils.h @@ -69,6 +69,8 @@ #define xnpth_sigwait(...) XCALL(npth_sigwait, __VA_ARGS__) #define xnpth_join(...) XCALL(npth_join, __VA_ARGS__) #define xnpth_init(...) XCALL(npth_init, __VA_ARGS__) +#define xnpth_read(...) XCALL(npth_read, __VA_ARGS__) +#define xnpth_write(...) XCALL(npth_write, __VA_ARGS__) #define xfprintf(...) XCALL(fprintf, __VA_ARGS__) #define xfputs(...) XCALL(fputs, __VA_ARGS__) #define xfflush(...) XCALL(fflush, __VA_ARGS__)