Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(iast): native str aspect #10978

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectStr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include <Aspects/AspectStr.h>

/**
 * IAST aspect standing in for Python's builtin `str(...)`.
 *
 * Builds the `str` result from the first positional argument (optionally decoding it with the
 * given encoding / error handler) and, when a tainting map is available and the source object
 * carries taint ranges, attaches ranges to the result.
 *
 * @param orig_function    Forwarded to process_flag_added_args().
 * @param flag_added_args  Forwarded to process_flag_added_args().
 * @param args             Positional arguments: the object to convert, then optionally the
 *                         encoding and the error-handler name.
 * @param kwargs           Forwarded to process_flag_added_args(); not otherwise read here.
 * @return The resulting `str`. If process_flag_added_args() yields something that is not a
 *         tuple, that value is returned as-is.
 * @throws py::error_already_set when the underlying CPython call fails (e.g. a decode error),
 *         or with a TypeError when an encoding is supplied for a `str` object.
 */
py::str
api_str_aspect(const py::object& orig_function,
               const int flag_added_args,
               const py::args& args,
               const py::kwargs& kwargs)
{
    // NOTE(review): borrowing here assumes process_flag_added_args() returns a borrowed
    // reference; if it returns a new one this should be reinterpret_steal. Confirm.
    auto result_or_args = py::reinterpret_borrow<py::object>(
      process_flag_added_args(orig_function.ptr(), flag_added_args, args.ptr(), kwargs.ptr()));

    py::tuple args_tuple;
    if (py::isinstance<py::tuple>(result_or_args)) {
        args_tuple = result_or_args.cast<py::tuple>();
    } else {
        return result_or_args;
    }

    // Bounds are taken from args_tuple (the tuple actually indexed), not from `args`:
    // the two may differ in length after process_flag_added_args().
    const auto num_args = len(args_tuple);
    const py::object text = args_tuple[0];
    const py::str encoding = num_args > 1 ? args_tuple[1] : py::str("");
    const py::str errors = num_args > 2 ? args_tuple[2] : py::str("");

    py::str result_o;

    // Not a text type: plain str() conversion, nothing to propagate.
    if (not is_text(text.ptr())) {
        PyObject* as_str = PyObject_Str(text.ptr());
        if (as_str == nullptr) {
            throw py::error_already_set();
        }
        // PyObject_Str returns a new reference, so ownership is stolen rather than borrowed.
        return py::reinterpret_steal<py::str>(as_str);
    }

    // With no encoding we can also directly call PyObject_Str
    if (num_args == 1 or len(encoding) == 0) {
        PyObject* as_str = PyObject_Str(text.ptr());
        if (as_str == nullptr) {
            throw py::error_already_set();
        }
        result_o = py::reinterpret_steal<py::str>(as_str);
    } else {
        // An encoding was given (non-empty on this branch): the text must be decoded,
        // which is not allowed for an object that is already a unicode string.
        if (py::isinstance<py::str>(text)) {
            PyErr_SetString(PyExc_TypeError, "decoding str is not supported");
            throw py::error_already_set();
        }

        // Keep the std::string objects alive for as long as their c_str() pointers are used.
        const auto encoding_utf8 = encoding.cast<string>();
        const auto errors_utf8 = errors.cast<string>();
        // No error handler supplied: pass NULL so the codec applies its default handling
        // instead of looking up a handler literally named "".
        const char* char_errors = errors_utf8.empty() ? nullptr : errors_utf8.c_str();

        // NOTE(review): PyBytes_AsStringAndSize only accepts bytes objects; if is_text() also
        // admits other byte-like types they will raise TypeError here. Confirm.
        char* text_raw_bytes = nullptr;
        Py_ssize_t text_raw_bytes_size = 0;
        if (PyBytes_AsStringAndSize(text.ptr(), &text_raw_bytes, &text_raw_bytes_size) == -1) {
            throw py::error_already_set();
        }

        PyObject* result_pyo =
          PyUnicode_Decode(text_raw_bytes, text_raw_bytes_size, encoding_utf8.c_str(), char_errors);
        if (result_pyo == nullptr) {
            throw py::error_already_set();
        }
        // PyUnicode_Decode returns a new reference.
        result_o = py::reinterpret_steal<py::str>(result_pyo);
    }

    // Taint propagation. NOTE(review): the second macro argument is presumably what runs if the
    // body throws, so the untainted result is still returned; confirm against the macro.
    TRY_CATCH_ASPECT("str_aspect", return result_o, , {
        const auto tx_map = Initializer::get_tainting_map();
        if (!tx_map || tx_map->empty()) {
            return result_o;
        }

        auto [ranges, ranges_error] = get_ranges(text.ptr(), tx_map);
        if (ranges_error || ranges.empty()) {
            return result_o;
        }

        if (py::isinstance<py::str>(text)) {
            set_ranges(result_o.ptr(), ranges, tx_map);
        } else {
            // Owning wrapper: releases the reference on every path and tolerates a NULL result.
            const auto check_offset = py::reinterpret_steal<py::object>(PyObject_Str(text.ptr()));
            if (!check_offset) {
                PyErr_Clear();
                // FIXME: take all the length of result_o as range length as fallback
                set_ranges(result_o.ptr(), ranges, tx_map);
            } else {
                auto len_result_o = len(result_o);
                const Py_ssize_t offset =
                  PyUnicode_Find(result_o.ptr(), check_offset.ptr(), 0, len_result_o, 1);
                // -1 means "not found", -2 means an error occurred; both use the fallback.
                if (offset < 0) {
                    PyErr_Clear();
                    // FIXME: take all the length of result_o as range length as fallback
                    set_ranges(result_o.ptr(), ranges, tx_map);
                } else {
                    copy_and_shift_ranges_from_strings(text, result_o, offset, len_result_o, tx_map);
                }
            }
        }
        return result_o;
    });
}

