Skip to content

Commit

Permalink
fix: chunk bodies and process partial (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
M4tteoP authored Jan 4, 2024
1 parent 475eda6 commit 393d4d6
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 69 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ env:
TINYGO_VERSION: 0.30.0
# Run e2e tests against latest two releases and latest dev
ENVOY_IMAGES: >
envoyproxy/envoy:v1.28-latest
envoyproxy/envoy:v1.27-latest
envoyproxy/envoy:v1.26-latest
envoyproxy/envoy-dev:latest
istio/proxyv2:1.18.2
istio/proxyv2:1.19.0
istio/proxyv2:1.20.1
istio/proxyv2:1.19.5
jobs:
build:
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.20
require (
github.com/corazawaf/coraza-wasilibs v0.1.0
github.com/corazawaf/coraza/v3 v3.0.4
github.com/stretchr/testify v1.8.0
github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0
github.com/stretchr/testify v1.8.4
github.com/tetratelabs/proxy-wasm-go-sdk v0.22.1-0.20240102162926-b089ccb94219
github.com/tidwall/gjson v1.17.0
github.com/wasilibs/nottinygc v0.7.0
)
Expand All @@ -18,7 +18,7 @@ require (
github.com/magefile/mage v1.15.0 // indirect
github.com/petar-dambovaliev/aho-corasick v0.0.0-20230725210150-fb29fc3c913e // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tetratelabs/wazero v1.5.0 // indirect
github.com/tetratelabs/wazero v1.6.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/wasilibs/go-aho-corasick v0.5.0 // indirect
Expand Down
17 changes: 6 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ github.com/corazawaf/coraza/v3 v3.0.4 h1:Llemgoh0hp2NggCwcWN8lNiV4Pfe+AWzf1oEcas
github.com/corazawaf/coraza/v3 v3.0.4/go.mod h1:3fTYjY5BZv3nezLpH6NAap0gr3jZfbQWUAu2GF17ET4=
github.com/corazawaf/libinjection-go v0.1.2 h1:oeiV9pc5rvJ+2oqOqXEAMJousPpGiup6f7Y3nZj5GoM=
github.com/corazawaf/libinjection-go v0.1.2/go.mod h1:OP4TM7xdJ2skyXqNX1AN1wN5nNZEmJNuWbNPOItn7aw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/foxcpp/go-mockdns v1.0.0 h1:7jBqxd3WDWwi/6WhDvacvH1XsN3rOLXyHM1uhvIx6FI=
Expand All @@ -20,15 +19,12 @@ github.com/petar-dambovaliev/aho-corasick v0.0.0-20230725210150-fb29fc3c913e h1:
github.com/petar-dambovaliev/aho-corasick v0.0.0-20230725210150-fb29fc3c913e/go.mod h1:EHPiTAKtiFmrMldLUNswFwfZ2eJIYBHktdaUTZxYWRw=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0 h1:kS7BvMKN+FiptV4pfwiNX8e3q14evxAWkhYbxt8EI1M=
github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0/go.mod h1:qkW5MBz2jch2u8bS59wws65WC+Gtx3x0aPUX5JL7CXI=
github.com/tetratelabs/wazero v1.5.0 h1:Yz3fZHivfDiZFUXnWMPUoiW7s8tC1sjdBtlJn08qYa0=
github.com/tetratelabs/wazero v1.5.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tetratelabs/proxy-wasm-go-sdk v0.22.1-0.20240102162926-b089ccb94219 h1:lBbCzjjZJl2+deUwrRv5+GQle18AkCoUkFwL8sPVvIE=
github.com/tetratelabs/proxy-wasm-go-sdk v0.22.1-0.20240102162926-b089ccb94219/go.mod h1:YqR8JZaY3Ev9ihXgjzAQAMkXEzPKKmy4Q5rsVWt4XGk=
github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g=
github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
Expand All @@ -54,7 +50,6 @@ golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
Expand Down
50 changes: 42 additions & 8 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -229,8 +231,8 @@ func TestLifecycle(t *testing.T) {
{
name: "request body accepted, no request body access",
inlineRules: `
SecRuleEngine On\nSecRequestBodyAccess Off\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\"
`,
SecRuleEngine On\nSecRequestBodyAccess Off\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\"
`,
requestHdrsAction: types.ActionContinue,
requestBodyAction: types.ActionContinue,
responseHdrsAction: types.ActionContinue,
Expand All @@ -240,8 +242,8 @@ func TestLifecycle(t *testing.T) {
{
name: "request body accepted, payload above process partial",
inlineRules: `
SecRuleEngine On\nSecRequestBodyAccess On\nSecRequestBodyLimit 2\nSecRequestBodyLimitAction ProcessPartial\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\"
`,
SecRuleEngine On\nSecRequestBodyAccess On\nSecRequestBodyLimit 2\nSecRequestBodyLimitAction ProcessPartial\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\"
`,
requestHdrsAction: types.ActionContinue,
requestBodyAction: types.ActionContinue,
responseHdrsAction: types.ActionContinue,
Expand Down Expand Up @@ -450,6 +452,16 @@ func TestLifecycle(t *testing.T) {
// Stream bodies in chunks of 5

if requestHdrsAction == types.ActionContinue {
totalBodysent := 0
requestBodyAccess := strings.Contains(tt.inlineRules, "SecRequestBodyAccess On")
requestBodyProcessPartial := strings.Contains(tt.inlineRules, "SecRequestBodyLimitAction ProcessPartial")
var requestBodyLimit int
matches := regexp.MustCompile(`SecRequestBodyLimit (\d+)`).FindStringSubmatch(tt.inlineRules)
if len(matches) > 1 {
var err error
requestBodyLimit, err = strconv.Atoi(matches[1])
require.NoError(t, err)
}
for i := 0; i < len(reqBody); i += 5 {
eos := i+5 >= len(reqBody)
var body []byte
Expand All @@ -458,13 +470,20 @@ func TestLifecycle(t *testing.T) {
} else {
body = reqBody[i : i+5]
}
totalBodysent += len(body)
requestBodyAction = host.CallOnRequestBody(id, body, eos)
requestBodyAccess := strings.Contains(tt.inlineRules, "SecRequestBodyAccess On")
switch {
case eos:
requireEqualAction(t, tt.requestBodyAction, requestBodyAction, "unexpected body action, want %q, have %q on end of stream")
case requestBodyAccess:
// Reject: We expect pause in all cases with action Reject: being the limit reached or not
case requestBodyAccess && !requestBodyProcessPartial:
requireEqualAction(t, types.ActionPause, requestBodyAction, "unexpected request body action, want %q, have %q")
// ProcessPartial: we expect pause until the limit is reached
case requestBodyAccess && requestBodyProcessPartial && totalBodysent < requestBodyLimit:
requireEqualAction(t, types.ActionPause, requestBodyAction, "unexpected request body action, want %q, have %q")
// ProcessPartial: we expect tt.requestBodyAction when the limit is reached
case requestBodyAccess && requestBodyProcessPartial && totalBodysent >= requestBodyLimit:
requireEqualAction(t, tt.requestBodyAction, requestBodyAction, "unexpected request body action, want %q, have %q")
default:
requireEqualAction(t, types.ActionContinue, requestBodyAction, "unexpected request body action, want %q, have %q")
}
Expand All @@ -478,6 +497,15 @@ func TestLifecycle(t *testing.T) {

if responseHdrsAction == types.ActionContinue {
responseBodyAccess := strings.Contains(tt.inlineRules, "SecResponseBodyAccess On")
responseBodyProcessPartial := strings.Contains(tt.inlineRules, "SecResponseBodyLimitAction ProcessPartial")
var responseBodyLimit int
matches := regexp.MustCompile(`SecResponseBodyLimit (\d+)`).FindStringSubmatch(tt.inlineRules)
if len(matches) > 1 {
var err error
responseBodyLimit, err = strconv.Atoi(matches[1])
require.NoError(t, err)
}
totalBodysent := 0
for i := 0; i < len(respBody); i += 5 {
eos := i+5 >= len(respBody)
var body []byte
Expand All @@ -486,15 +514,21 @@ func TestLifecycle(t *testing.T) {
} else {
body = respBody[i : i+5]
}
totalBodysent += len(body)
responseBodyAction := host.CallOnResponseBody(id, body, eos)
switch {
// expectResponseRejectLimitActionSinceFirstChunk: writing the first chunk (len(respBody) bytes), it is expected to reach
// the ResponseBodyLimit with the Action set to Reject. When these conditions happen, ActionContinue will be returned,
// with the interruption enforced replacing the body with null bytes (checked with tt.respondedNullBody)
case eos, tt.expectResponseRejectSinceFirstChunk:
requireEqualAction(t, types.ActionContinue, responseBodyAction, "unexpected response body action, want %q, have %q on end of stream")
case responseBodyAccess:
requireEqualAction(t, types.ActionPause, responseBodyAction, "unexpected response body action, want %q, have %q")
// Reject: We expect pause in all cases with action Reject: being the limit reached or not
// It would either be paused because we are callectin the body or because the limit was reached and we triggered the action
case responseBodyAccess && !responseBodyProcessPartial:
requireEqualAction(t, types.ActionPause, responseBodyAction, "unexpected request body action, want %q, have %q")
// ProcessPartial: we expect pause until the limit is reached
case responseBodyAccess && responseBodyProcessPartial && totalBodysent < responseBodyLimit:
requireEqualAction(t, types.ActionPause, responseBodyAction, "unexpected request body action, want %q, have %q")
default:
requireEqualAction(t, types.ActionContinue, responseBodyAction, "unexpected response body action, want %q, have %q")
}
Expand Down
111 changes: 67 additions & 44 deletions wasmplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,36 +384,42 @@ func (ctx *httpContext) OnHttpRequestBody(bodySize int, endOfStream bool) types.
return types.ActionContinue
}

if bodySize > 0 {
b, err := proxywasm.GetHttpRequestBody(ctx.bodyReadIndex, bodySize)
if err == nil {
interruption, _, err := tx.WriteRequestBody(b)
if err != nil {
ctx.logger.Error().Err(err).Msg("Failed to write request body")
return types.ActionContinue
}

if interruption != nil {
return ctx.handleInterruption(interruptionPhaseHttpRequestBody, interruption)
}

ctx.bodyReadIndex += bodySize
} else if err != types.ErrorStatusNotFound {
// When using FWT sometimes (it is inconsistent) we receive calls where ctx.bodyReadIndex == bodySize
// meaning that the incoming size in the body is the same as the already read body.
// When that happens, this code fails to retrieve the body through proxywasm.GetHttpRequestBody
// as the total body is from 0 up to X bytes and since the last bodySize = X it attempts to read
// from X up to X bytes and it returns a types.ErrorStatusNotFound. This could happen despite
// endOfStream being true or false.
// The tests in 920410 show this problem.
// TODO(jcchavezs): Verify if this is a FTW problem.
ctx.logger.Error().
Err(err).
Int("body_read_index", ctx.bodyReadIndex).
// bodySize is the size of the whole body received so far, not the size of the current chunk
chunkSize := bodySize - ctx.bodyReadIndex
// OnHttpRequestBody might be called more than once with the same data, we check if there is new data available to be read
if chunkSize > 0 {
bodyChunk, err := proxywasm.GetHttpRequestBody(ctx.bodyReadIndex, chunkSize)
if err != nil {
ctx.logger.Error().Err(err).
Int("body_size", bodySize).
Int("body_read_index", ctx.bodyReadIndex).
Int("chunk_size", chunkSize).
Msg("Failed to read request body")
return types.ActionContinue
}
readchunkSize := len(bodyChunk)
if readchunkSize != chunkSize {
ctx.logger.Warn().Int("read_chunk_size", readchunkSize).Int("chunk_size", chunkSize).Msg("Request chunk size read is different from the computed one")
}
interruption, writtenBytes, err := tx.WriteRequestBody(bodyChunk)
if err != nil {
ctx.logger.Error().Err(err).Msg("Failed to write request body")
return types.ActionContinue
}
if interruption != nil {
return ctx.handleInterruption(interruptionPhaseHttpRequestBody, interruption)
}

// If not the whole chunk has been written, it implicitly means that we reached the waf request body limit.
// Internally ProcessRequestBody has been called and it did not raise any interruption (just checked in the condition above).
if writtenBytes < readchunkSize {
// No further body data will be processed
// Setting processedRequestBody avoid to call more than once ProcessRequestBody
ctx.processedRequestBody = true
return types.ActionContinue
}

ctx.bodyReadIndex += readchunkSize
}

if endOfStream {
Expand Down Expand Up @@ -531,6 +537,10 @@ func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types
return replaceResponseBodyWhenInterrupted(ctx.logger, bodySize)
}

if ctx.processedResponseBody {
return types.ActionContinue
}

if ctx.tx == nil {
return types.ActionContinue
}
Expand Down Expand Up @@ -562,33 +572,46 @@ func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types
return types.ActionContinue
}

if bodySize > 0 {
body, err := proxywasm.GetHttpResponseBody(ctx.bodyReadIndex, bodySize)
if err == nil {
interruption, _, err := tx.WriteResponseBody(body)
if err != nil {
ctx.logger.Error().Err(err).Msg("Failed to write response body")
return types.ActionContinue
}
// bodyReadIndex has to be updated before evaluating the interruption
// it is internally needed to replace the full body if the tx is interrupted
ctx.bodyReadIndex += bodySize
if interruption != nil {
return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption)
}
} else if err != types.ErrorStatusNotFound {
chunkSize := bodySize - ctx.bodyReadIndex
if chunkSize > 0 {
bodyChunk, err := proxywasm.GetHttpResponseBody(ctx.bodyReadIndex, chunkSize)
if err != nil {
ctx.logger.Error().
Int("body_read_index", ctx.bodyReadIndex).
Int("body_size", bodySize).
Int("body_read_index", ctx.bodyReadIndex).
Int("chunk_size", chunkSize).
Err(err).
Msg("Failed to read response body")
return types.ActionContinue
}

readchunkSize := len(bodyChunk)
if readchunkSize != chunkSize {
ctx.logger.Warn().Int("read_chunk_size", readchunkSize).Int("chunk_size", chunkSize).Msg("Response chunk size read is different from the computed one")
}
interruption, writtenBytes, err := tx.WriteResponseBody(bodyChunk)
if err != nil {
ctx.logger.Error().Err(err).Msg("Failed to write response body")
return types.ActionContinue
}
// bodyReadIndex has to be updated before evaluating the interruption
// it is internally needed to replace the full body if the transaction is interrupted
ctx.bodyReadIndex += readchunkSize
if interruption != nil {
return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption)
}
// If not the whole chunk has been written, it implicitly means that we reached the waf response body limit,
// internally ProcessResponseBody has been called and it did not raise any interruption (just checked in the condition above).
if writtenBytes < readchunkSize {
// no further body data will be processed
ctx.processedResponseBody = true
return types.ActionContinue
}
}

if endOfStream {
// We have already sent response headers, an unauthorized response can not be sent anymore,
// but we can still drop the response to prevent leaking sensitive content.
// but we can still drop the response body to prevent leaking sensitive content.
// The error will also be logged by Coraza.
ctx.processedResponseBody = true
interruption, err := tx.ProcessResponseBody()
Expand All @@ -604,7 +627,7 @@ func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types
return types.ActionContinue
}
// Wait until we see the entire body. It has to be buffered in order to check that it is fully legit
// before sending it downstream
// before sending it downstream (to the client)
return types.ActionPause
}

Expand Down

0 comments on commit 393d4d6

Please sign in to comment.