Skip to content

Commit

Permalink
Merge branch 'main' into juanjux/better-leak-script-and-testing
Browse files Browse the repository at this point in the history
  • Loading branch information
juanjux authored Sep 2, 2024
2 parents f0cc234 + 666f6ec commit da6700d
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions tests/contrib/fastapi/test_fastapi_appsec_iast.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,30 @@
from fastapi.responses import JSONResponse
import pytest

from ddtrace.appsec._constants import IAST
from ddtrace.appsec._handlers import _on_asgi_request_parse_body
from ddtrace.appsec._iast import oce
from ddtrace.appsec._iast._patch import _on_iast_fastapi_patch
from ddtrace.appsec._iast.constants import VULN_SQL_INJECTION
from ddtrace.appsec._iast.taint_sinks.header_injection import patch as patch_header_injection
from ddtrace.contrib.sqlite3.patch import patch as patch_sqlite_sqli
from ddtrace.internal import core
from tests.appsec.iast.iast_utils import get_line_and_hash
from tests.utils import override_env
from tests.utils import override_global_config


IAST_ENV = {"DD_IAST_REQUEST_SAMPLING": "100"}

TEST_FILE_PATH = "tests/contrib/fastapi/test_fastapi_appsec_iast.py"

fastapi_version = tuple([int(v) for v in _fastapi_version.split(".")])


def _aux_appsec_prepare_tracer(tracer):
_on_iast_fastapi_patch()
patch_sqlite_sqli()
patch_header_injection()
oce.reconfigure()

tracer._iast_enabled = True
Expand Down Expand Up @@ -276,3 +285,56 @@ async def test_route(item_id):
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.path.parameter"


def test_fastapi_sqli_path_param(fastapi_application, client, tracer, test_spans):
@fastapi_application.get("/index.html/{param_str}")
async def test_route(param_str):
import sqlite3

from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted
from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect

assert is_pyobject_tainted(param_str)

con = sqlite3.connect(":memory:")
cur = con.cursor()
# label test_fastapi_sqli_path_parameter
cur.execute(add_aspect("SELECT 1 FROM ", param_str))

# test if asgi middleware is ok without any callback registered
core.reset_listeners(event_id="asgi.request.parse.body")

with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)), override_env(IAST_ENV):
# disable callback
_aux_appsec_prepare_tracer(tracer)
resp = client.get(
"/index.html/sqlite_master/",
)
assert resp.status_code == 200

span = test_spans.pop_traces()[1][0]
assert span.get_metric(IAST.ENABLED) == 1.0

loaded = json.loads(span.get_tag(IAST.JSON))

assert loaded["sources"] == [
{"origin": "http.request.path.parameter", "name": "param_str", "value": "sqlite_master"}
]

line, hash_value = get_line_and_hash(
"test_fastapi_sqli_path_parameter", VULN_SQL_INJECTION, filename=TEST_FILE_PATH
)
vulnerability = loaded["vulnerabilities"][0]
assert vulnerability["type"] == VULN_SQL_INJECTION
assert vulnerability["evidence"] == {
"valueParts": [
{"value": "SELECT "},
{"redacted": True},
{"value": " FROM "},
{"value": "sqlite_master", "source": 0},
]
}
assert vulnerability["location"]["line"] == line
assert vulnerability["location"]["path"] == TEST_FILE_PATH
assert vulnerability["hash"] == hash_value

0 comments on commit da6700d

Please sign in to comment.