Skip to content

Commit

Permalink
more review nits
Browse files Browse the repository at this point in the history
Signed-off-by: Ava Hahn <[email protected]>
  • Loading branch information
avahahn committed Sep 19, 2024
1 parent e7c4c8a commit 22f1de9
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 121 deletions.
2 changes: 1 addition & 1 deletion auto/otel
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ $NXT_OTEL_LIB_LOC:
cargo build
cd ../../

END
END
2 changes: 1 addition & 1 deletion auto/ssltls
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,4 @@ if [ $NXT_POLARSSL = YES ]; then
$echo
exit 1;
fi
fi
fi
65 changes: 65 additions & 0 deletions src/nxt_conf_validation.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,18 @@ static nxt_int_t nxt_conf_vldt_js_module_element(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
#endif

#if (NXT_HAVE_OTEL)
nxt_inline nxt_int_t nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data);
nxt_int_t nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data);
nxt_int_t nxt_otel_validate_protocol(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data);
#endif


static nxt_conf_vldt_object_t nxt_conf_vldt_setting_members[];
static nxt_conf_vldt_object_t nxt_conf_vldt_http_members[];
Expand Down Expand Up @@ -1496,6 +1508,59 @@ nxt_conf_validate(nxt_conf_validation_t *vldt)
"a number, a string, an array, or an object"



#if (NXT_HAVE_OTEL)
inline nxt_int_t
nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data)
{
// This function is a stub for now
return NXT_OK;
}


nxt_int_t
nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data)
{
double batch_size;
batch_size = nxt_conf_get_number(value);
if (batch_size <= 0) {
return NXT_ERROR;
}

return NXT_OK;
}


nxt_int_t
nxt_otel_validate_protocol(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data)
{
nxt_str_t proto;

nxt_conf_get_string(value, &proto);
if (nxt_str_eq(&proto, "HTTP", 4) ||
nxt_str_eq(&proto, "http", 4)) {
goto happy;
}

if (nxt_str_eq(&proto, "GRPC", 4) ||
nxt_str_eq(&proto, "grpc", 4)) {
goto happy;
}

return NXT_ERROR;

happy:
return NXT_OK;
}
#endif


static nxt_int_t
nxt_conf_vldt_type(nxt_conf_validation_t *vldt, const nxt_str_t *name,
nxt_conf_value_t *value, nxt_conf_vldt_type_t type)
Expand Down
4 changes: 3 additions & 1 deletion src/nxt_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
#define _NXT_HTTP_H_INCLUDED_

#include <nxt_regex.h>
#include <nxt_otel.h>

#if (NXT_HAVE_OTEL)
#include <nxt_otel.h>
#endif

typedef enum {
NXT_HTTP_UNSET = -1,
Expand Down
151 changes: 48 additions & 103 deletions src/nxt_otel.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* Copyright (C) F5, Inc.
*/

#include "nxt_clang.h"
#include <math.h>

#include <nxt_router.h>
Expand All @@ -16,14 +15,13 @@
#include <nxt_types.h>
#include <nxt_string.h>


#define NXT_OTEL_TRACEPARENT_LEN 55
#define NXT_OTEL_BODY_SIZE_TAG "body size"
#define NXT_OTEL_METHOD_TAG "method"
#define NXT_OTEL_PATH_TAG "path"


nxt_inline void
void
nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status)
{
if (status == NXT_OTEL_ERROR_STATE || state->status != NXT_OTEL_ERROR_STATE) {
Expand All @@ -32,32 +30,17 @@ nxt_otel_state_transition(nxt_otel_state_t *state, nxt_otel_status_t status)
}


nxt_inline void
nxt_otel_trace_and_span_init(nxt_task_t *t, nxt_http_request_t *r)
{
r->otel->trace =
nxt_otel_get_or_create_trace(r->otel->trace_id);
if (r->otel->trace == NULL) {
nxt_log(t, NXT_LOG_ERR, "error generating otel span");
nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE);
return;
}

nxt_otel_state_transition(r->otel, NXT_OTEL_HEADER_STATE);
}


