Skip to content

Commit

Permalink
[cherry-pick](branch-2.1) Pick "[feature](function) support ip functi…
Browse files Browse the repository at this point in the history
…ons named ipv4_to_ipv6 and cut_ipv6" (#39058)

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
pick #36883 and
#35239
  • Loading branch information
sjyango authored Aug 10, 2024
1 parent 0db1583 commit 5f77f90
Show file tree
Hide file tree
Showing 14 changed files with 555 additions and 30 deletions.
6 changes: 6 additions & 0 deletions be/src/vec/functions/function_ip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,11 @@ void register_function_ip(SimpleFunctionFactory& factory) {
factory.register_function<FunctionToIP<IPConvertExceptionMode::Throw, IPv6>>();
factory.register_function<FunctionToIP<IPConvertExceptionMode::Default, IPv6>>();
factory.register_function<FunctionToIP<IPConvertExceptionMode::Null, IPv6>>();

/// Convert between IPv4 and IPv6 part
factory.register_function<FunctionIPv4ToIPv6>();

/// Cut IPv6 part
factory.register_function<FunctionCutIPv6>();
}
} // namespace doris::vectorized
137 changes: 137 additions & 0 deletions be/src/vec/functions/function_ip.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
#include <vector>

#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/format_ip.h"
#include "vec/common/ipv6_to_binary.h"
#include "vec/common/unaligned.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
Expand Down Expand Up @@ -1226,4 +1229,138 @@ class FunctionToIP : public IFunction {
}
};

class FunctionIPv4ToIPv6 : public IFunction {
public:
static constexpr auto name = "ipv4_to_ipv6";
static FunctionPtr create() { return std::make_shared<FunctionIPv4ToIPv6>(); }

String get_name() const override { return name; }

size_t get_number_of_arguments() const override { return 1; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeIPv6>();
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) const override {
const auto& ipv4_column_with_type_and_name = block.get_by_position(arguments[0]);
const auto& [ipv4_column, ipv4_const] =
unpack_if_const(ipv4_column_with_type_and_name.column);
const auto* ipv4_addr_column = assert_cast<const ColumnIPv4*>(ipv4_column.get());
const auto& ipv4_column_data = ipv4_addr_column->get_data();
auto col_res = ColumnIPv6::create(input_rows_count, 0);
auto& col_res_data = col_res->get_data();

for (size_t i = 0; i < input_rows_count; ++i) {
auto ipv4_idx = index_check_const(i, ipv4_const);
map_ipv4_to_ipv6(ipv4_column_data[ipv4_idx],
reinterpret_cast<UInt8*>(&col_res_data[i]));
}

block.replace_by_position(result, std::move(col_res));
return Status::OK();
}

private:
static void map_ipv4_to_ipv6(IPv4 ipv4, UInt8* buf) {
unaligned_store<UInt64>(buf, 0x0000FFFF00000000ULL | static_cast<UInt64>(ipv4));
unaligned_store<UInt64>(buf + 8, 0);
}
};

