Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(asm): avoid double calls on some aspect front-ends #9344

Merged
merged 8 commits into from
May 23, 2024
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "AspectIndex.h"
#include "Helpers.h"

/**
* @brief Index aspect
Expand Down Expand Up @@ -49,6 +50,9 @@ api_index_aspect(PyObject* self, PyObject* const* args, const Py_ssize_t nargs)
PyObject* idx = args[1];

PyObject* result_o = PyObject_GetItem(candidate_text, idx);
if (has_pyerr()) {
return nullptr;
}

if (not ctx_map or ctx_map->empty()) {
return result_o;
Expand Down
116 changes: 49 additions & 67 deletions ddtrace/appsec/_iast/_taint_tracking/aspects.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,13 @@ def join_aspect(orig_function: Optional[Callable], flag_added_args: int, *args:


def index_aspect(candidate_text: Text, index: int) -> Text:
result = candidate_text[index]
if isinstance(candidate_text, IAST.TEXT_TYPES) and isinstance(index, int):
try:
return _index_aspect(candidate_text, index)
except Exception as e:
iast_taint_log_error("IAST propagation error. index_aspect. {}".format(e))

if not isinstance(candidate_text, IAST.TEXT_TYPES) or not isinstance(index, int):
return result

try:
return _index_aspect(candidate_text, index)
except Exception as e:
iast_taint_log_error("IAST propagation error. index_aspect. {}".format(e))
return result
return candidate_text[index]


def slice_aspect(candidate_text: Text, start: int, stop: int, step: int) -> Text:
Expand All @@ -230,15 +227,12 @@ def slice_aspect(candidate_text: Text, start: int, stop: int, step: int) -> Text
or (step is not None and not isinstance(step, int))
):
return candidate_text[start:stop:step]
result = candidate_text[start:stop:step]

try:
new_result = _slice_aspect(candidate_text, start, stop, step)
if new_result != result:
raise Exception("Propagation result %r is different to candidate_text[slice] %r" % (new_result, result))
return new_result
return _slice_aspect(candidate_text, start, stop, step)
except Exception as e:
iast_taint_log_error("IAST propagation error. slice_aspect. {}".format(e))
return result
return candidate_text[start:stop:step]


def bytearray_extend_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> Any:
Expand Down Expand Up @@ -319,29 +313,25 @@ def ljust_aspect(
candidate_text = args[0]
args = args[flag_added_args:]

result = candidate_text.ljust(*args, **kwargs)

if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
ranges_new = get_ranges(candidate_text)
fillchar = parse_params(1, "fillchar", " ", *args, **kwargs)
fillchar_ranges = get_ranges(fillchar)
if ranges_new is None or (not ranges_new and not fillchar_ranges):
if isinstance(candidate_text, IAST.TEXT_TYPES):
try:
ranges_new = get_ranges(candidate_text)
fillchar = parse_params(1, "fillchar", " ", *args, **kwargs)
fillchar_ranges = get_ranges(fillchar)
if ranges_new is None or (not ranges_new and not fillchar_ranges):
return candidate_text.ljust(*args, **kwargs)

if fillchar_ranges:
# Can only be one char, so we create one range to cover from the start to the end
ranges_new = ranges_new + [shift_taint_range(fillchar_ranges[0], len(candidate_text))]

result = candidate_text.ljust(parse_params(0, "width", None, *args, **kwargs), fillchar)
taint_pyobject_with_ranges(result, ranges_new)
return result
except Exception as e:
iast_taint_log_error("IAST propagation error. ljust_aspect. {}".format(e))

if fillchar_ranges:
# Can only be one char, so we create one range to cover from the start to the end
ranges_new = ranges_new + [shift_taint_range(fillchar_ranges[0], len(candidate_text))]

new_result = candidate_text.ljust(parse_params(0, "width", None, *args, **kwargs), fillchar)
taint_pyobject_with_ranges(new_result, ranges_new)
return new_result
except Exception as e:
iast_taint_log_error("IAST propagation error. ljust_aspect. {}".format(e))

return result
return candidate_text.ljust(*args, **kwargs)


def zfill_aspect(
Expand Down Expand Up @@ -410,16 +400,14 @@ def format_aspect(
if not isinstance(candidate_text, IAST.TEXT_TYPES):
return result

try:
params = tuple(args) + tuple(kwargs.values())
new_result = _format_aspect(candidate_text, params, *args, **kwargs)
if new_result != result:
raise Exception("Propagation result %r is different to candidate_text.format %r" % (new_result, result))
return new_result
except Exception as e:
iast_taint_log_error("IAST propagation error. format_aspect. {}".format(e))
if isinstance(candidate_text, IAST.TEXT_TYPES):
try:
params = tuple(args) + tuple(kwargs.values())
return _format_aspect(candidate_text, params, *args, **kwargs)
except Exception as e:
iast_taint_log_error("IAST propagation error. format_aspect. {}".format(e))

return result
return candidate_text.format(*args, **kwargs)


def format_map_aspect(
Expand Down Expand Up @@ -684,18 +672,15 @@ def decode_aspect(
self = args[0]
args = args[(flag_added_args or 1) :]
# Assume we call decode method of the first argument
result = self.decode(*args, **kwargs)

if not is_pyobject_tainted(self) or not isinstance(self, bytes):
return result

try:
codec = args[0] if args else "utf-8"
inc_dec = codecs.getincrementaldecoder(codec)(**kwargs)
return incremental_translation(self, inc_dec, inc_dec.decode, "")
except Exception as e:
iast_taint_log_error("IAST propagation error. decode_aspect. {}".format(e))
return result
if is_pyobject_tainted(self) and isinstance(self, bytes):
try:
codec = args[0] if args else "utf-8"
inc_dec = codecs.getincrementaldecoder(codec)(**kwargs)
return incremental_translation(self, inc_dec, inc_dec.decode, "")
except Exception as e:
iast_taint_log_error("IAST propagation error. decode_aspect. {}".format(e))
return self.decode(*args, **kwargs)


def encode_aspect(
Expand All @@ -708,18 +693,15 @@ def encode_aspect(

self = args[0]
args = args[(flag_added_args or 1) :]
# Assume we call encode method of the first argument
result = self.encode(*args, **kwargs)

if not is_pyobject_tainted(self) or not isinstance(self, str):
return result

try:
codec = args[0] if args else "utf-8"
inc_enc = codecs.getincrementalencoder(codec)(**kwargs)
return incremental_translation(self, inc_enc, inc_enc.encode, b"")
except Exception as e:
iast_taint_log_error("IAST propagation error. encode_aspect. {}".format(e))
if is_pyobject_tainted(self) and isinstance(self, str):
try:
codec = args[0] if args else "utf-8"
inc_enc = codecs.getincrementalencoder(codec)(**kwargs)
return incremental_translation(self, inc_enc, inc_enc.encode, b"")
except Exception as e:
iast_taint_log_error("IAST propagation error. encode_aspect. {}".format(e))
result = self.encode(*args, **kwargs)
return result


Expand Down
26 changes: 24 additions & 2 deletions tests/appsec/iast/aspects/test_encode_decode_aspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_encode_and_add_aspect(infix, args, kwargs, should_be_tainted, prefix, s
assert list_ranges[0].length == len_infix


def test_encode_error_and_no_log_metric(telemetry_writer):
def test_encode_error_with_tainted_gives_one_log_metric(telemetry_writer):
string_input = taint_pyobject(
pyobject="abcde",
source_name="test_add_aspect_tainting_left_hand",
Expand All @@ -141,11 +141,22 @@ def test_encode_error_and_no_log_metric(telemetry_writer):
with pytest.raises(LookupError):
mod.do_encode(string_input, "encoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 1
assert "message" in list_metrics_logs[0]
assert "IAST propagation error. encode_aspect. " in list_metrics_logs[0]["message"]


def test_encode_error_with_not_tainted_gives_no_log_metric(telemetry_writer):
string_input = "abcde"
with pytest.raises(LookupError):
mod.do_encode(string_input, "encoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0


def test_dencode_error_and_no_log_metric(telemetry_writer):
def test_dencode_error_with_tainted_gives_one_log_metric(telemetry_writer):
string_input = taint_pyobject(
pyobject=b"abcde",
source_name="test_add_aspect_tainting_left_hand",
Expand All @@ -155,5 +166,16 @@ def test_dencode_error_and_no_log_metric(telemetry_writer):
with pytest.raises(LookupError):
mod.do_decode(string_input, "decoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 1
assert "message" in list_metrics_logs[0]
assert "IAST propagation error. decode_aspect. " in list_metrics_logs[0]["message"]


def test_dencode_error_with_not_tainted_gives_no_log_metric(telemetry_writer):
string_input = b"abcde"
with pytest.raises(LookupError):
mod.do_decode(string_input, "decoding-not-exists")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0
5 changes: 3 additions & 2 deletions tests/appsec/iast/aspects/test_index_aspect_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_string_index(input_str, index_pos, expected_result, tainted):


@pytest.mark.skipif(sys.version_info < (3, 9, 0), reason="Python version not supported by IAST")
def test_index_error_and_no_log_metric(telemetry_writer):
def test_index_error_with_tainted_gives_one_log_metric(telemetry_writer):
string_input = taint_pyobject(
pyobject="abcde",
source_name="test_add_aspect_tainting_left_hand",
Expand All @@ -75,7 +75,8 @@ def test_index_error_and_no_log_metric(telemetry_writer):
mod.do_index(string_input, 100)

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0
assert len(list_metrics_logs) == 1
assert "IAST propagation error. index_aspect" in list_metrics_logs[0]["message"]


@pytest.mark.skip_iast_check_logs
Expand Down
6 changes: 3 additions & 3 deletions tests/appsec/iast/aspects/test_str_aspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,13 @@ def test_aspect_ljust_str_tainted(self):
ljusted = mod.do_ljust(string_input, 4) # pylint: disable=no-member
assert as_formatted_evidence(ljusted) == ":+-foo-+: "

def test_aspect_ljust_error_and_no_log_metric(self, telemetry_writer):
def test_aspect_ljust_error_with_tainted_gives_one_log_metric(self, telemetry_writer):
string_input = create_taint_range_with_format(":+-foo-+:")
with pytest.raises(TypeError):
mod.do_ljust(string_input, "aaaaa")

list_metrics_logs = list(telemetry_writer._logs)
assert len(list_metrics_logs) == 0
assert len(list_metrics_logs) == 1

def test_zfill(self):
# Not tainted
Expand Down Expand Up @@ -572,4 +572,4 @@ def test_format(self):
result = mod.do_format_fill(string_input) # pylint: disable=no-member
# TODO format with params doesn't work correctly the assert should be
# assert as_formatted_evidence(result) == ":+-foo -+:"
assert as_formatted_evidence(result) == "foo "
assert as_formatted_evidence(result) == ":+-foo-+:"
Loading