Skip to content

Commit

Permalink
golang filter: support full-duplex (envoyproxy#33377)
Browse files Browse the repository at this point in the history

Signed-off-by: doujiang24 <[email protected]>
  • Loading branch information
doujiang24 authored Jun 11, 2024
1 parent 9b9c2f4 commit 34fa1fc
Show file tree
Hide file tree
Showing 34 changed files with 1,256 additions and 1,011 deletions.
6 changes: 6 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ updates:
interval: daily
time: "06:00"

- package-ecosystem: "gomod"
directory: "/contrib/golang/filters/http/test/test_data/websocket"
schedule:
interval: daily
time: "06:00"

- package-ecosystem: "gomod"
directory: "/contrib/golang/filters/network/test/test_data"
groups:
Expand Down
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ behavior_changes:
:ref:`TlvsMetadata type <envoy_v3_api_msg_data.core.v3.TlvsMetadata>`.
This change can be temporarily disabled by setting the runtime flag
``envoy.reloadable_features.use_typed_metadata_in_proxy_protocol_listener`` to ``false``.
- area: golang
change: |
Move ``Continue``, ``SendLocalReply`` and ``RecoverPanic` from ``FilterCallbackHandler`` to ``DecoderFilterCallbacks`` and
``EncoderFilterCallbacks``, to support full-duplex processing.
minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
Expand Down
4 changes: 2 additions & 2 deletions contrib/golang/common/dso/dso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ void HttpFilterDsoImpl::envoyGoFilterDestroyHttpPluginConfig(GoUint64 p0, GoInt
return envoy_go_filter_destroy_http_plugin_config_(p0, p1);
}

GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpHeader(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpHeader(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) {
ASSERT(envoy_go_filter_on_http_header_ != nullptr);
return envoy_go_filter_on_http_header_(p0, p1, p2, p3);
}

GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) {
ASSERT(envoy_go_filter_on_http_data_ != nullptr);
return envoy_go_filter_on_http_data_(p0, p1, p2, p3);
Expand Down
13 changes: 7 additions & 6 deletions contrib/golang/common/dso/dso.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class HttpFilterDso : public Dso {
virtual GoUint64 envoyGoFilterMergeHttpPluginConfig(GoUint64 p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
virtual void envoyGoFilterDestroyHttpPluginConfig(GoUint64 p0, GoInt p1) PURE;
virtual GoUint64 envoyGoFilterOnHttpHeader(httpRequest* p0, GoUint64 p1, GoUint64 p2,
virtual GoUint64 envoyGoFilterOnHttpHeader(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
virtual GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2,
virtual GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) PURE;
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE;
Expand All @@ -53,9 +53,10 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 envoyGoFilterMergeHttpPluginConfig(GoUint64 p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) override;
void envoyGoFilterDestroyHttpPluginConfig(GoUint64 p0, GoInt p1) override;
GoUint64 envoyGoFilterOnHttpHeader(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 envoyGoFilterOnHttpHeader(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) override;
GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) override;
GoUint64 envoyGoFilterOnHttpData(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) override;
void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) override;
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override;
void envoyGoRequestSemaDec(httpRequest* p0) override;
Expand All @@ -65,9 +66,9 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 (*envoy_go_filter_merge_http_plugin_config_)(GoUint64 p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
void (*envoy_go_filter_destroy_http_plugin_config_)(GoUint64 p0, GoInt p1) = {nullptr};
GoUint64 (*envoy_go_filter_on_http_header_)(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 (*envoy_go_filter_on_http_header_)(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
GoUint64 (*envoy_go_filter_on_http_data_)(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 (*envoy_go_filter_on_http_data_)(processState* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
Expand Down
4 changes: 2 additions & 2 deletions contrib/golang/common/dso/libgolang.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ extern GoUint64 envoyGoFilterMergeHttpPluginConfig(GoUint64 name_ptr, GoUint64 n

// go:linkname envoyGoFilterOnHttpHeader
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpHeader
extern GoUint64 envoyGoFilterOnHttpHeader(httpRequest* r, GoUint64 end_stream, GoUint64 header_num,
extern GoUint64 envoyGoFilterOnHttpHeader(processState* r, GoUint64 end_stream, GoUint64 header_num,
GoUint64 header_bytes);

// go:linkname envoyGoFilterOnHttpData
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpData
extern GoUint64 envoyGoFilterOnHttpData(httpRequest* r, GoUint64 end_stream, GoUint64 buffer,
extern GoUint64 envoyGoFilterOnHttpData(processState* s, GoUint64 end_stream, GoUint64 buffer,
GoUint64 length);

// go:linkname envoyGoFilterOnHttpLog
Expand Down
4 changes: 2 additions & 2 deletions contrib/golang/common/dso/test/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class MockHttpFilterDsoImpl : public HttpFilterDso {
(GoUint64 p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(void, envoyGoFilterDestroyHttpPluginConfig, (GoUint64 p0, GoInt p1));
MOCK_METHOD(GoUint64, envoyGoFilterOnHttpHeader,
(httpRequest * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
(processState * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(GoUint64, envoyGoFilterOnHttpData,
(httpRequest * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
(processState * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(void, envoyGoFilterOnHttpLog, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoFilterOnHttpDestroy, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoRequestSemaDec, (httpRequest * p0));
Expand Down
8 changes: 6 additions & 2 deletions contrib/golang/common/dso/test/test_data/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ typedef struct {
int foo;
} httpRequest;
typedef struct {
int state;
} processState;
typedef struct {
unsigned long long int plugin_name_ptr;
unsigned long long int plugin_name_len;
Expand Down Expand Up @@ -43,12 +47,12 @@ func envoyGoFilterMergeHttpPluginConfig(namePtr, nameLen, parentId, childId uint
}

//export envoyGoFilterOnHttpHeader
func envoyGoFilterOnHttpHeader(r *C.httpRequest, endStream, headerNum, headerBytes uint64) uint64 {
func envoyGoFilterOnHttpHeader(s *C.processState, endStream, headerNum, headerBytes uint64) uint64 {
return 0
}

//export envoyGoFilterOnHttpData
func envoyGoFilterOnHttpData(r *C.httpRequest, endStream, buffer, length uint64) uint64 {
func envoyGoFilterOnHttpData(s *C.processState, endStream, buffer, length uint64) uint64 {
return 0
}

Expand Down
52 changes: 32 additions & 20 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@ typedef struct { // NOLINT(modernize-use-using)
uint64_t len;
} Cstring;

struct httpRequest;

typedef struct { // NOLINT(modernize-use-using)
struct httpRequest* req;
int is_encoding;
int state;
} processState;

typedef struct httpRequest { // NOLINT(modernize-use-using)
Cstring plugin_name;
uint64_t configId;
int phase;
// The ID of the worker that is processing this request, this enables the go filter to dedicate
// memory to each worker and not require locks
uint32_t worker_id;
} httpRequest;

Expand Down Expand Up @@ -53,31 +62,33 @@ typedef enum { // NOLINT(modernize-use-using)
CAPISerializationFailure = -8,
} CAPIStatus;

CAPIStatus envoyGoFilterHttpClearRouteCache(void* r);
CAPIStatus envoyGoFilterHttpContinue(void* r, int status);
CAPIStatus envoyGoFilterHttpSendLocalReply(void* r, int response_code, void* body_text_data,
/* These APIs are related to the decode/encode phase, use the pointer of processState. */
CAPIStatus envoyGoFilterHttpContinue(void* s, int status);
CAPIStatus envoyGoFilterHttpSendLocalReply(void* s, int response_code, void* body_text_data,
int body_text_len, void* headers, int headers_num,
long long int grpc_status, void* details_data,
int details_len);
CAPIStatus envoyGoFilterHttpSendPanicReply(void* r, void* details_data, int details_len);
CAPIStatus envoyGoFilterHttpSendPanicReply(void* s, void* details_data, int details_len);

CAPIStatus envoyGoFilterHttpGetHeader(void* r, void* key_data, int key_len, uint64_t* value_data,
CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len);
CAPIStatus envoyGoFilterHttpCopyHeaders(void* r, void* strs, void* buf);
CAPIStatus envoyGoFilterHttpSetHeaderHelper(void* r, void* key_data, int key_len, void* value_data,
CAPIStatus envoyGoFilterHttpCopyHeaders(void* s, void* strs, void* buf);
CAPIStatus envoyGoFilterHttpSetHeaderHelper(void* s, void* key_data, int key_len, void* value_data,
int value_len, headerAction action);
CAPIStatus envoyGoFilterHttpRemoveHeader(void* r, void* key_data, int key_len);
CAPIStatus envoyGoFilterHttpRemoveHeader(void* s, void* key_data, int key_len);

CAPIStatus envoyGoFilterHttpGetBuffer(void* r, uint64_t buffer, void* value);
CAPIStatus envoyGoFilterHttpDrainBuffer(void* r, uint64_t buffer, uint64_t length);
CAPIStatus envoyGoFilterHttpSetBufferHelper(void* r, uint64_t buffer, void* data, int length,
CAPIStatus envoyGoFilterHttpGetBuffer(void* s, uint64_t buffer, void* value);
CAPIStatus envoyGoFilterHttpDrainBuffer(void* s, uint64_t buffer, uint64_t length);
CAPIStatus envoyGoFilterHttpSetBufferHelper(void* s, uint64_t buffer, void* data, int length,
bufferAction action);

CAPIStatus envoyGoFilterHttpCopyTrailers(void* r, void* strs, void* buf);
CAPIStatus envoyGoFilterHttpSetTrailer(void* r, void* key_data, int key_len, void* value,
CAPIStatus envoyGoFilterHttpCopyTrailers(void* s, void* strs, void* buf);
CAPIStatus envoyGoFilterHttpSetTrailer(void* s, void* key_data, int key_len, void* value,
int value_len, headerAction action);
CAPIStatus envoyGoFilterHttpRemoveTrailer(void* r, void* key_data, int key_len);
CAPIStatus envoyGoFilterHttpRemoveTrailer(void* s, void* key_data, int key_len);

/* These APIs have nothing to do with the decode/encode phase, use the pointer of httpRequest. */
CAPIStatus envoyGoFilterHttpClearRouteCache(void* r);
CAPIStatus envoyGoFilterHttpGetStringValue(void* r, int id, uint64_t* value_data, int* value_len);
CAPIStatus envoyGoFilterHttpGetIntegerValue(void* r, int id, uint64_t* value);

Expand All @@ -86,12 +97,7 @@ CAPIStatus envoyGoFilterHttpGetDynamicMetadata(void* r, void* name_data, int nam
CAPIStatus envoyGoFilterHttpSetDynamicMetadata(void* r, void* name_data, int name_len,
void* key_data, int key_len, void* buf_data,
int buf_len);

void envoyGoFilterLog(uint32_t level, void* message_data, int message_len);
uint32_t envoyGoFilterLogLevel();

void envoyGoFilterHttpFinalize(void* r, int reason);
void envoyGoConfigHttpFinalize(void* c);

CAPIStatus envoyGoFilterHttpSetStringFilterState(void* r, void* key_data, int key_len,
void* value_data, int value_len, int state_type,
Expand All @@ -101,6 +107,12 @@ CAPIStatus envoyGoFilterHttpGetStringFilterState(void* r, void* key_data, int ke
CAPIStatus envoyGoFilterHttpGetStringProperty(void* r, void* key_data, int key_len,
uint64_t* value_data, int* value_len, int* rc);

/* These APIs have nothing to do with request */
void envoyGoFilterLog(uint32_t level, void* message_data, int message_len);
uint32_t envoyGoFilterLogLevel();

/* These APIs are related to config, use the pointer of config. */
void envoyGoConfigHttpFinalize(void* c);
CAPIStatus envoyGoFilterHttpDefineMetric(void* c, uint32_t metric_type, void* name_data,
int name_len, uint32_t* metric_id);
CAPIStatus envoyGoFilterHttpIncrementMetric(void* c, uint32_t metric_id, int64_t offset);
Expand Down
47 changes: 26 additions & 21 deletions contrib/golang/common/go/api/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,54 @@ package api
import "unsafe"

type HttpCAPI interface {
ClearRouteCache(r unsafe.Pointer)
HttpContinue(r unsafe.Pointer, status uint64)
HttpSendLocalReply(r unsafe.Pointer, responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
/* These APIs are related to the decode/encode phase, use the pointer of processState. */
HttpContinue(s unsafe.Pointer, status uint64)
HttpSendLocalReply(s unsafe.Pointer, responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)

// Send a specialized reply that indicates that the filter has failed on the go side. Internally this is used for
// when unhandled panics are detected.
HttpSendPanicReply(r unsafe.Pointer, details string)
HttpSendPanicReply(s unsafe.Pointer, details string)
// experience api, memory unsafe
HttpGetHeader(r unsafe.Pointer, key string) string
HttpCopyHeaders(r unsafe.Pointer, num uint64, bytes uint64) map[string][]string
HttpSetHeader(r unsafe.Pointer, key string, value string, add bool)
HttpRemoveHeader(r unsafe.Pointer, key string)
HttpGetHeader(s unsafe.Pointer, key string) string
HttpCopyHeaders(s unsafe.Pointer, num uint64, bytes uint64) map[string][]string
HttpSetHeader(s unsafe.Pointer, key string, value string, add bool)
HttpRemoveHeader(s unsafe.Pointer, key string)

HttpGetBuffer(s unsafe.Pointer, bufferPtr uint64, length uint64) []byte
HttpDrainBuffer(s unsafe.Pointer, bufferPtr uint64, length uint64)
HttpSetBufferHelper(s unsafe.Pointer, bufferPtr uint64, value string, action BufferAction)
HttpSetBytesBufferHelper(s unsafe.Pointer, bufferPtr uint64, value []byte, action BufferAction)

HttpGetBuffer(r unsafe.Pointer, bufferPtr uint64, length uint64) []byte
HttpDrainBuffer(r unsafe.Pointer, bufferPtr uint64, length uint64)
HttpSetBufferHelper(r unsafe.Pointer, bufferPtr uint64, value string, action BufferAction)
HttpSetBytesBufferHelper(r unsafe.Pointer, bufferPtr uint64, value []byte, action BufferAction)
HttpCopyTrailers(s unsafe.Pointer, num uint64, bytes uint64) map[string][]string
HttpSetTrailer(s unsafe.Pointer, key string, value string, add bool)
HttpRemoveTrailer(s unsafe.Pointer, key string)

HttpCopyTrailers(r unsafe.Pointer, num uint64, bytes uint64) map[string][]string
HttpSetTrailer(r unsafe.Pointer, key string, value string, add bool)
HttpRemoveTrailer(r unsafe.Pointer, key string)
/* These APIs have nothing to do with the decode/encode phase, use the pointer of httpRequest. */
ClearRouteCache(r unsafe.Pointer)

HttpGetStringValue(r unsafe.Pointer, id int) (string, bool)
HttpGetIntegerValue(r unsafe.Pointer, id int) (uint64, bool)

HttpGetDynamicMetadata(r unsafe.Pointer, filterName string) map[string]interface{}
HttpSetDynamicMetadata(r unsafe.Pointer, filterName string, key string, value interface{})

HttpLog(level LogType, message string)
HttpLogLevel() LogType

HttpFinalize(r unsafe.Pointer, reason int)
HttpConfigFinalize(c unsafe.Pointer)

HttpSetStringFilterState(r unsafe.Pointer, key string, value string, stateType StateType, lifeSpan LifeSpan, streamSharing StreamSharing)
HttpGetStringFilterState(r unsafe.Pointer, key string) string

HttpGetStringProperty(r unsafe.Pointer, key string) (string, error)

HttpFinalize(r unsafe.Pointer, reason int)

/* These APIs are related to config, use the pointer of config. */
HttpDefineMetric(c unsafe.Pointer, metricType MetricType, name string) uint32
HttpIncrementMetric(c unsafe.Pointer, metricId uint32, offset int64)
HttpGetMetric(c unsafe.Pointer, metricId uint32) uint64
HttpRecordMetric(c unsafe.Pointer, metricId uint32, value uint64)
HttpConfigFinalize(c unsafe.Pointer)

/* These APIs have nothing to do with request */
HttpLog(level LogType, message string)
HttpLogLevel() LogType
}

type NetworkCAPI interface {
Expand Down
31 changes: 22 additions & 9 deletions contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,10 @@ type StreamInfo interface {

type StreamFilterCallbacks interface {
StreamInfo() StreamInfo
}

type FilterCallbacks interface {
StreamFilterCallbacks
// ClearRouteCache clears the route cache for the current request, and filtermanager will re-fetch the route in the next filter.
// Please be careful to invoke it, since filtermanager will raise an 404 route_not_found response when failed to re-fetch a route.
ClearRouteCache()
// Continue or SendLocalReply should be last API invoked, no more code after them.
Continue(StatusType)
SendLocalReply(responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
// RecoverPanic recover panic in defer and terminate the request by SendLocalReply with 500 status code.
RecoverPanic()
Log(level LogType, msg string)
LogLevel() LogType
// GetProperty fetch Envoy attribute and return the value as a string.
Expand All @@ -180,8 +172,29 @@ type FilterCallbacks interface {
// TODO add more for filter callbacks
}

// FilterProcessCallbacks is the interface for filter to process request/response in decode/encode phase.
type FilterProcessCallbacks interface {
// Continue or SendLocalReply should be last API invoked, no more code after them.
Continue(StatusType)
SendLocalReply(responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
// RecoverPanic recover panic in defer and terminate the request by SendLocalReply with 500 status code.
RecoverPanic()
}

type DecoderFilterCallbacks interface {
FilterProcessCallbacks
}

type EncoderFilterCallbacks interface {
FilterProcessCallbacks
}

type FilterCallbackHandler interface {
FilterCallbacks
StreamFilterCallbacks
// DecoderFilterCallbacks could only be used in DecodeXXX phases.
DecoderFilterCallbacks() DecoderFilterCallbacks
// EncoderFilterCallbacks could only be used in EncodeXXX phases.
EncoderFilterCallbacks() EncoderFilterCallbacks
}

type DynamicMetadata interface {
Expand Down
Loading

0 comments on commit 34fa1fc

Please sign in to comment.