class FunctionCutIPv6 : public IFunction {
public:
static constexpr auto name = "cut_ipv6";
static FunctionPtr create() { return std::make_shared<FunctionCutIPv6>(); }

String get_name() const override { return name; }

size_t get_number_of_arguments() const override { return 3; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeString>();
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count) const override {
const auto& ipv6_column_with_type_and_name = block.get_by_position(arguments[0]);
const auto& bytes_to_cut_for_ipv6_column_with_type_and_name =
block.get_by_position(arguments[1]);
const auto& bytes_to_cut_for_ipv4_column_with_type_and_name =
block.get_by_position(arguments[2]);

const auto& [ipv6_column, ipv6_const] =
unpack_if_const(ipv6_column_with_type_and_name.column);
const auto& [bytes_to_cut_for_ipv6_column, bytes_to_cut_for_ipv6_const] =
unpack_if_const(bytes_to_cut_for_ipv6_column_with_type_and_name.column);
const auto& [bytes_to_cut_for_ipv4_column, bytes_to_cut_for_ipv4_const] =
unpack_if_const(bytes_to_cut_for_ipv4_column_with_type_and_name.column);

const auto* ipv6_addr_column = assert_cast<const ColumnIPv6*>(ipv6_column.get());
const auto* to_cut_for_ipv6_bytes_column =
assert_cast<const ColumnInt8*>(bytes_to_cut_for_ipv6_column.get());
const auto* to_cut_for_ipv4_bytes_column =
assert_cast<const ColumnInt8*>(bytes_to_cut_for_ipv4_column.get());

const auto& ipv6_addr_column_data = ipv6_addr_column->get_data();
const auto& to_cut_for_ipv6_bytes_column_data = to_cut_for_ipv6_bytes_column->get_data();
const auto& to_cut_for_ipv4_bytes_column_data = to_cut_for_ipv4_bytes_column->get_data();

auto col_res = ColumnString::create();
ColumnString::Chars& chars_res = col_res->get_chars();
ColumnString::Offsets& offsets_res = col_res->get_offsets();
chars_res.resize(input_rows_count * (IPV6_MAX_TEXT_LENGTH + 1)); // + 1 for ending '\0'
offsets_res.resize(input_rows_count);
auto* begin = reinterpret_cast<char*>(chars_res.data());
auto* pos = begin;

for (size_t i = 0; i < input_rows_count; ++i) {
auto ipv6_idx = index_check_const(i, ipv6_const);
auto bytes_to_cut_for_ipv6_idx = index_check_const(i, bytes_to_cut_for_ipv6_const);
auto bytes_to_cut_for_ipv4_idx = index_check_const(i, bytes_to_cut_for_ipv4_const);

auto* address = const_cast<unsigned char*>(
reinterpret_cast<const unsigned char*>(&ipv6_addr_column_data[ipv6_idx]));
Int8 bytes_to_cut_for_ipv6_count =
to_cut_for_ipv6_bytes_column_data[bytes_to_cut_for_ipv6_idx];
Int8 bytes_to_cut_for_ipv4_count =
to_cut_for_ipv4_bytes_column_data[bytes_to_cut_for_ipv4_idx];

if (bytes_to_cut_for_ipv6_count > IPV6_BINARY_LENGTH) [[unlikely]] {
throw Exception(ErrorCode::INVALID_ARGUMENT,
"Illegal value for argument 2 {} of function {}",
bytes_to_cut_for_ipv6_column_with_type_and_name.type->get_name(),
get_name());
}

if (bytes_to_cut_for_ipv4_count > IPV6_BINARY_LENGTH) [[unlikely]] {
throw Exception(ErrorCode::INVALID_ARGUMENT,
"Illegal value for argument 3 {} of function {}",
bytes_to_cut_for_ipv4_column_with_type_and_name.type->get_name(),
get_name());
}

UInt8 bytes_to_cut_count = is_ipv4_mapped(address) ? bytes_to_cut_for_ipv4_count
: bytes_to_cut_for_ipv6_count;
cut_address(address, pos, bytes_to_cut_count);
offsets_res[i] = pos - begin;
}

block.replace_by_position(result, std::move(col_res));
return Status::OK();
}

private:
static bool is_ipv4_mapped(const UInt8* address) {
return (unaligned_load_little_endian<UInt64>(address + 8) == 0) &&
((unaligned_load_little_endian<UInt64>(address) & 0xFFFFFFFF00000000ULL) ==
0x0000FFFF00000000ULL);
}

static void cut_address(unsigned char* address, char*& dst, UInt8 zeroed_tail_bytes_count) {
format_ipv6(address, dst, zeroed_tail_bytes_count);
}
};

} // namespace doris::vectorized
75 changes: 75 additions & 0 deletions be/test/vec/function/function_ip_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "function_test_util.h"
#include "gtest/gtest_pred_impl.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/data_types/data_type_number.h"

