diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 7ba515c5c..c3a932922 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -311,19 +311,19 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_root_members[] = { #if (NXT_HAVE_OTEL) static nxt_conf_vldt_object_t nxt_conf_vldt_otel_members[] = { { - .name = nxt_string("endpoint"), - .type = NXT_CONF_VLDT_STRING, - .validator = nxt_otel_validate_endpoint, - .flags = NXT_CONF_VLDT_REQUIRED - }, { - .name = nxt_string("batch_size"), - .type = NXT_CONF_VLDT_INTEGER, - .validator = nxt_otel_validate_batch_size, - }, { - .name = nxt_string("protocol"), - .type = NXT_CONF_VLDT_STRING, - .validator = nxt_otel_validate_protocol, - .flags = NXT_CONF_VLDT_REQUIRED + .name = nxt_string("endpoint"), + .type = NXT_CONF_VLDT_STRING, + .validator = nxt_otel_validate_endpoint, + .flags = NXT_CONF_VLDT_REQUIRED + }, { + .name = nxt_string("batch_size"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_otel_validate_batch_size, + }, { + .name = nxt_string("protocol"), + .type = NXT_CONF_VLDT_STRING, + .validator = nxt_otel_validate_protocol, + .flags = NXT_CONF_VLDT_REQUIRED }, NXT_CONF_VLDT_END @@ -356,7 +356,6 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_setting_members[] = { #endif }, - NXT_CONF_VLDT_END }; @@ -914,7 +913,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_target_members[] = { { .name = nxt_string("module"), .type = NXT_CONF_VLDT_STRING, - .flags = NXT_CONF_VLDT_REQUIRED + .flags = NXT_CONF_VLDT_REQUIRED, }, { .name = nxt_string("callable"), .type = NXT_CONF_VLDT_STRING, diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 7387c53e8..e018b24e6 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -1695,7 +1695,6 @@ nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r, nxt_sendbuf_drain(task, wq, last); #if (NXT_HAVE_OTEL) - // TODO Phase 2: Test nxt_otel_test_and_call_state(task, r); #endif } @@ -1839,6 +1838,7 @@ nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data) nxt_event_engine_t *engine; #if (NXT_HAVE_OTEL) nxt_http_request_t *r; + r = ((nxt_h1proto_t *) data)->request; nxt_otel_test_and_call_state(task, r); #endif diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 248141346..338efc672 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -286,7 +286,7 @@ nxt_http_request_create(nxt_task_t *task) #if (NXT_HAVE_OTEL) if (nxt_otel_is_init()) { r->otel = nxt_mp_zget(r->mem_pool, sizeof(nxt_otel_state_t)); - if (!r->otel) { + if (r->otel == NULL) { goto fail; } r->otel->status = NXT_OTEL_INIT_STATE; diff --git a/src/nxt_otel.c b/src/nxt_otel.c index a75c7fab5..32e43905b 100644 --- a/src/nxt_otel.c +++ b/src/nxt_otel.c @@ -13,6 +13,7 @@ #include #include #include +#include #define NXT_OTEL_TRACEPARENT_LEN 55 @@ -20,40 +21,6 @@ #define NXT_OTEL_METHOD_TAG "method" #define NXT_OTEL_PATH_TAG "path" -static inline void nxt_otel_trace_and_span_init(nxt_task_t *, nxt_http_request_t *); -static inline void nxt_otel_span_collect(nxt_task_t *, nxt_http_request_t *); -static void nxt_otel_span_add_headers(nxt_task_t *, nxt_http_request_t *); -static void nxt_otel_span_add_body(nxt_http_request_t *); -static void nxt_otel_error(nxt_task_t *, nxt_http_request_t *); - -inline void -nxt_otel_test_and_call_state(nxt_task_t *t, nxt_http_request_t *r) -{ - /* state (r->otel) null if opentelemetry wasnt configured - * status (r->otel->status) null if opentelemetry flow completes or errors - */ - if (!r->otel || !r->otel->status) { - return; - } - - switch (r->otel->status) { - case NXT_OTEL_INIT_STATE: - nxt_otel_trace_and_span_init(t, r); - break; - case NXT_OTEL_HEADER_STATE: - nxt_otel_span_add_headers(t, r); - break; - case NXT_OTEL_BODY_STATE: - nxt_otel_span_add_body(r); - break; - case NXT_OTEL_COLLECT_STATE: - nxt_otel_span_collect(t, r); - break; - case NXT_OTEL_ERROR_STATE: - nxt_otel_error(t, r); - break; - } -} static inline void nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status) @@ -63,13 +30,13 @@ nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status) } } + static inline void nxt_otel_trace_and_span_init(nxt_task_t *t, nxt_http_request_t *r) { r->otel->trace = nxt_otel_get_or_create_trace(r->otel->trace_id); - if (!r->otel->trace) - { + if (r->otel->trace == NULL) { nxt_log(t, NXT_LOG_ERR, "error generating otel span"); nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE); return; @@ -78,6 +45,7 @@ nxt_otel_trace_and_span_init(nxt_task_t *t, nxt_http_request_t *r) nxt_otel_state_transition(r->otel, NXT_OTEL_HEADER_STATE); } + static void nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) { @@ -86,7 +54,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) nxt_log(t, NXT_LOG_DEBUG, "adding headers to trace"); - if (!r->otel || !r->otel->trace) + if (r->otel == NULL || r->otel->trace == NULL) { nxt_log(t, NXT_LOG_ERR, "no trace to add events to!"); nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE); @@ -97,9 +65,9 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) // we need this in a continguous and null terminated segment of memory for Rust FFI name_cur = nxt_mp_zalloc(r->mem_pool, cur->name_length + 1); val_cur = nxt_mp_zalloc(r->mem_pool, cur->value_length + 1); - if (name_cur && val_cur) { - strncpy((char *) name_cur, (char *) cur->name, cur->name_length); - strncpy((char *) val_cur, (char *) cur->value, cur->value_length); + if (name_cur != NULL && val_cur != NULL) { + nxt_cpystrn(name_cur, cur->name, cur->name_length); + nxt_cpystrn(val_cur, cur->value, cur->value_length); nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur); } } nxt_list_loop; @@ -109,9 +77,9 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) name_cur = val_cur = NULL; name_cur = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_METHOD_TAG) + 1); val_cur = nxt_mp_zalloc(r->mem_pool, r->method->length + 1); - if (name_cur && val_cur) { + if (name_cur != NULL && val_cur != NULL) { sprintf((char *) name_cur, NXT_OTEL_METHOD_TAG); - strncpy((char *) val_cur, (char *) r->method->start, r->method->length); + nxt_cpystrn(val_cur, r->method->start, r->method->length); nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur); } @@ -120,15 +88,15 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) name_cur = val_cur = NULL; name_cur = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_PATH_TAG) + 1); val_cur = nxt_mp_zalloc(r->mem_pool, r->path->length + 1); - if (name_cur && val_cur) { + if (name_cur != NULL && val_cur != NULL) { sprintf((char *) name_cur, NXT_OTEL_PATH_TAG); - strncpy((char *) val_cur, (char *) r->path->start, r->path->length); + nxt_cpystrn(val_cur, r->path->start, r->path->length); nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur); } traceval = nxt_mp_zalloc(r->mem_pool, NXT_OTEL_TRACEPARENT_LEN + 1); - if (!traceval) { + if (traceval == NULL) { /* let it go blank here. * span still gets populated and sent * but data is not propagated to peer or app. @@ -140,18 +108,17 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) // if we didnt inherit a trace id then we need to add the // traceparent header to the request - if (!r->otel->trace_id) { + if (r->otel->trace_id == NULL) { nxt_otel_copy_traceparent(traceval, r->otel->trace); f = nxt_list_add(r->fields); - if (f) { - nxt_http_field_name_set(f, "traceparent"); - f->value = traceval; - f->value_length = nxt_strlen(traceval); - nxt_otel_add_event_to_trace(r->otel->trace, f->name, traceval); - } else { - nxt_log(t, NXT_LOG_ERR, - "couldnt allocate traceparent header in request"); + if (f == NULL) { + goto next; } + + nxt_http_field_name_set(f, "traceparent"); + f->value = traceval; + f->value_length = nxt_strlen(traceval); + nxt_otel_add_event_to_trace(r->otel->trace, f->name, traceval); } else { // copy in the pre-existing traceparent for the response snprintf((char *) traceval, NXT_OTEL_TRACEPARENT_LEN + 1, "%s-%s-%s-%s", @@ -161,31 +128,32 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r) (char *) r->otel->trace_flags); } - f = NULL; f = nxt_list_add(r->resp.fields); - if (f) { - nxt_http_field_name_set(f, "traceparent"); - f->value = traceval; - f->value_length = nxt_strlen(traceval); - } else { + if (f == NULL) { nxt_log(t, NXT_LOG_ERR, "couldnt allocate traceparent header in response"); } + nxt_http_field_name_set(f, "traceparent"); + f->value = traceval; + f->value_length = nxt_strlen(traceval); + + next: nxt_otel_state_transition(r->otel, NXT_OTEL_BODY_STATE); } + static void nxt_otel_span_add_body(nxt_http_request_t *r) { size_t body_size, size_digits; u_char *body_size_buf, *body_tag_buf; - body_size = (r->body) ? nxt_buf_used_size(r->body) : 0; - size_digits = (!body_size) ? 1 : log10(body_size) + 1; + body_size = (r->body != NULL) ? nxt_buf_used_size(r->body) : 0; + size_digits = (body_size == 0) ? 1 : log10(body_size) + 1; body_size_buf = nxt_mp_zalloc(r->mem_pool, size_digits + 1); - body_tag_buf = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_BODY_SIZE_TAG) + 1); - if (!body_size_buf || !body_tag_buf) { + body_tag_buf = nxt_mp_zalloc(r->mem_pool, strlen(NXT_OTEL_BODY_SIZE_TAG) + 1); + if (body_size_buf == NULL || body_tag_buf == NULL) { return; } @@ -195,43 +163,76 @@ nxt_otel_span_add_body(nxt_http_request_t *r) nxt_otel_state_transition(r->otel, NXT_OTEL_COLLECT_STATE); } + static void nxt_otel_send_trace_and_span_data(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; r = obj; - - if (!r->otel->trace) { + if (r->otel->trace == NULL) { nxt_log(task, NXT_LOG_ERR, "otel error: no trace to send!"); nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE); return; } - nxt_otel_state_transition(r->otel, 0); + nxt_otel_state_transition(r->otel, NXT_OTEL_UNINIT_STATE); nxt_otel_send_trace(r->otel->trace); + r->otel->trace = NULL; } + static inline void nxt_otel_span_collect(nxt_task_t *t, nxt_http_request_t *r) { nxt_log(t, NXT_LOG_DEBUG, "collecting span by adding the task to the fast work queue"); nxt_work_queue_add(&t->thread->engine->fast_work_queue, nxt_otel_send_trace_and_span_data, t, r, NULL); - nxt_otel_state_transition(r->otel, 0); + nxt_otel_state_transition(r->otel, NXT_OTEL_UNINIT_STATE); } + static void nxt_otel_error(nxt_task_t *t, nxt_http_request_t *r) { // purposefully not using state transition helper - r->otel->status = 0; + r->otel->status = NXT_OTEL_UNINIT_STATE; nxt_log(t, NXT_LOG_ERR, "otel error condition"); // if r->otel->trace it WILL leak here. // TODO Phase 2: drop trace without sending it somehow? } + +inline void +nxt_otel_test_and_call_state(nxt_task_t *t, nxt_http_request_t *r) +{ + if (r->otel == NULL) { + return; + } + + switch (r->otel->status) { + case NXT_OTEL_UNINIT_STATE: + return; + case NXT_OTEL_INIT_STATE: + nxt_otel_trace_and_span_init(t, r); + break; + case NXT_OTEL_HEADER_STATE: + nxt_otel_span_add_headers(t, r); + break; + case NXT_OTEL_BODY_STATE: + nxt_otel_span_add_body(r); + break; + case NXT_OTEL_COLLECT_STATE: + nxt_otel_span_collect(t, r); + break; + case NXT_OTEL_ERROR_STATE: + nxt_otel_error(t, r); + break; + } +} + + nxt_int_t nxt_otel_parse_traceparent(void *ctx, nxt_http_field_t *field, uintptr_t data) { @@ -255,7 +256,7 @@ nxt_otel_parse_traceparent(void *ctx, nxt_http_field_t *field, uintptr_t data) /* strsep is destructive so we make a copy of the field */ - copy = nxt_mp_zalloc(r->mem_pool, field->value_length+1); + copy = nxt_mp_zalloc(r->mem_pool, field->value_length + 1); if (copy == NULL) { goto error_state; } @@ -266,10 +267,11 @@ nxt_otel_parse_traceparent(void *ctx, nxt_http_field_t *field, uintptr_t data) r->otel->parent_id = (u_char *) strsep(©, "-"); r->otel->trace_flags = (u_char *) strsep(©, "-"); - if (!r->otel->version || - !r->otel->trace_id || - !r->otel->parent_id || - !r->otel->trace_flags) { + if (r->otel->version == NULL || + r->otel->trace_id == NULL || + r->otel->parent_id == NULL || + r->otel->trace_flags == NULL) + { goto error_state; } @@ -280,6 +282,7 @@ nxt_otel_parse_traceparent(void *ctx, nxt_http_field_t *field, uintptr_t data) return NXT_ERROR; } + nxt_int_t nxt_otel_parse_tracestate(void *ctx, nxt_http_field_t *field, uintptr_t data) { @@ -295,22 +298,28 @@ nxt_otel_parse_tracestate(void *ctx, nxt_http_field_t *field, uintptr_t data) // maybe someday this should get sent down into the otel lib f = nxt_list_add(r->resp.fields); - if (f) { + if (f != NULL) { *f = *field; } return NXT_OK; } + inline nxt_int_t -nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) +nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data) { // This function is a stub for now return NXT_OK; } + nxt_int_t -nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) +nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data) { double batch_size; batch_size = nxt_conf_get_number(value); @@ -321,8 +330,11 @@ nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt, nxt_conf_value_t *valu return NXT_OK; } + nxt_int_t -nxt_otel_validate_protocol(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) +nxt_otel_validate_protocol(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, + void *data) { nxt_str_t proto; diff --git a/src/nxt_otel.h b/src/nxt_otel.h index fcca300c6..ccd6cfafd 100644 --- a/src/nxt_otel.h +++ b/src/nxt_otel.h @@ -33,8 +33,8 @@ nxt_int_t nxt_otel_validate_protocol(nxt_conf_validation_t *, nxt_conf_value_t * * more efficient than a single handler state struct */ typedef enum { - // 0 = uninitialized and/or unset status - NXT_OTEL_INIT_STATE = 1, + NXT_OTEL_UNINIT_STATE = 0, + NXT_OTEL_INIT_STATE, NXT_OTEL_HEADER_STATE, NXT_OTEL_BODY_STATE, NXT_OTEL_COLLECT_STATE,