/**
 * Registers the native str aspect on module `m` under the Python name "_aspect_str".
 *
 * The Python-visible signature takes `orig_function` (default None) and `flag_added_args`
 * (default 0), followed by arbitrary positional and keyword arguments.
 */
void
pyexport_aspect_str(py::module& m)
{
    // Thin forwarding callable around api_str_aspect.
    auto str_aspect_entry = [](const py::object& orig_function,
                               const int flag_added_args,
                               const py::args& args,
                               const py::kwargs& kwargs) {
        return api_str_aspect(orig_function, flag_added_args, args, kwargs);
    };

    m.def("_aspect_str",
          str_aspect_entry,
          "orig_function"_a = py::none(),
          "flag_added_args"_a = 0,
          py::return_value_policy::move);
}
8 changes: 8 additions & 0 deletions ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectStr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#pragma once
#include "Aspects/Helpers.h"

/**
 * Native aspect for Python's builtin `str(...)`.
 *
 * @param orig_function    Forwarded to process_flag_added_args() by the implementation.
 * @param flag_added_args  Forwarded to process_flag_added_args() by the implementation.
 * @param args             Positional arguments: the object to convert, then optionally an
 *                         encoding and an error-handler name.
 * @param kwargs           Keyword arguments, forwarded to process_flag_added_args().
 * @return The resulting `str`, carrying taint ranges when the source object had any.
 */
py::str
api_str_aspect(const py::object& orig_function, int flag_added_args, const py::args& args, const py::kwargs& kwargs);

/// Registers api_str_aspect on the given pybind11 module as "_aspect_str".
void
pyexport_aspect_str(py::module& m);
93 changes: 93 additions & 0 deletions ddtrace/appsec/_iast/_taint_tracking/tests/test_str_aspect.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include <Aspects/AspectStr.h>
#include <Aspects/Helpers.h>
#include <iostream>
#include <tests/test_common.hpp>

// Fixture name used by the TEST_F cases in this file. PyEnvWithContext is declared outside
// this file (presumably in tests/test_common.hpp -- confirm).
using CheckAspectStr = PyEnvWithContext;