namespace doris::vectorized {
Expand Down Expand Up @@ -80,4 +81,78 @@ TEST(FunctionIpTest, FunctionIsIPAddressInRangeTest) {
}
}

// Checks ipv4_to_ipv6 against hand-written expectations: every IPv4 value must map
// to the 128-bit value 0xFFFF'xxxxxxxx, where xxxxxxxx is the IPv4 value itself.
TEST(FunctionIpTest, FunctionIPv4ToIPv6Test) {
    // One row of the data set: the IPv4 input and the IPv6 value expected for it.
    struct Case {
        IPv4 input;
        IPv6 expected;
    };

    const Case cases[] = {
            {static_cast<IPv4>(0), static_cast<IPv6>(0xFFFF00000000ULL)},          // 0.0.0.0
            {static_cast<IPv4>(1), static_cast<IPv6>(0xFFFF00000001ULL)},          // 0.0.0.1
            {static_cast<IPv4>(2130706433), static_cast<IPv6>(0xFFFF7F000001ULL)}, // 127.0.0.1
            {static_cast<IPv4>(3232235521), static_cast<IPv6>(0xFFFFC0A80001ULL)}, // 192.168.0.1
            {static_cast<IPv4>(4294967294),
             static_cast<IPv6>(0xFFFFFFFFFFFEULL)}, // 255.255.255.254
            {static_cast<IPv4>(4294967295),
             static_cast<IPv6>(0xFFFFFFFFFFFFULL)}, // 255.255.255.255
    };

    DataSet data_set;
    for (const auto& test_case : cases) {
        data_set.push_back({{test_case.input}, test_case.expected});
    }

    std::string func_name = "ipv4_to_ipv6";
    InputTypeSet input_types = {TypeIndex::IPv4};
    static_cast<void>(check_function<DataTypeIPv6, true>(func_name, input_types, data_set));
}

