diff --git a/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.cpp b/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.cpp index 8332a89c1f8..5384d147303 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.cpp +++ b/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.cpp @@ -1,5 +1,6 @@ #include "Helpers.h" #include "Initializer/Initializer.h" +#include #include #include @@ -327,6 +328,87 @@ _convert_escaped_text_to_taint_text(const StrType& taint_escaped_text, TaintRang return { StrType(result), ranges }; } +/** + * @brief This function takes the ranges of a string splitted (as in string.split or rsplit or os.path.split) and + * applies the ranges of the original string to the splitted parts with updated offsets. + * + * @param source_str: The original string that was splitted. + * @param source_ranges: The ranges of the original string. + * @param split_result: The splitted parts of the original string. + * @param tx_map: The taint map to apply the ranges. + * @param include_separator: If the separator should be included in the splitted parts. + */ +template +bool +set_ranges_on_splitted(const StrType& source_str, + const TaintRangeRefs& source_ranges, + const py::list& split_result, + TaintRangeMapType* tx_map, + bool include_separator) +{ + bool some_set = false; + + // Some quick shortcuts + if (source_ranges.empty() or py::len(split_result) == 0 or py::len(source_str) == 0 or not tx_map) { + return false; + } + + RANGE_START offset = 0; + std::string c_source_str = py::cast(source_str); + auto separator_increase = (int)((not include_separator)); + + for (const auto& item : split_result) { + if (not is_text(item.ptr()) or py::len(item) == 0) { + continue; + } + auto c_item = py::cast(item); + TaintRangeRefs item_ranges; + + // Find the item in the source_str. + const auto start = static_cast(c_source_str.find(c_item, offset)); + if (start == -1) { + continue; + } + const auto end = static_cast(start + c_item.length()); + + // Find what source_ranges match these positions and create a new range with the start and len updated. + for (const auto& range : source_ranges) { + auto range_end_abs = range->start + range->length; + + if (range->start < end && range_end_abs > start) { + // Create a new range with the updated start + auto new_range_start = std::max(range->start - offset, 0L); + auto new_range_length = std::min(end - start, (range->length - std::max(0L, offset - range->start))); + item_ranges.emplace_back( + initializer->allocate_taint_range(new_range_start, new_range_length, range->source)); + } + } + if (not item_ranges.empty()) { + set_ranges(item.ptr(), item_ranges, tx_map); + some_set = true; + } + + offset += py::len(item) + separator_increase; + } + + return some_set; +} + +template +bool +api_set_ranges_on_splitted(const StrType& source_str, + const TaintRangeRefs& source_ranges, + const py::list& split_result, + bool include_separator) +{ + TaintRangeMapType* tx_map = initializer->get_tainting_map(); + if (not tx_map) { + throw py::value_error(MSG_ERROR_TAINT_MAP); + } + + return set_ranges_on_splitted(source_str, source_ranges, split_result, tx_map, include_separator); +} + py::object parse_params(size_t position, const char* keyword_name, @@ -348,6 +430,27 @@ pyexport_aspect_helpers(py::module& m) m.def("common_replace", &api_common_replace, "string_method"_a, "candidate_text"_a); m.def("common_replace", &api_common_replace, "string_method"_a, "candidate_text"_a); m.def("common_replace", &api_common_replace, "string_method"_a, "candidate_text"_a); + m.def("set_ranges_on_splitted", + &api_set_ranges_on_splitted, + "source_str"_a, + "source_ranges"_a, + "split_result"_a, + // cppcheck-suppress assignBoolToPointer + "include_separator"_a = false); + m.def("set_ranges_on_splitted", + &api_set_ranges_on_splitted, + "source_str"_a, + "source_ranges"_a, + "split_result"_a, + // cppcheck-suppress assignBoolToPointer + "include_separator"_a = false); + m.def("set_ranges_on_splitted", + &api_set_ranges_on_splitted, + "source_str"_a, + "source_ranges"_a, + "split_result"_a, + // cppcheck-suppress assignBoolToPointer + "include_separator"_a = false); m.def("_all_as_formatted_evidence", &_all_as_formatted_evidence, "text"_a, diff --git a/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.h b/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.h index bd672442f2c..3a8ddbf83ed 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.h +++ b/ddtrace/appsec/_iast/_taint_tracking/Aspects/Helpers.h @@ -52,5 +52,20 @@ template std::tuple _convert_escaped_text_to_taint_text(const StrType& taint_escaped_text, TaintRangeRefs ranges_orig); +template +bool +set_ranges_on_splitted(const StrType& source_str, + const TaintRangeRefs& source_ranges, + const py::list& split_result, + TaintRangeMapType* tx_map, + bool include_separator = false); + +template +bool +api_set_ranges_on_splitted(const StrType& source_str, + const TaintRangeRefs& source_ranges, + const py::list& split_result, + bool include_separator = false); + void pyexport_aspect_helpers(py::module& m); diff --git a/ddtrace/appsec/_iast/_taint_tracking/__init__.py b/ddtrace/appsec/_iast/_taint_tracking/__init__.py index 73b7aecc5b3..86c425bfd2d 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/__init__.py +++ b/ddtrace/appsec/_iast/_taint_tracking/__init__.py @@ -23,6 +23,7 @@ from ._native.aspect_helpers import as_formatted_evidence from ._native.aspect_helpers import common_replace from ._native.aspect_helpers import parse_params + from ._native.aspect_helpers import set_ranges_on_splitted from ._native.aspect_ospath_join import _aspect_ospathjoin from ._native.initializer import active_map_addreses_size from ._native.initializer import create_context @@ -84,6 +85,7 @@ "_format_aspect", "as_formatted_evidence", "parse_params", + "set_ranges_on_splitted", "num_objects_tainted", "debug_taint_map", "iast_taint_log_error", diff --git a/scripts/cppcheck.sh b/scripts/cppcheck.sh index 920d1c060fd..d96809c0cf4 100755 --- a/scripts/cppcheck.sh +++ b/scripts/cppcheck.sh @@ -1,5 +1,5 @@ #!/bin/bash set -e -cppcheck --error-exitcode=1 --std=c++17 --language=c++ --force \ +cppcheck --inline-suppr --error-exitcode=1 --std=c++17 --language=c++ --force \ $(git ls-files '*.c' '*.cpp' '*.h' '*.hpp' '*.cc' '*.hh' | grep -E -v '^(ddtrace/(vendor|internal)|ddtrace/appsec/_iast/_taint_tracking/_vendor)/') diff --git a/tests/appsec/iast/aspects/test_aspect_helpers.py b/tests/appsec/iast/aspects/test_aspect_helpers.py index d261980a7b0..43efa3d8efe 100644 --- a/tests/appsec/iast/aspects/test_aspect_helpers.py +++ b/tests/appsec/iast/aspects/test_aspect_helpers.py @@ -1,3 +1,5 @@ +import os + import pytest from ddtrace.appsec._iast._taint_tracking import OriginType @@ -7,6 +9,7 @@ from ddtrace.appsec._iast._taint_tracking import common_replace from ddtrace.appsec._iast._taint_tracking import get_ranges from ddtrace.appsec._iast._taint_tracking import set_ranges +from ddtrace.appsec._iast._taint_tracking import set_ranges_on_splitted from ddtrace.appsec._iast._taint_tracking.aspects import _convert_escaped_text_to_tainted_text @@ -105,3 +108,339 @@ def test_as_formatted_evidence_convert_escaped_text_to_tainted_text(): # type: as_formatted_evidence(s, tag_mapping_function=TagMappingMode.Mapper) == ":+-<1750328947>abcde<1750328947>-+:fgh" ) assert _convert_escaped_text_to_tainted_text(":+-<1750328947>abcde<1750328947>-+:fgh", [ranges]) == "abcdefgh" + + +def test_set_ranges_on_splitted_str() -> None: + s = "abc|efgh" + range1 = _build_sample_range(0, 2, "first") + range2 = _build_sample_range(4, 2, "second") + set_ranges(s, (range1, range2)) + ranges = get_ranges(s) + assert ranges + + parts = s.split("|") + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [TaintRange(0, 2, Source("first", "sample_value", OriginType.PARAMETER))] + assert get_ranges(parts[1]) == [TaintRange(0, 2, Source("second", "sample_value", OriginType.PARAMETER))] + + +def test_set_ranges_on_splitted_rsplit() -> None: + s = "abc|efgh|jkl" + range1 = _build_sample_range(0, 2, s[0:2]) + range2 = _build_sample_range(4, 2, s[4:6]) + range3 = _build_sample_range(9, 3, s[9:12]) + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + parts = s.rsplit("|", 1) + assert parts == ["abc|efgh", "jkl"] + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [ + TaintRange(0, 2, Source("ab", "sample_value", OriginType.PARAMETER)), + TaintRange(4, 2, Source("ef", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [ + TaintRange(0, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplit(): + s = "abc/efgh/jkl" + range1 = _build_sample_range(0, 4, s[0:4]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + parts = list(os.path.split(s)) + assert parts == ["abc/efgh", "jkl"] + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [ + TaintRange(0, 4, Source("abc/", "sample_value", OriginType.PARAMETER)), + TaintRange(4, 4, Source("efgh", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [ + TaintRange(0, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitext(): + s = "abc/efgh/jkl.txt" + range1 = _build_sample_range(0, 3, s[0:2]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + range4 = _build_sample_range(13, 4, s[13:17]) + set_ranges(s, (range1, range2, range3, range4)) + ranges = get_ranges(s) + assert ranges + + parts = list(os.path.splitext(s)) + assert parts == ["abc/efgh/jkl", ".txt"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [ + TaintRange(0, 3, Source("abc", "sample_value", OriginType.PARAMETER)), + TaintRange(4, 4, Source("efgh", "sample_value", OriginType.PARAMETER)), + TaintRange(9, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [ + TaintRange(1, 4, Source("txt", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplit_with_empty_string(): + s = "abc/efgh/jkl/" + range1 = _build_sample_range(0, 2, s[0:2]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + parts = list(os.path.split(s)) + assert parts == ["abc/efgh/jkl", ""] + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [ + TaintRange(0, 2, Source("ab", "sample_value", OriginType.PARAMETER)), + TaintRange(4, 4, Source("efgh", "sample_value", OriginType.PARAMETER)), + TaintRange(9, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [] + + +def test_set_ranges_on_splitted_ospathbasename(): + s = "abc/efgh/jkl" + range1 = _build_sample_range(0, 2, s[0:2]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + # Basename aspect implementation works by adding the previous content in a list so + # we can use set_ranges_on_splitted to set the ranges on the last part (the real result) + parts = ["abc/efgh/", os.path.basename(s)] + assert parts == ["abc/efgh/", "jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[1]) == [ + TaintRange(0, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitdrive_windows(): + s = "C:/abc/efgh/jkl" + range1 = _build_sample_range(0, 2, s[0:2]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + range4 = _build_sample_range(12, 3, s[12:16]) + set_ranges(s, (range1, range2, range3, range4)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitdrive would do on Windows instead of calling it + parts = ["C:", "/abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [ + TaintRange(0, 2, Source("C:", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [ + TaintRange(2, 4, Source("bc/e", "sample_value", OriginType.PARAMETER)), + TaintRange(7, 3, Source("gh/", "sample_value", OriginType.PARAMETER)), + TaintRange(10, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitdrive_posix(): + s = "/abc/efgh/jkl" + range1 = _build_sample_range(0, 2, s[0:2]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitdrive would do on posix instead of calling it + parts = ["", "/abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [] + assert get_ranges(parts[1]) == ranges + + +def test_set_ranges_on_splitted_ospathsplitroot_windows_drive(): + s = "C:/abc/efgh/jkl" + range1 = _build_sample_range(0, 2, s[0:2]) + range2 = _build_sample_range(4, 4, s[4:8]) + range3 = _build_sample_range(9, 3, s[9:12]) + range4 = _build_sample_range(12, 3, s[12:16]) + set_ranges(s, (range1, range2, range3, range4)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitroot would do on Windows instead of calling it + parts = ["C:", "/", "abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [ + TaintRange(0, 2, Source("C:", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [] + assert get_ranges(parts[2]) == [ + TaintRange(1, 4, Source("bc/e", "sample_value", OriginType.PARAMETER)), + TaintRange(6, 3, Source("gh/", "sample_value", OriginType.PARAMETER)), + TaintRange(9, 3, Source("jkl", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitroot_windows_share(): + s = "//server/share/abc/efgh/jkl" + range1 = _build_sample_range(0, 2, "//") + range2 = _build_sample_range(2, 6, "server") + range3 = _build_sample_range(9, 5, "share") + range4 = _build_sample_range(14, 1, "/") + range5 = _build_sample_range(15, 3, "abc") + range6 = _build_sample_range(19, 4, "efgh") + range7 = _build_sample_range(23, 4, "/jkl") + set_ranges(s, (range1, range2, range3, range4, range5, range6, range7)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitroot would do on Windows instead of calling it; the implementation + # removed the second element + parts = ["//server/share", "/", "abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [ + TaintRange(0, 2, Source("//", "sample_value", OriginType.PARAMETER)), + TaintRange(2, 6, Source("server", "sample_value", OriginType.PARAMETER)), + TaintRange(9, 5, Source("share", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[1]) == [ + TaintRange(0, 1, Source("/", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[2]) == [ + TaintRange(0, 3, Source("abc", "sample_value", OriginType.PARAMETER)), + TaintRange(4, 4, Source("efgh", "sample_value", OriginType.PARAMETER)), + TaintRange(8, 4, Source("/jkl", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitroot_posix_normal_path(): + s = "/abc/efgh/jkl" + range1 = _build_sample_range(0, 4, "/abc") + range2 = _build_sample_range(3, 5, "c/efg") + range3 = _build_sample_range(7, 5, "gh/jk") + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitroot would do on posix instead of calling it + parts = ["", "/", "abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [] + assert get_ranges(parts[1]) == [ + TaintRange(0, 1, Source("/abc", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[2]) == [ + TaintRange(0, 3, Source("abc", "sample_value", OriginType.PARAMETER)), + TaintRange(2, 5, Source("c/efg", "sample_value", OriginType.PARAMETER)), + TaintRange(6, 5, Source("gh/jk", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitroot_posix_startwithtwoslashes_path(): + s = "//abc/efgh/jkl" + range1 = _build_sample_range(0, 2, "//") + range2 = _build_sample_range(2, 3, "abc") + range3 = _build_sample_range(5, 4, "/efg") + range4 = _build_sample_range(9, 4, "h/jk") + set_ranges(s, (range1, range2, range3, range4)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitroot would do on posix starting with double slash instead of calling it + parts = ["", "//", "abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [] + assert get_ranges(parts[1]) == [ + TaintRange(0, 2, Source("//", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[2]) == [ + TaintRange(0, 3, Source("abc", "sample_value", OriginType.PARAMETER)), + TaintRange(3, 4, Source("/efg", "sample_value", OriginType.PARAMETER)), + TaintRange(7, 4, Source("h/jk", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_ospathsplitroot_posix_startwiththreeslashes_path(): + s = "///abc/efgh/jkl" + range1 = _build_sample_range(0, 3, "///") + range2 = _build_sample_range(3, 3, "abc") + range3 = _build_sample_range(6, 4, "/efg") + range4 = _build_sample_range(10, 4, "h/jk") + set_ranges(s, (range1, range2, range3, range4)) + ranges = get_ranges(s) + assert ranges + + # We emulate what os.path.splitroot would do on posix starting with triple slash instead of calling it + parts = ["", "/", "//abc/efgh/jkl"] + assert set_ranges_on_splitted(s, ranges, parts, include_separator=True) + assert get_ranges(parts[0]) == [] + assert get_ranges(parts[1]) == [ + TaintRange(0, 1, Source("/", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[2]) == [ + TaintRange(0, 2, Source("///", "sample_value", OriginType.PARAMETER)), + TaintRange(2, 3, Source("abc", "sample_value", OriginType.PARAMETER)), + TaintRange(5, 4, Source("/efg", "sample_value", OriginType.PARAMETER)), + TaintRange(9, 4, Source("h/jk", "sample_value", OriginType.PARAMETER)), + ] + + +def test_set_ranges_on_splitted_bytes() -> None: + s = b"abc|efgh|ijkl" + range1 = _build_sample_range(0, 2, "first") # ab -> 0, 2 + range2 = _build_sample_range(5, 1, "second") # f -> 1, 1 + range3 = _build_sample_range(11, 2, "third") # jkl -> 1, 3 + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + parts = s.split(b"|") + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [TaintRange(0, 2, Source("first", "sample_value", OriginType.PARAMETER))] + assert get_ranges(parts[1]) == [TaintRange(1, 1, Source("second", "sample_value", OriginType.PARAMETER))] + assert get_ranges(parts[2]) == [TaintRange(2, 2, Source("third", "sample_value", OriginType.PARAMETER))] + + +def test_set_ranges_on_splitted_bytearray() -> None: + s = bytearray(b"abc|efgh|ijkl") + range1 = _build_sample_range(0, 2, "ab") + range2 = _build_sample_range(5, 1, "f") + range3 = _build_sample_range(5, 6, "fgh|ij") + + set_ranges(s, (range1, range2, range3)) + ranges = get_ranges(s) + assert ranges + + parts = s.split(b"|") + assert set_ranges_on_splitted(s, ranges, parts) + assert get_ranges(parts[0]) == [TaintRange(0, 2, Source("ab", "sample_value", OriginType.PARAMETER))] + assert get_ranges(parts[1]) == [ + TaintRange(1, 1, Source("f", "sample_value", OriginType.PARAMETER)), + TaintRange(1, 4, Source("second", "sample_value", OriginType.PARAMETER)), + ] + assert get_ranges(parts[2]) == [TaintRange(0, 2, Source("third", "sample_value", OriginType.PARAMETER))] + + +def test_set_ranges_on_splitted_wrong_args(): + s = "12345" + range1 = _build_sample_range(1, 3, "234") + set_ranges(s, (range1,)) + ranges = get_ranges(s) + + assert not set_ranges_on_splitted(s, [], ["123", 45]) + assert not set_ranges_on_splitted("", ranges, ["123", 45]) + assert not set_ranges_on_splitted(s, ranges, []) + parts = ["123", 45] + set_ranges_on_splitted(s, ranges, parts) + ranges = get_ranges(parts[0]) + assert ranges == [TaintRange(1, 3, Source("123", "sample_value", OriginType.PARAMETER))]