Skip to content

Commit

Permalink
Merge branch 'main' into velox-gcs-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrux authored Dec 4, 2023
2 parents f01e991 + 27ca04c commit 8c69534
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1{"v":"a"}\0\0\0
2{"v":"b"}\a\b\b
3{"v":"c"}abcd
Original file line number Diff line number Diff line change
Expand Up @@ -957,17 +957,26 @@ class GlutenClickHouseHiveTableSuite()
spark.sql("DROP TABLE b")
}

test("GLUTEN-3337: fix get_json_object ctrl-chars bug") {
val data_path = rootPath + "/text-data/ctrl-chars"
test("GLUTEN-3337: fix get_json_object for abnormal json") {
val data_path = rootPath + "/text-data/abnormal-json"
spark.sql(s"""
|CREATE TABLE test_tbl_3337 (
| id bigint,
| data string) stored as textfile
|LOCATION '$data_path'
""".stripMargin)

val select_sql = "select id, get_json_object(data, '$.data.v') from test_tbl_3337"
compareResultsAgainstVanillaSpark(select_sql, compareResult = true, _ => {})
val select_sql_1 = "select id, get_json_object(data, '$.data.v') from test_tbl_3337"
val select_sql_2 = "select id, get_json_object(data, '$.v') from test_tbl_3337"
val select_sql_3 = "select id, get_json_object(data, '$.123.234') from test_tbl_3337"
val select_sql_4 = "select id, get_json_object(data, '$.v111') from test_tbl_3337"
val select_sql_5 = "select id, get_json_object(data, 'v112') from test_tbl_3337"
compareResultsAgainstVanillaSpark(select_sql_1, compareResult = true, _ => {})
compareResultsAgainstVanillaSpark(select_sql_2, compareResult = true, _ => {})
compareResultsAgainstVanillaSpark(select_sql_3, compareResult = true, _ => {})
compareResultsAgainstVanillaSpark(select_sql_4, compareResult = true, _ => {})
compareResultsAgainstVanillaSpark(select_sql_5, compareResult = true, _ => {})

spark.sql("DROP TABLE test_tbl_3337")
}

Expand Down
75 changes: 50 additions & 25 deletions cpp-ch/local-engine/Functions/SparkFunctionGetJsonObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once
#include <memory>
#include <string_view>
#include <stack>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
Expand Down Expand Up @@ -211,6 +212,37 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
private:
DB::ContextPtr context;

/// Best-effort clean-up of a JSON string that the regular parser has rejected.
///
/// Copies `json` into `dst` while dropping ASCII control characters (0x01..0x1F), and stops
/// right after the closing brace of the first top-level object so that any trailing garbage
/// is discarded. On return `json` is re-pointed at the cleaned bytes stored in `dst`.
///
/// @param dst   Scratch buffer with room for at least `json.size()` bytes. It must outlive
///              every use of the updated `json`, because the view refers to it afterwards.
/// @param json  In: the raw input. Out: a view over the cleaned copy in `dst`.
void parseAbnormalJson(char * dst, std::string_view & json) const
{
    const char * json_chars = json.data();
    const size_t json_size = json.size();
    size_t cursor = 0;
    /// Number of currently open '{'. Only one kind of bracket is tracked, so a counter is enough.
    size_t depth = 0;
    /// Braces inside string literals are payload, not structure, so string state is tracked too.
    bool in_string = false;
    bool escaped = false;

    /// Strictly `< json_size`: a string_view gives no guarantee about the byte past its end, and
    /// `dst` is only required to hold `json_size` bytes.
    for (size_t i = 0; i < json_size; ++i)
    {
        const char ch = json_chars[i];
        const auto code = static_cast<unsigned char>(ch);
        /// Drop control characters; NUL (0x00) and bytes >= 0x20 are kept as they are.
        if (code > 0x00 && code < 0x20)
            continue;

        dst[cursor++] = ch;

        if (in_string)
        {
            if (escaped)
                escaped = false;
            else if (ch == '\\')
                escaped = true;
            else if (ch == '"')
                in_string = false;
            continue;
        }

        if (ch == '"')
            in_string = true;
        else if (ch == '{')
            ++depth;
        else if (ch == '}')
        {
            /// A '}' with nothing open can never be part of a valid object: stop scanning.
            if (depth == 0)
                break;
            /// The outermost object has just been closed: everything after it is garbage.
            if (--depth == 0)
                break;
        }
    }

    json = std::string_view{dst, cursor};
}

template <typename JSONParser, typename Impl>
DB::ColumnPtr innerExecuteImpl(const DB::ColumnsWithTypeAndName & arguments) const
{
Expand All @@ -220,10 +252,12 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
std::vector<DB::ASTPtr> json_path_asts;

std::vector<String> required_fields;
const auto & first_column = arguments[0];
if (const auto * required_fields_col = typeid_cast<const DB::ColumnConst *>(arguments[1].column.get()))
{
std::string json_fields = required_fields_col->getDataAt(0).toString();
Poco::StringTokenizer tokenizer(json_fields, "|");
bool path_parsed = true;
for (const auto & field : tokenizer)
{
required_fields.push_back(field);
Expand All @@ -239,19 +273,26 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
DB::Expected expected;
if (!path_parser.parse(token_iterator, json_path_ast, expected))
{
throw DB::Exception(DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Invalid json path: {}", field);
path_parsed = false;
}
json_path_asts.push_back(json_path_ast);
}
if (!path_parsed)
{
for (size_t i = 0; i < first_column.column->size(); ++i)
{
for (size_t j = 0; j < tuple_columns.size(); ++j)
tuple_columns[j]->insertDefault();
}
return DB::ColumnTuple::create(std::move(tuple_columns));
}
}
else
{
throw DB::Exception(
DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "The second argument of function {} must be a non-constant column", getName());
}


const auto & first_column = arguments[0];
if (!isString(first_column.type))
throw DB::Exception(
DB::ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
Expand All @@ -272,32 +313,16 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
Impl impl;
JSONParser parser;
using Element = typename JSONParser::Element;

auto copyJsonStringExceptCtrlChars = [&](char * dst_chars, const char * src_chars, const size_t & length) -> std::string_view
{
UInt8 NULL_CHAR = 0x0000;
UInt8 SPACE_CHAR = 0x0020;
size_t cursor = 0;
for (size_t i = 0; i <= length; ++i)
{
if (*(src_chars + i) > NULL_CHAR && *(src_chars + i) < SPACE_CHAR)
continue;
else
dst_chars[cursor++] = *(src_chars + i);
}
std::string_view json{dst_chars, cursor - 1};
return json;
};

Element document;
bool document_ok = false;
if (col_json_const)
{
std::string_view json{reinterpret_cast<const char *>(chars.data()), offsets[0] - 1};
document_ok = parser.parse(json, document);
if (!document_ok) {
char dst_chars[json.size()];
json = copyJsonStringExceptCtrlChars(dst_chars, json.data(), json.size());
if (!document_ok)
{
char dst[json.size()];
parseAbnormalJson(dst, json);
document_ok = parser.parse(json, document);
}
}
Expand All @@ -318,8 +343,8 @@ class FlattenJSONStringOnRequiredFunction : public DB::IFunction
document_ok = parser.parse(json, document);
if (!document_ok)
{
char dst_chars[json.size()];
json = copyJsonStringExceptCtrlChars(dst_chars, json.data(), json.size());
char dst[json.size()];
parseAbnormalJson(dst, json);
document_ok = parser.parse(json, document);
}
}
Expand Down
1 change: 0 additions & 1 deletion dev/package.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ VERSION=$(. /etc/os-release && echo ${VERSION_ID})
ARCH=`uname -m`

# compile gluten jar
export CPU_TARGET="${ARCH}"
$GLUTEN_DIR/dev/builddeps-veloxbe.sh --build_tests=ON --build_benchmarks=ON --enable_s3=ON --enable_hdfs=ON
mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests
Expand Down

0 comments on commit 8c69534

Please sign in to comment.