static void
nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r)
void
nxt_otel_span_add_headers(nxt_task_t *task, nxt_http_request_t *r)
{
nxt_http_field_t *f, *cur;
u_char *traceval, *name_cur, *val_cur;

nxt_log(t, NXT_LOG_DEBUG, "adding headers to trace");
nxt_log(task, NXT_LOG_DEBUG, "adding headers to trace");

if (r->otel == NULL || r->otel->trace == NULL)
{
nxt_log(t, NXT_LOG_ERR, "no trace to add events to!");
nxt_log(task, NXT_LOG_ERR, "no trace to add events to!");
nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE);
return;
}
Expand All @@ -75,22 +58,20 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r)

// Add method and path to the trace as well
// 1. method first
name_cur = val_cur = NULL;
name_cur = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_METHOD_TAG) + 1);
val_cur = nxt_mp_zalloc(r->mem_pool, r->method->length + 1);
if (name_cur != NULL && val_cur != NULL) {
sprintf((char *) name_cur, NXT_OTEL_METHOD_TAG);
nxt_cpystr(name_cur, (const u_char *) NXT_OTEL_METHOD_TAG);
nxt_cpystrn(val_cur, r->method->start, r->method->length);

nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur);
}

// 2. path second
name_cur = val_cur = NULL;
name_cur = nxt_mp_zalloc(r->mem_pool, sizeof(NXT_OTEL_PATH_TAG) + 1);
val_cur = nxt_mp_zalloc(r->mem_pool, r->path->length + 1);
if (name_cur != NULL && val_cur != NULL) {
sprintf((char *) name_cur, NXT_OTEL_PATH_TAG);
nxt_cpystr(name_cur, (const u_char *) NXT_OTEL_PATH_TAG);
nxt_cpystrn(val_cur, r->path->start, r->path->length);

nxt_otel_add_event_to_trace(r->otel->trace, name_cur, val_cur);
Expand All @@ -102,7 +83,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r)
* span still gets populated and sent
* but data is not propagated to peer or app.
*/
nxt_log(t, NXT_LOG_ERR,
nxt_log(task, NXT_LOG_ERR,
"couldnt allocate traceparent header. span will not propagate");
return;
}
Expand All @@ -122,7 +103,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r)
nxt_otel_add_event_to_trace(r->otel->trace, f->name, traceval);
} else {
// copy in the pre-existing traceparent for the response
snprintf((char *) traceval, NXT_OTEL_TRACEPARENT_LEN + 1, "%s-%s-%s-%s",
sprintf((char *) traceval, "%s-%s-%s-%s",
(char *) r->otel->version,
(char *) r->otel->trace_id,
(char *) r->otel->parent_id,
Expand All @@ -131,7 +112,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r)

f = nxt_list_add(r->resp.fields);
if (f == NULL) {
nxt_log(t, NXT_LOG_ERR,
nxt_log(task, NXT_LOG_ERR,
"couldnt allocate traceparent header in response");
goto next;
}
Expand All @@ -145,7 +126,7 @@ nxt_otel_span_add_headers(nxt_task_t *t, nxt_http_request_t *r)
}


static void
void
nxt_otel_span_add_body(nxt_http_request_t *r)
{
size_t body_size, size_digits;
Expand All @@ -159,18 +140,17 @@ nxt_otel_span_add_body(nxt_http_request_t *r)
return;
}

sprintf((char *) body_tag_buf, NXT_OTEL_BODY_SIZE_TAG);
sprintf((char *) body_size_buf, "%lu", body_size);
nxt_cpystr(body_tag_buf, (const u_char *) NXT_OTEL_BODY_SIZE_TAG);
nxt_otel_add_event_to_trace(r->otel->trace, body_tag_buf, body_size_buf);
nxt_otel_state_transition(r->otel, NXT_OTEL_COLLECT_STATE);
}


static void
void
nxt_otel_send_trace_and_span_data(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
r = obj;
nxt_http_request_t *r = obj;

if (r->otel->trace == NULL) {
nxt_log(task, NXT_LOG_ERR, "otel error: no trace to send!");
Expand All @@ -185,29 +165,44 @@ nxt_otel_send_trace_and_span_data(nxt_task_t *task, void *obj, void *data)
}


nxt_inline void
nxt_otel_span_collect(nxt_task_t *t, nxt_http_request_t *r)
{
nxt_log(t, NXT_LOG_DEBUG, "collecting span by adding the task to the fast work queue");
nxt_work_queue_add(&t->thread->engine->fast_work_queue,
nxt_otel_send_trace_and_span_data, t, r, NULL);
nxt_otel_state_transition(r->otel, NXT_OTEL_UNINIT_STATE);
}


static void
nxt_otel_error(nxt_task_t *t, nxt_http_request_t *r)
void
nxt_otel_error(nxt_task_t *task, nxt_http_request_t *r)
{
// purposefully not using state transition helper
r->otel->status = NXT_OTEL_UNINIT_STATE;
nxt_log(t, NXT_LOG_ERR, "otel error condition");
nxt_log(task, NXT_LOG_ERR, "otel error condition");
// if r->otel->trace it WILL leak here.
// TODO Phase 2: drop trace without sending it somehow?
}


inline void
nxt_otel_test_and_call_state(nxt_task_t *t, nxt_http_request_t *r)
void
nxt_otel_trace_and_span_init(nxt_task_t *task, nxt_http_request_t *r)
{
r->otel->trace =
nxt_otel_get_or_create_trace(r->otel->trace_id);
if (r->otel->trace == NULL) {
nxt_log(task, NXT_LOG_ERR, "error generating otel span");
nxt_otel_state_transition(r->otel, NXT_OTEL_ERROR_STATE);
return;
}

nxt_otel_state_transition(r->otel, NXT_OTEL_HEADER_STATE);
}


void
nxt_otel_span_collect(nxt_task_t *task, nxt_http_request_t *r)
{
nxt_log(task, NXT_LOG_DEBUG, "collecting span by adding the task to the fast work queue");
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_otel_send_trace_and_span_data, task, r, NULL);
nxt_otel_state_transition(r->otel, NXT_OTEL_UNINIT_STATE);
}


void
nxt_otel_test_and_call_state(nxt_task_t *task, nxt_http_request_t *r)
{
if (r->otel == NULL) {
return;
Expand All @@ -217,19 +212,19 @@ nxt_otel_test_and_call_state(nxt_task_t *t, nxt_http_request_t *r)
case NXT_OTEL_UNINIT_STATE:
return;
case NXT_OTEL_INIT_STATE:
nxt_otel_trace_and_span_init(t, r);
nxt_otel_trace_and_span_init(task, r);
break;
case NXT_OTEL_HEADER_STATE:
nxt_otel_span_add_headers(t, r);
nxt_otel_span_add_headers(task, r);
break;
case NXT_OTEL_BODY_STATE:
nxt_otel_span_add_body(r);
break;
case NXT_OTEL_COLLECT_STATE:
nxt_otel_span_collect(t, r);
nxt_otel_span_collect(task, r);
break;
case NXT_OTEL_ERROR_STATE:
nxt_otel_error(t, r);
nxt_otel_error(task, r);
break;
}
}
Expand Down Expand Up @@ -306,53 +301,3 @@ nxt_otel_parse_tracestate(void *ctx, nxt_http_field_t *field, uintptr_t data)

return NXT_OK;
}


inline nxt_int_t
nxt_otel_validate_endpoint(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data)
{
// This function is a stub for now
return NXT_OK;
}


nxt_int_t
nxt_otel_validate_batch_size(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data)
{
double batch_size;
batch_size = nxt_conf_get_number(value);
if (batch_size <= 0) {
return NXT_ERROR;
}

return NXT_OK;
}


nxt_int_t
nxt_otel_validate_protocol(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value,
void *data)
{
nxt_str_t proto;

nxt_conf_get_string(value, &proto);
if (nxt_str_eq(&proto, "HTTP", 4) ||
nxt_str_eq(&proto, "http", 4)) {
goto happy;
}

if (nxt_str_eq(&proto, "GRPC", 4) ||
nxt_str_eq(&proto, "grpc", 4)) {
goto happy;
}

return NXT_ERROR;

happy:
return NXT_OK;
}
Loading

0 comments on commit 22f1de9

Please sign in to comment.