// Checks cut_ipv6 on nine addresses. The first five are plain IPv6 addresses and
// use the "bytes to cut for IPv6" argument; the remaining four are IPv4-mapped
// addresses and use the "bytes to cut for IPv4" argument. The unused argument is 0.
TEST(FunctionIpTest, FunctionCutIPv6Test) {
    std::string func_name = "cut_ipv6";

    // Raw 16-byte images of the addresses, in the byte order the IPv6 type stores them
    // (the trailing comment on each entry is the address it is meant to represent).
    std::array<std::array<uint8_t, 16>, 9> ipv6s = {
            std::array<uint8_t, 16> {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},   // ::
            std::array<uint8_t, 16> {0x1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, // ::1
            std::array<uint8_t, 16> {0x02, 0, 0x02, 0xb1, 0, 0, 0, 0, 0x10, 0x06, 0xa1, 0, 0x70,
                                     0x1b, 0x01, 0x20}, // 2001:1b70:a1:610::b102:2
            std::array<uint8_t, 16> {0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                                     0xff, 0xff, 0xff, 0xff, 0xff,
                                     0xff}, // ffff:ffff:ffff:ffff:ffff:ffff:ffff:fffe
            std::array<uint8_t, 16> {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
                                     0xff, 0xff, 0xff, 0xff, 0xff,
                                     0xff}, // ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
            std::array<uint8_t, 16> {0x01, 0, 0xa8, 0xc0, 0xff, 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                                     0}, // ::ffff:192.168.0.1
            std::array<uint8_t, 16> {0x01, 0, 0, 0x7f, 0xff, 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                                     0}, // ::ffff:127.0.0.1
            std::array<uint8_t, 16> {0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                                     0}, // ::ffff:255.255.255.254
            std::array<uint8_t, 16> {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                                     0} // ::ffff:255.255.255.255
    };

    std::vector<int8_t> bytes = {0, 2, 4, 8, 16, 2, 4, 8, 16};

    std::vector<std::string> results = {"::",
                                        "::",
                                        "2001:1b70:a1:610::",
                                        "ffff:ffff:ffff:ffff::",
                                        "::",
                                        "::ffff:192.168.0.0",
                                        "::ffff:0.0.0.0",
                                        "::",
                                        "::"};

    // Index of the first IPv4-mapped address in `ipv6s`.
    constexpr size_t ipv4_mapped_begin = 5;

    DataSet data_set;

    // size_t index: avoids the signed/unsigned comparison against results.size().
    for (size_t i = 0; i < results.size(); ++i) {
        IPv6 ipv6;
        // *reinterpret_cast<uint128_t*> will result in core dump, using std::memcpy instead.
        std::memcpy(&ipv6, &ipv6s[i], sizeof(IPv6));

        // Exactly one of the two counts is taken from `bytes`; the other stays 0.
        int8_t bytes_to_cut_for_ipv6 = 0;
        int8_t bytes_to_cut_for_ipv4 = 0;
        if (i < ipv4_mapped_begin) {
            bytes_to_cut_for_ipv6 = bytes[i];
        } else {
            bytes_to_cut_for_ipv4 = bytes[i];
        }

        data_set.push_back({{ipv6, bytes_to_cut_for_ipv6, bytes_to_cut_for_ipv4}, results[i]});
    }

    InputTypeSet input_types = {TypeIndex::IPv6, TypeIndex::Int8, TypeIndex::Int8};
    static_cast<void>(check_function<DataTypeString, true>(func_name, input_types, data_set));
}

} // namespace doris::vectorized
16 changes: 16 additions & 0 deletions be/test/vec/function/function_test_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "vec/data_types/data_type_date.h"
#include "vec/data_types/data_type_date_time.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_ipv4.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_time_v2.h"
Expand Down Expand Up @@ -98,6 +100,14 @@ size_t type_index_to_data_type(const std::vector<AnyType>& input_types, size_t i
desc.type = doris::PrimitiveType::TYPE_OBJECT;
type = std::make_shared<DataTypeBitMap>();
return 1;
case TypeIndex::IPv4:
desc.type = doris::PrimitiveType::TYPE_IPV4;
type = std::make_shared<DataTypeIPv4>();
return 1;
case TypeIndex::IPv6:
desc.type = doris::PrimitiveType::TYPE_IPV6;
type = std::make_shared<DataTypeIPv6>();
return 1;
case TypeIndex::UInt8:
desc.type = doris::PrimitiveType::TYPE_BOOLEAN;
type = std::make_shared<DataTypeUInt8>();
Expand Down Expand Up @@ -242,6 +252,12 @@ bool insert_cell(MutableColumnPtr& column, DataTypePtr type_ptr, const AnyType&
} else if (type.idx == TypeIndex::BitMap) {
BitmapValue* bitmap = any_cast<BitmapValue*>(cell);
column->insert_data((char*)bitmap, sizeof(BitmapValue));
} else if (type.is_ipv4()) {
auto value = any_cast<ut_type::IPV4>(cell);
column->insert_data(reinterpret_cast<char*>(&value), 0);
} else if (type.is_ipv6()) {
auto value = any_cast<ut_type::IPV6>(cell);
column->insert_data(reinterpret_cast<char*>(&value), 0);
} else if (type.is_uint8()) {
auto value = any_cast<ut_type::BOOLEAN>(cell);
column->insert_data(reinterpret_cast<char*>(&value), 0);
Expand Down
3 changes: 3 additions & 0 deletions be/test/vec/function/function_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ using STRING = std::string;
using DOUBLE = double;
using FLOAT = float;

using IPV4 = uint32_t;
using IPV6 = uint128_t;

inline auto DECIMAL = Decimal128V2::double_to_decimal;
inline auto DECIMALFIELD = [](double v) {
return DecimalField<Decimal128V2>(Decimal128V2::double_to_decimal(v), 9);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.CurrentDate;
import org.apache.doris.nereids.trees.expressions.functions.scalar.CurrentTime;
import org.apache.doris.nereids.trees.expressions.functions.scalar.CurrentUser;
import org.apache.doris.nereids.trees.expressions.functions.scalar.CutIpv6;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Database;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Date;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateDiff;
Expand Down Expand Up @@ -207,6 +208,7 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv4StringToNum;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv4StringToNumOrDefault;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv4StringToNumOrNull;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv4ToIpv6;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv6CIDRToRange;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv6NumToString;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ipv6StringToNum;
Expand Down Expand Up @@ -571,6 +573,7 @@ public class BuiltinScalarFunctions implements FunctionHelper {
scalar(CurrentDate.class, "curdate", "current_date"),
scalar(CurrentTime.class, "curtime", "current_time"),
scalar(CurrentUser.class, "current_user"),
scalar(CutIpv6.class, "cut_ipv6"),
scalar(Database.class, "database", "schema"),
scalar(Date.class, "date"),
scalar(DateDiff.class, "datediff"),
Expand Down Expand Up @@ -643,6 +646,7 @@ public class BuiltinScalarFunctions implements FunctionHelper {
scalar(Ipv4StringToNum.class, "ipv4_string_to_num"),
scalar(Ipv4StringToNumOrDefault.class, "ipv4_string_to_num_or_default"),
scalar(Ipv4StringToNumOrNull.class, "ipv4_string_to_num_or_null", "inet_aton"),
scalar(Ipv4ToIpv6.class, "ipv4_to_ipv6"),
scalar(Ipv6NumToString.class, "ipv6_num_to_string", "inet6_ntoa"),
scalar(Ipv6StringToNum.class, "ipv6_string_to_num"),
scalar(Ipv6StringToNumOrDefault.class, "ipv6_string_to_num_or_default"),
Expand All @@ -654,12 +658,6 @@ public class BuiltinScalarFunctions implements FunctionHelper {
scalar(IsIpAddressInRange.class, "is_ip_address_in_range"),
scalar(Ipv4CIDRToRange.class, "ipv4_cidr_to_range"),
scalar(Ipv6CIDRToRange.class, "ipv6_cidr_to_range"),
scalar(ToIpv4.class, "to_ipv4"),
scalar(ToIpv4OrDefault.class, "to_ipv4_or_default"),
scalar(ToIpv4OrNull.class, "to_ipv4_or_null"),
scalar(ToIpv6.class, "to_ipv6"),
scalar(ToIpv6OrDefault.class, "to_ipv6_or_default"),
scalar(ToIpv6OrNull.class, "to_ipv6_or_null"),
scalar(JsonArray.class, "json_array"),
scalar(JsonObject.class, "json_object"),
scalar(JsonQuote.class, "json_quote"),
Expand Down Expand Up @@ -874,6 +872,12 @@ public class BuiltinScalarFunctions implements FunctionHelper {
scalar(ToDate.class, "to_date"),
scalar(ToDateV2.class, "to_datev2"),
scalar(ToDays.class, "to_days"),
scalar(ToIpv4.class, "to_ipv4"),
scalar(ToIpv4OrDefault.class, "to_ipv4_or_default"),
scalar(ToIpv4OrNull.class, "to_ipv4_or_null"),
scalar(ToIpv6.class, "to_ipv6"),
scalar(ToIpv6OrDefault.class, "to_ipv6_or_default"),
scalar(ToIpv6OrNull.class, "to_ipv6_or_null"),
scalar(Tokenize.class, "tokenize"),
scalar(ToMonday.class, "to_monday"),
scalar(ToQuantileState.class, "to_quantile_state"),
Expand Down
Loading

0 comments on commit 5f77f90

Please sign in to comment.