From 22f1de9ab01abf5fc324f5a372863e1905499ddb Mon Sep 17 00:00:00 2001 From: Ava Hahn Date: Thu, 19 Sep 2024 15:05:15 -0700 Subject: [PATCH] more review nits Signed-off-by: Ava Hahn --- auto/otel | 2 +- auto/ssltls | 2 +- src/nxt_conf_validation.c | 65 ++++++++++++++++ src/nxt_http.h | 4 +- src/nxt_otel.c | 151 ++++++++++++-------------------------- src/nxt_otel.h | 31 +++++--- src/otel/src/lib.rs | 9 ++- 7 files changed, 143 insertions(+), 121 deletions(-) diff --git a/auto/otel b/auto/otel index 11387a1ba..109fefdfa 100644 --- a/auto/otel +++ b/auto/otel @@ -22,4 +22,4 @@ $NXT_OTEL_LIB_LOC: cargo build cd ../../ -END \ No newline at end of file +END diff --git a/auto/ssltls b/auto/ssltls index d8b795583..6512d330c 100644 --- a/auto/ssltls +++ b/auto/ssltls @@ -212,4 +212,4 @@ if [ $NXT_POLARSSL = YES ]; then $echo exit 1; fi -fi \ No newline at end of file +fi diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index c3a932922..f1e38bdae 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -241,6 +241,18 @@ static nxt_int_t nxt_conf_vldt_js_module_element(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); #endif +#if (NXT_HAVE_OTEL) +nxt_inline nxt_int_t nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data); +nxt_int_t nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data); +nxt_int_t nxt_otel_validate_protocol(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data); +#endif + static nxt_conf_vldt_object_t nxt_conf_vldt_setting_members[]; static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[]; @@ -1496,6 +1508,59 @@ nxt_conf_validate(nxt_conf_validation_t *vldt) "a number, a string, an array, or an object" + +#if (NXT_HAVE_OTEL) +inline nxt_int_t +nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data) +{ + // This function is a stub for now + return NXT_OK; +} + + +nxt_int_t +nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data) +{ + double batch_size; + batch_size = nxt_conf_get_number(value); + if (batch_size <= 0) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_otel_validate_protocol(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data) +{ + nxt_str_t proto; + + nxt_conf_get_string(value, &proto); + if (nxt_str_eq(&proto, "HTTP", 4) || + nxt_str_eq(&proto, "http", 4)) { + goto happy; + } + + if (nxt_str_eq(&proto, "GRPC", 4) || + nxt_str_eq(&proto, "grpc", 4)) { + goto happy; + } + + return NXT_ERROR; + + happy: + return NXT_OK; +} +#endif + + static nxt_int_t nxt_conf_vldt_type(nxt_conf_validation_t *vldt, const nxt_str_t *name, nxt_conf_value_t *value, nxt_conf_vldt_type_t type) diff --git a/src/nxt_http.h b/src/nxt_http.h index 19bbdda34..bc76d67eb 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -8,8 +8,10 @@ #define _NXT_HTTP_H_INCLUDED_ #include -#include +#if (NXT_HAVE_OTEL) +#include +#endif typedef enum { NXT_HTTP_UNSET = -1, diff --git a/src/nxt_otel.c b/src/nxt_otel.c index dfc1db0ef..0b385eb91 100644 --- a/src/nxt_otel.c +++ b/src/nxt_otel.c @@ -3,7 +3,6 @@ * Copyright (C) F5, Inc. */ -#include "nxt_clang.h" #include #include @@ -16,14 +15,13 @@ #include #include - #define NXT_OTEL_TRACEPARENT_LEN 55 #define NXT_OTEL_BODY_SIZE_TAG "body size" #define NXT_OTEL_METHOD_TAG "method" #define NXT_OTEL_PATH_TAG "path" -nxt_inline void +void nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status) { if (status == NXT_OTEL_ERROR_STATE || state->status != NXT_OTEL_ERROR_STATE) { @@ -32,32 +30,17 @@ nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status) } -nxt_inline void -nxt_otel_trace_and_span_init(nxt_task_t *t, nxt_http_request_t *r) -{ - r->otel->trace = - nxt_otel_get_or_create_trace(r->otel->trace_id); - if (r->otel->trace == NULL) { - nxt_log(t, NXT_LOG_ERR, "error generating otel span"); - nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE); - return; - } - - nxt_otel_state_transition(r->otel, NXT_OTEL_HEADER_STATE); -} - - -static void -nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) +void +nxt_otel_span_add_headers(nxt_task_t *task, nxt_http_request_t *r) { nxt_http_field_t *f, *cur; u_char *traceval, *name_cur, *val_cur; - nxt_log(t, NXT_LOG_DEBUG, "adding headers to trace"); + nxt_log(task, NXT_LOG_DEBUG, "adding headers to trace"); if (r->otel == NULL || r->otel->trace == NULL) { - nxt_log(t, NXT_LOG_ERR, "no trace to add events to!"); + nxt_log(task, NXT_LOG_ERR, "no trace to add events to!"); nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE); return; } @@ -75,22 +58,20 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) // Add method and path to the trace as well // 1. method first - name_cur = val_cur = NULL; name_cur = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_METHOD_TAG) + 1); val_cur = nxt_mp_zalloc(r->mem_pool, r->method->length + 1); if (name_cur != NULL && val_cur != NULL) { - sprintf((char *) name_cur, NXT_OTEL_METHOD_TAG); + nxt_cpystr(name_cur, (const u_char *) NXT_OTEL_METHOD_TAG); nxt_cpystrn(val_cur, r->method->start, r->method->length); nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur); } // 2. path second - name_cur = val_cur = NULL; name_cur = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_PATH_TAG) + 1); val_cur = nxt_mp_zalloc(r->mem_pool, r->path->length + 1); if (name_cur != NULL && val_cur != NULL) { - sprintf((char *) name_cur, NXT_OTEL_PATH_TAG); + nxt_cpystr(name_cur, (const u_char *) NXT_OTEL_PATH_TAG); nxt_cpystrn(val_cur, r->path->start, r->path->length); nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur); @@ -102,7 +83,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) * span still gets populated and sent * but data is not propagated to peer or app. */ - nxt_log(t, NXT_LOG_ERR, + nxt_log(task, NXT_LOG_ERR, "couldnt allocate traceparent header. span will not propagate"); return; } @@ -122,7 +103,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) nxt_otel_add_event_to_trace(r->otel->trace, f->name, traceval); } else { // copy in the pre-existing traceparent for the response - snprintf((char *) traceval, NXT_OTEL_TRACEPARENT_LEN + 1, "%s-%s-%s-%s", + sprintf((char *) traceval, "%s-%s-%s-%s", (char *) r->otel->version, (char *) r->otel->trace_id, (char *) r->otel->parent_id, @@ -131,7 +112,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) f = nxt_list_add(r->resp.fields); if (f == NULL) { - nxt_log(t, NXT_LOG_ERR, + nxt_log(task, NXT_LOG_ERR, "couldnt allocate traceparent header in response"); goto next; } @@ -145,7 +126,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) } -static void +void nxt_otel_span_add_body(nxt_http_request_t *r) { size_t body_size, size_digits; @@ -159,18 +140,17 @@ nxt_otel_span_add_body(nxt_http_request_t *r) return; } - sprintf((char *) body_tag_buf, NXT_OTEL_BODY_SIZE_TAG); sprintf((char *) body_size_buf, "%lu", body_size); + nxt_cpystr(body_tag_buf, (const u_char *) NXT_OTEL_BODY_SIZE_TAG); nxt_otel_add_event_to_trace(r->otel->trace, body_tag_buf, body_size_buf); nxt_otel_state_transition(r->otel, NXT_OTEL_COLLECT_STATE); } -static void +void nxt_otel_send_trace_and_span_data(nxt_task_t *task, void *obj, void *data) { - nxt_http_request_t *r; - r = obj; + nxt_http_request_t *r = obj; if (r->otel->trace == NULL) { nxt_log(task, NXT_LOG_ERR, "otel error: no trace to send!"); @@ -185,29 +165,44 @@ nxt_otel_send_trace_and_span_data(nxt_task_t *task, void *obj, void *data) } -nxt_inline void -nxt_otel_span_collect(nxt_task_t *t, nxt_http_request_t *r) -{ - nxt_log(t, NXT_LOG_DEBUG, "collecting span by adding the task to the fast work queue"); - nxt_work_queue_add(&t->thread->engine->fast_work_queue, - nxt_otel_send_trace_and_span_data, t, r, NULL); - nxt_otel_state_transition(r->otel, NXT_OTEL_UNINIT_STATE); -} - - -static void -nxt_otel_error(nxt_task_t *t, nxt_http_request_t *r) +void +nxt_otel_error(nxt_task_t *task, nxt_http_request_t *r) { // purposefully not using state transition helper r->otel->status = NXT_OTEL_UNINIT_STATE; - nxt_log(t, NXT_LOG_ERR, "otel error condition"); + nxt_log(task, NXT_LOG_ERR, "otel error condition"); // if r->otel->trace it WILL leak here. // TODO Phase 2: drop trace without sending it somehow? } -inline void -nxt_otel_test_and_call_state(nxt_task_t *t, nxt_http_request_t *r) +void +nxt_otel_trace_and_span_init(nxt_task_t *task, nxt_http_request_t *r) +{ + r->otel->trace = + nxt_otel_get_or_create_trace(r->otel->trace_id); + if (r->otel->trace == NULL) { + nxt_log(task, NXT_LOG_ERR, "error generating otel span"); + nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE); + return; + } + + nxt_otel_state_transition(r->otel, NXT_OTEL_HEADER_STATE); +} + + +void +nxt_otel_span_collect(nxt_task_t *task, nxt_http_request_t *r) +{ + nxt_log(task, NXT_LOG_DEBUG, "collecting span by adding the task to the fast work queue"); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_otel_send_trace_and_span_data, task, r, NULL); + nxt_otel_state_transition(r->otel, NXT_OTEL_UNINIT_STATE); +} + + +void +nxt_otel_test_and_call_state(nxt_task_t *task, nxt_http_request_t *r) { if (r->otel == NULL) { return; @@ -217,19 +212,19 @@ nxt_otel_test_and_call_state(nxt_task_t *t, nxt_http_request_t *r) case NXT_OTEL_UNINIT_STATE: return; case NXT_OTEL_INIT_STATE: - nxt_otel_trace_and_span_init(t, r); + nxt_otel_trace_and_span_init(task, r); break; case NXT_OTEL_HEADER_STATE: - nxt_otel_span_add_headers(t, r); + nxt_otel_span_add_headers(task, r); break; case NXT_OTEL_BODY_STATE: nxt_otel_span_add_body(r); break; case NXT_OTEL_COLLECT_STATE: - nxt_otel_span_collect(t, r); + nxt_otel_span_collect(task, r); break; case NXT_OTEL_ERROR_STATE: - nxt_otel_error(t, r); + nxt_otel_error(task, r); break; } } @@ -306,53 +301,3 @@ nxt_otel_parse_tracestate(void *ctx, nxt_http_field_t *field, uintptr_t data) return NXT_OK; } - - -inline nxt_int_t -nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt, - nxt_conf_value_t *value, - void *data) -{ - // This function is a stub for now - return NXT_OK; -} - - -nxt_int_t -nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt, - nxt_conf_value_t *value, - void *data) -{ - double batch_size; - batch_size = nxt_conf_get_number(value); - if (batch_size <= 0) { - return NXT_ERROR; - } - - return NXT_OK; -} - - -nxt_int_t -nxt_otel_validate_protocol(nxt_conf_validation_t *vldt, - nxt_conf_value_t *value, - void *data) -{ - nxt_str_t proto; - - nxt_conf_get_string(value, &proto); - if (nxt_str_eq(&proto, "HTTP", 4) || - nxt_str_eq(&proto, "http", 4)) { - goto happy; - } - - if (nxt_str_eq(&proto, "GRPC", 4) || - nxt_str_eq(&proto, "grpc", 4)) { - goto happy; - } - - return NXT_ERROR; - - happy: - return NXT_OK; -} diff --git a/src/nxt_otel.h b/src/nxt_otel.h index ccd6cfafd..1ce51aa6a 100644 --- a/src/nxt_otel.h +++ b/src/nxt_otel.h @@ -5,29 +5,22 @@ #ifndef _NXT_OTEL_H_INCLUDED_ #define _NXT_OTEL_H_INCLUDED_ -#if (NXT_HAVE_OTEL) - #include // forward declared struct nxt_http_field_t; struct nxt_conf_validation_t; struct nxt_conf_value_t; +struct nxt_http_request_t; extern void nxt_otel_send_trace(void *); extern void * nxt_otel_get_or_create_trace(u_char *); extern void nxt_otel_init(void (*)(u_char*), const char *, const char *, double); extern void nxt_otel_copy_traceparent(u_char *, void *); extern void nxt_otel_add_event_to_trace(void *, u_char *, u_char *); -extern uint8_t nxt_otel_is_init(); -extern void nxt_otel_uninit(); +extern uint8_t nxt_otel_is_init(void); +extern void nxt_otel_uninit(void); -void nxt_otel_test_and_call_state(nxt_task_t *, nxt_http_request_t *); -nxt_int_t nxt_otel_parse_traceparent(void *, nxt_http_field_t *, uintptr_t); -nxt_int_t nxt_otel_parse_tracestate(void *, nxt_http_field_t *, uintptr_t); -nxt_int_t nxt_otel_validate_endpoint(nxt_conf_validation_t *, nxt_conf_value_t *, void *); -nxt_int_t nxt_otel_validate_batch_size(nxt_conf_validation_t *, nxt_conf_value_t *, void *); -nxt_int_t nxt_otel_validate_protocol(nxt_conf_validation_t *, nxt_conf_value_t *, void *); /* nxt_otel_status_t * more efficient than a single handler state struct @@ -46,12 +39,26 @@ typedef enum { * includes indicator as to current flow state */ typedef struct { - u_char *trace_id, *version, *parent_id, *trace_flags; + u_char *trace_id; + u_char *version; + u_char *parent_id; + u_char *trace_flags; void *trace; nxt_otel_status_t status; nxt_str_t trace_state; } nxt_otel_state_t; -#endif // NXT_HAVE_OTEL + +nxt_int_t nxt_otel_parse_traceparent(void *ctx, nxt_http_field_t *field, uintptr_t data); +nxt_int_t nxt_otel_parse_tracestate(void *ctx, nxt_http_field_t *field, uintptr_t data); +void nxt_otel_span_add_headers(nxt_task_t *task, nxt_http_request_t *r); +void nxt_otel_span_add_body(nxt_http_request_t *r); +void nxt_otel_send_trace_and_span_data(nxt_task_t *task, void *obj, void *data); +void nxt_otel_error(nxt_task_t *task, nxt_http_request_t *r); +void nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status); +void nxt_otel_test_and_call_state(nxt_task_t *task, nxt_http_request_t *r); +void nxt_otel_trace_and_span_init(nxt_task_t *task, nxt_http_request_t *r); +void nxt_otel_span_collect(nxt_task_t *task, nxt_http_request_t *r); + #endif // _NXT_OTEL_H_INCLUDED_ diff --git a/src/otel/src/lib.rs b/src/otel/src/lib.rs index d06eec2e9..a95e53e63 100644 --- a/src/otel/src/lib.rs +++ b/src/otel/src/lib.rs @@ -16,6 +16,9 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; +const TRACEPARENT_HEADER_LEN: u8 = 55; + + // Stored sender channel to send spans or a shutdown message to within the // Tokio runtime. unsafe fn span_tx(destruct: bool) -> *const OnceLock> { @@ -193,15 +196,15 @@ pub unsafe fn nxt_otel_copy_traceparent(buf: *mut i8, span: *const BoxedSpan) { (*span).span_context().trace_flags() // 1 char, 2 hex ); - assert_eq!(traceparent.len(), 55); + assert_eq!(traceparent.len(), TRACEPARENT_HEADER_LEN); ptr::copy_nonoverlapping( traceparent.as_bytes().as_ptr(), buf as _, - 55, + TRACEPARENT_HEADER_LEN, ); // set null terminator - *buf.add(55) = b'\0' as _; + *buf.add(TRACEPARENT_HEADER_LEN) = b'\0' as _; } #[no_mangle]