// A str argument comes back as a str with the same contents.
TEST_F(CheckAspectStr, StrWithStr)
{
    const auto call_args = py::args(py::make_tuple(py::str("test")));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    const auto converted_text = converted.cast<string>();
    EXPECT_STREQ(converted_text.c_str(), "test");
}

// An integer argument is rendered as its decimal representation.
TEST_F(CheckAspectStr, StrWithInteger)
{
    const auto call_args = py::args(py::make_tuple(py::int_(42)));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    const auto converted_text = converted.cast<string>();
    EXPECT_STREQ(converted_text.c_str(), "42");
}

// A float argument is rendered the way Python's str() renders it.
TEST_F(CheckAspectStr, StrWithFloat)
{
    const auto call_args = py::args(py::make_tuple(py::float_(42.42)));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    const auto converted_text = converted.cast<string>();
    EXPECT_STREQ(converted_text.c_str(), "42.42");
}

// Bytes without an encoding are not decoded: the result is the bytes literal representation.
TEST_F(CheckAspectStr, StrWithBytesNoEncoding)
{
    const auto call_args = py::args(py::make_tuple(py::bytes("test")));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    const auto converted_text = converted.cast<string>();
    EXPECT_STREQ(converted_text.c_str(), "b'test'");
}

// Bytes plus an encoding are decoded into the corresponding text.
TEST_F(CheckAspectStr, StrWithBytesAndEncoding)
{
    const auto call_args = py::args(py::make_tuple(py::bytes("test"), py::str("utf-8")));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    const auto converted_text = converted.cast<string>();
    EXPECT_STREQ(converted_text.c_str(), "test");
}

// The "strict" error handler has no effect when the input decodes cleanly.
TEST_F(CheckAspectStr, StrWithBytesAndErrorStrictButNoError)
{
    const auto call_args =
      py::args(py::make_tuple(py::bytes("test"), py::str("utf-8"), py::str("strict")));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    const auto converted_text = converted.cast<string>();
    EXPECT_STREQ(converted_text.c_str(), "test");
}

// With the "strict" handler, a byte outside the ASCII range must surface as a
// UnicodeDecodeError carrying CPython's standard message.
TEST_F(CheckAspectStr, StrWithBytesAndErrorStrictAndErrorRaisesUnicodeDecodeError)
{
    try {
        // The return value is irrelevant: the call is expected to throw.
        api_str_aspect(py::none(),
                       0,
                       py::args(py::make_tuple(py::bytes("test\244"), py::str("ascii"), py::str("strict"))),
                       py::kwargs());
        FAIL() << "Expected UnicodeDecodeError to be thrown";
    } catch (const py::error_already_set& e) {
        EXPECT_STREQ(
          e.what(),
          "UnicodeDecodeError: 'ascii' codec can't decode byte 0xa4 in position 4: ordinal not in range(128)");
    }
}

// With the "ignore" handler the undecodable byte is dropped and no exception is thrown.
TEST_F(CheckAspectStr, StrWithBytesAndErrorIgnoreAndErrorDontRaiseUnicodeDecodeError)
{
    const auto call_args =
      py::args(py::make_tuple(py::bytes("test\244"), py::str("ascii"), py::str("ignore")));
    const auto converted = api_str_aspect(py::none(), 0, call_args, py::kwargs());

    EXPECT_TRUE(py::isinstance<py::str>(converted));
    // Only the first four characters are compared.
    const auto leading_text = converted[py::slice(0, 4, 1)].cast<string>();
    EXPECT_STREQ(leading_text.c_str(), "test");
}

// Supplying an encoding for an object that is already a str must raise TypeError.
TEST_F(CheckAspectStr, StrWithStrAndEncodingNotAllowed)
{
    const auto call_args =
      py::args(py::make_tuple(py::str("test"), py::str("ascii"), py::str("strict")));
    try {
        api_str_aspect(py::none(), 0, call_args, py::kwargs());
        FAIL() << "Expected TypeError to be thrown";
    } catch (const py::error_already_set& raised) {
        EXPECT_STREQ(raised.what(), "TypeError: decoding str is not supported");
    }
}

// TODO: add more tests:
// - Taint-range propagation, including the fallback cases.
// - Other argument types.
Loading