diff --git a/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectIndex.cpp b/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectIndex.cpp index 1b03ce01eaa..88e4b522ff8 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectIndex.cpp +++ b/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectIndex.cpp @@ -1,4 +1,5 @@ #include "AspectIndex.h" +#include "Helpers.h" /** * @brief Index aspect @@ -49,6 +50,9 @@ api_index_aspect(PyObject* self, PyObject* const* args, const Py_ssize_t nargs) PyObject* idx = args[1]; PyObject* result_o = PyObject_GetItem(candidate_text, idx); + if (has_pyerr()) { + return nullptr; + } if (not ctx_map or ctx_map->empty()) { return result_o; diff --git a/ddtrace/appsec/_iast/_taint_tracking/aspects.py b/ddtrace/appsec/_iast/_taint_tracking/aspects.py index 49e50dca773..2a887f23f33 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/aspects.py +++ b/ddtrace/appsec/_iast/_taint_tracking/aspects.py @@ -210,16 +210,13 @@ def join_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: def index_aspect(candidate_text: Text, index: int) -> Text: - result = candidate_text[index] + if isinstance(candidate_text, IAST.TEXT_TYPES) and isinstance(index, int): + try: + return _index_aspect(candidate_text, index) + except Exception as e: + iast_taint_log_error("IAST propagation error. index_aspect. {}".format(e)) - if not isinstance(candidate_text, IAST.TEXT_TYPES) or not isinstance(index, int): - return result - - try: - return _index_aspect(candidate_text, index) - except Exception as e: - iast_taint_log_error("IAST propagation error. index_aspect. {}".format(e)) - return result + return candidate_text[index] def slice_aspect(candidate_text: Text, start: int, stop: int, step: int) -> Text: @@ -230,15 +227,12 @@ def slice_aspect(candidate_text: Text, start: int, stop: int, step: int) -> Text or (step is not None and not isinstance(step, int)) ): return candidate_text[start:stop:step] - result = candidate_text[start:stop:step] + try: - new_result = _slice_aspect(candidate_text, start, stop, step) - if new_result != result: - raise Exception("Propagation result %r is different to candidate_text[slice] %r" % (new_result, result)) - return new_result + return _slice_aspect(candidate_text, start, stop, step) except Exception as e: iast_taint_log_error("IAST propagation error. slice_aspect. {}".format(e)) - return result + return candidate_text[start:stop:step] def bytearray_extend_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> Any: @@ -319,29 +313,25 @@ def ljust_aspect( candidate_text = args[0] args = args[flag_added_args:] - result = candidate_text.ljust(*args, **kwargs) - - if not isinstance(candidate_text, IAST.TEXT_TYPES): - return result - - try: - ranges_new = get_ranges(candidate_text) - fillchar = parse_params(1, "fillchar", " ", *args, **kwargs) - fillchar_ranges = get_ranges(fillchar) - if ranges_new is None or (not ranges_new and not fillchar_ranges): + if isinstance(candidate_text, IAST.TEXT_TYPES): + try: + ranges_new = get_ranges(candidate_text) + fillchar = parse_params(1, "fillchar", " ", *args, **kwargs) + fillchar_ranges = get_ranges(fillchar) + if ranges_new is None or (not ranges_new and not fillchar_ranges): + return candidate_text.ljust(*args, **kwargs) + + if fillchar_ranges: + # Can only be one char, so we create one range to cover from the start to the end + ranges_new = ranges_new + [shift_taint_range(fillchar_ranges[0], len(candidate_text))] + + result = candidate_text.ljust(parse_params(0, "width", None, *args, **kwargs), fillchar) + taint_pyobject_with_ranges(result, ranges_new) return result + except Exception as e: + iast_taint_log_error("IAST propagation error. ljust_aspect. {}".format(e)) - if fillchar_ranges: - # Can only be one char, so we create one range to cover from the start to the end - ranges_new = ranges_new + [shift_taint_range(fillchar_ranges[0], len(candidate_text))] - - new_result = candidate_text.ljust(parse_params(0, "width", None, *args, **kwargs), fillchar) - taint_pyobject_with_ranges(new_result, ranges_new) - return new_result - except Exception as e: - iast_taint_log_error("IAST propagation error. ljust_aspect. {}".format(e)) - - return result + return candidate_text.ljust(*args, **kwargs) def zfill_aspect( @@ -410,16 +400,14 @@ def format_aspect( if not isinstance(candidate_text, IAST.TEXT_TYPES): return result - try: - params = tuple(args) + tuple(kwargs.values()) - new_result = _format_aspect(candidate_text, params, *args, **kwargs) - if new_result != result: - raise Exception("Propagation result %r is different to candidate_text.format %r" % (new_result, result)) - return new_result - except Exception as e: - iast_taint_log_error("IAST propagation error. format_aspect. {}".format(e)) + if isinstance(candidate_text, IAST.TEXT_TYPES): + try: + params = tuple(args) + tuple(kwargs.values()) + return _format_aspect(candidate_text, params, *args, **kwargs) + except Exception as e: + iast_taint_log_error("IAST propagation error. format_aspect. {}".format(e)) - return result + return candidate_text.format(*args, **kwargs) def format_map_aspect( @@ -684,18 +672,15 @@ def decode_aspect( self = args[0] args = args[(flag_added_args or 1) :] # Assume we call decode method of the first argument - result = self.decode(*args, **kwargs) - if not is_pyobject_tainted(self) or not isinstance(self, bytes): - return result - - try: - codec = args[0] if args else "utf-8" - inc_dec = codecs.getincrementaldecoder(codec)(**kwargs) - return incremental_translation(self, inc_dec, inc_dec.decode, "") - except Exception as e: - iast_taint_log_error("IAST propagation error. decode_aspect. {}".format(e)) - return result + if is_pyobject_tainted(self) and isinstance(self, bytes): + try: + codec = args[0] if args else "utf-8" + inc_dec = codecs.getincrementaldecoder(codec)(**kwargs) + return incremental_translation(self, inc_dec, inc_dec.decode, "") + except Exception as e: + iast_taint_log_error("IAST propagation error. decode_aspect. {}".format(e)) + return self.decode(*args, **kwargs) def encode_aspect( @@ -708,18 +693,15 @@ def encode_aspect( self = args[0] args = args[(flag_added_args or 1) :] - # Assume we call encode method of the first argument - result = self.encode(*args, **kwargs) - - if not is_pyobject_tainted(self) or not isinstance(self, str): - return result - try: - codec = args[0] if args else "utf-8" - inc_enc = codecs.getincrementalencoder(codec)(**kwargs) - return incremental_translation(self, inc_enc, inc_enc.encode, b"") - except Exception as e: - iast_taint_log_error("IAST propagation error. encode_aspect. {}".format(e)) + if is_pyobject_tainted(self) and isinstance(self, str): + try: + codec = args[0] if args else "utf-8" + inc_enc = codecs.getincrementalencoder(codec)(**kwargs) + return incremental_translation(self, inc_enc, inc_enc.encode, b"") + except Exception as e: + iast_taint_log_error("IAST propagation error. encode_aspect. {}".format(e)) + result = self.encode(*args, **kwargs) return result diff --git a/tests/appsec/iast/aspects/test_encode_decode_aspect.py b/tests/appsec/iast/aspects/test_encode_decode_aspect.py index bf7477a4fca..29161e4594d 100644 --- a/tests/appsec/iast/aspects/test_encode_decode_aspect.py +++ b/tests/appsec/iast/aspects/test_encode_decode_aspect.py @@ -131,7 +131,7 @@ def test_encode_and_add_aspect(infix, args, kwargs, should_be_tainted, prefix, s assert list_ranges[0].length == len_infix -def test_encode_error_and_no_log_metric(telemetry_writer): +def test_encode_error_with_tainted_gives_one_log_metric(telemetry_writer): string_input = taint_pyobject( pyobject="abcde", source_name="test_add_aspect_tainting_left_hand", @@ -141,11 +141,22 @@ def test_encode_error_and_no_log_metric(telemetry_writer): with pytest.raises(LookupError): mod.do_encode(string_input, "encoding-not-exists") + list_metrics_logs = list(telemetry_writer._logs) + assert len(list_metrics_logs) == 1 + assert "message" in list_metrics_logs[0] + assert "IAST propagation error. encode_aspect. " in list_metrics_logs[0]["message"] + + +def test_encode_error_with_not_tainted_gives_no_log_metric(telemetry_writer): + string_input = "abcde" + with pytest.raises(LookupError): + mod.do_encode(string_input, "encoding-not-exists") + list_metrics_logs = list(telemetry_writer._logs) assert len(list_metrics_logs) == 0 -def test_dencode_error_and_no_log_metric(telemetry_writer): +def test_dencode_error_with_tainted_gives_one_log_metric(telemetry_writer): string_input = taint_pyobject( pyobject=b"abcde", source_name="test_add_aspect_tainting_left_hand", @@ -155,5 +166,16 @@ def test_dencode_error_and_no_log_metric(telemetry_writer): with pytest.raises(LookupError): mod.do_decode(string_input, "decoding-not-exists") + list_metrics_logs = list(telemetry_writer._logs) + assert len(list_metrics_logs) == 1 + assert "message" in list_metrics_logs[0] + assert "IAST propagation error. decode_aspect. " in list_metrics_logs[0]["message"] + + +def test_dencode_error_with_not_tainted_gives_no_log_metric(telemetry_writer): + string_input = b"abcde" + with pytest.raises(LookupError): + mod.do_decode(string_input, "decoding-not-exists") + list_metrics_logs = list(telemetry_writer._logs) assert len(list_metrics_logs) == 0 diff --git a/tests/appsec/iast/aspects/test_index_aspect_fixtures.py b/tests/appsec/iast/aspects/test_index_aspect_fixtures.py index 5b6671958b9..d3f486e5b12 100644 --- a/tests/appsec/iast/aspects/test_index_aspect_fixtures.py +++ b/tests/appsec/iast/aspects/test_index_aspect_fixtures.py @@ -64,7 +64,7 @@ def test_string_index(input_str, index_pos, expected_result, tainted): @pytest.mark.skipif(sys.version_info < (3, 9, 0), reason="Python version not supported by IAST") -def test_index_error_and_no_log_metric(telemetry_writer): +def test_index_error_with_tainted_gives_one_log_metric(telemetry_writer): string_input = taint_pyobject( pyobject="abcde", source_name="test_add_aspect_tainting_left_hand", @@ -75,7 +75,8 @@ def test_index_error_and_no_log_metric(telemetry_writer): mod.do_index(string_input, 100) list_metrics_logs = list(telemetry_writer._logs) - assert len(list_metrics_logs) == 0 + assert len(list_metrics_logs) == 1 + assert "IAST propagation error. index_aspect" in list_metrics_logs[0]["message"] @pytest.mark.skip_iast_check_logs diff --git a/tests/appsec/iast/aspects/test_str_aspect.py b/tests/appsec/iast/aspects/test_str_aspect.py index 5666fb97baf..ad34bb730c0 100644 --- a/tests/appsec/iast/aspects/test_str_aspect.py +++ b/tests/appsec/iast/aspects/test_str_aspect.py @@ -527,13 +527,13 @@ def test_aspect_ljust_str_tainted(self): ljusted = mod.do_ljust(string_input, 4) # pylint: disable=no-member assert as_formatted_evidence(ljusted) == ":+-foo-+: " - def test_aspect_ljust_error_and_no_log_metric(self, telemetry_writer): + def test_aspect_ljust_error_with_tainted_gives_one_log_metric(self, telemetry_writer): string_input = create_taint_range_with_format(":+-foo-+:") with pytest.raises(TypeError): mod.do_ljust(string_input, "aaaaa") list_metrics_logs = list(telemetry_writer._logs) - assert len(list_metrics_logs) == 0 + assert len(list_metrics_logs) == 1 def test_zfill(self): # Not tainted @@ -572,4 +572,4 @@ def test_format(self): result = mod.do_format_fill(string_input) # pylint: disable=no-member # TODO format with params doesn't work correctly the assert should be # assert as_formatted_evidence(result) == ":+-foo -+:" - assert as_formatted_evidence(result) == "foo " + assert as_formatted_evidence(result) == ":+-foo-+:"