From 666f6ecd39a1937f18643bf05ab44c4a10003899 Mon Sep 17 00:00:00 2001 From: Alberto Vara Date: Mon, 2 Sep 2024 10:54:07 +0200 Subject: [PATCH] chore(iast): testing fastapi and sqli (#10474) ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [ ] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --- .../fastapi/test_fastapi_appsec_iast.py | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/tests/contrib/fastapi/test_fastapi_appsec_iast.py b/tests/contrib/fastapi/test_fastapi_appsec_iast.py index b732cebd1cb..405c5c1b89a 100644 --- a/tests/contrib/fastapi/test_fastapi_appsec_iast.py +++ b/tests/contrib/fastapi/test_fastapi_appsec_iast.py @@ -9,21 +9,30 @@ from fastapi.responses import JSONResponse import pytest +from ddtrace.appsec._constants import IAST from ddtrace.appsec._handlers import _on_asgi_request_parse_body from ddtrace.appsec._iast import oce from ddtrace.appsec._iast._patch import _on_iast_fastapi_patch +from ddtrace.appsec._iast.constants import VULN_SQL_INJECTION +from ddtrace.appsec._iast.taint_sinks.header_injection import patch as patch_header_injection +from ddtrace.contrib.sqlite3.patch import patch as patch_sqlite_sqli from ddtrace.internal import core +from tests.appsec.iast.iast_utils import get_line_and_hash from tests.utils import override_env from tests.utils import override_global_config IAST_ENV = {"DD_IAST_REQUEST_SAMPLING": "100"} +TEST_FILE_PATH = "tests/contrib/fastapi/test_fastapi_appsec_iast.py" + fastapi_version = tuple([int(v) for v in _fastapi_version.split(".")]) def _aux_appsec_prepare_tracer(tracer): _on_iast_fastapi_patch() + patch_sqlite_sqli() + patch_header_injection() oce.reconfigure() tracer._iast_enabled = True @@ -276,3 +285,56 @@ async def test_route(item_id): assert result["ranges_start"] == 0 assert result["ranges_length"] == 8 assert result["ranges_origin"] == "http.request.path.parameter" + + +def test_fastapi_sqli_path_param(fastapi_application, client, tracer, test_spans): + @fastapi_application.get("/index.html/{param_str}") + async def test_route(param_str): + import sqlite3 + + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect + + assert is_pyobject_tainted(param_str) + + con = sqlite3.connect(":memory:") + cur = con.cursor() + # label test_fastapi_sqli_path_parameter + cur.execute(add_aspect("SELECT 1 FROM ", param_str)) + + # test if asgi middleware is ok without any callback registered + core.reset_listeners(event_id="asgi.request.parse.body") + + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)), override_env(IAST_ENV): + # disable callback + _aux_appsec_prepare_tracer(tracer) + resp = client.get( + "/index.html/sqlite_master/", + ) + assert resp.status_code == 200 + + span = test_spans.pop_traces()[1][0] + assert span.get_metric(IAST.ENABLED) == 1.0 + + loaded = json.loads(span.get_tag(IAST.JSON)) + + assert loaded["sources"] == [ + {"origin": "http.request.path.parameter", "name": "param_str", "value": "sqlite_master"} + ] + + line, hash_value = get_line_and_hash( + "test_fastapi_sqli_path_parameter", VULN_SQL_INJECTION, filename=TEST_FILE_PATH + ) + vulnerability = loaded["vulnerabilities"][0] + assert vulnerability["type"] == VULN_SQL_INJECTION + assert vulnerability["evidence"] == { + "valueParts": [ + {"value": "SELECT "}, + {"redacted": True}, + {"value": " FROM "}, + {"value": "sqlite_master", "source": 0}, + ] + } + assert vulnerability["location"]["line"] == line + assert vulnerability["location"]["path"] == TEST_FILE_PATH + assert vulnerability["hash"] == hash_value