Skip to content

Commit

Permalink
[feature](timev2) pick timediff and timev2 to 2.0 (#22786)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Aug 9, 2023
1 parent 87db7c2 commit b7d6e8a
Show file tree
Hide file tree
Showing 30 changed files with 576 additions and 43 deletions.
2 changes: 1 addition & 1 deletion be/src/runtime/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TypeDescriptor::TypeDescriptor(const std::vector<TTypeNode>& types, int* idx)
DCHECK(scalar_type.__isset.len);
len = scalar_type.len;
} else if (type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 ||
type == TYPE_DECIMAL128I || type == TYPE_DATETIMEV2) {
type == TYPE_DECIMAL128I || type == TYPE_DATETIMEV2 || type == TYPE_TIMEV2) {
DCHECK(scalar_type.__isset.precision);
DCHECK(scalar_type.__isset.scale);
precision = scalar_type.precision;
Expand Down
104 changes: 104 additions & 0 deletions be/src/util/date_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ uint64_t timestamp_from_datetime_v2(const std::string& date_str) {
}
// refer to https://dev.mysql.com/doc/refman/5.7/en/time.html
// the time value between '-838:59:59' and '838:59:59'
/// TODO: Why is the time type stored as double? Can we directly use int64 and remove the time limit?
int32_t time_to_buffer_from_double(double time, char* buffer) {
char* begin = buffer;
if (time < 0) {
Expand Down Expand Up @@ -115,6 +116,61 @@ int32_t time_to_buffer_from_double(double time, char* buffer) {
return buffer - begin;
}

/// Caps a time value, expressed in microseconds, at 838:59:59
/// (3020399 seconds). Values at or below the cap are returned unchanged.
int64_t check_over_max_time(int64_t time) {
    constexpr int64_t kMaxSeconds = 838 * 3600 + 59 * 60 + 59; // 3020399
    constexpr int64_t kMaxMicros = kMaxSeconds * 1000 * 1000;
    return time > kMaxMicros ? kMaxMicros : time;
}
/// Formats a TIMEV2 value as "[-]HH:MM:SS[.f...]" directly into `buffer`.
///
/// @param time   Time in microseconds
///               (hour * 3600e6 + minute * 60e6 + second * 1e6 + microsecond).
///               A negative value is written with a leading '-'. Magnitudes
///               above 838:59:59 are rendered as 838:59:59; NaN is saturated
///               the same way because it cannot be converted to an integer.
/// @param buffer Destination. The longest possible output is
///               "-838:59:59.000000" (17 bytes); no terminating NUL is written.
/// @param scale  Number of fractional digits to print. Values outside [0, 6]
///               are clamped into that range.
/// @return Number of bytes written to `buffer`.
int32_t timev2_to_buffer_from_double(double time, char* buffer, int scale) {
    constexpr int64_t kMicrosPerSecond = 1000 * 1000;
    constexpr int64_t kMicrosPerMinute = 60 * kMicrosPerSecond;
    constexpr int64_t kMicrosPerHour = 60 * kMicrosPerMinute;
    // 838:59:59 expressed in microseconds.
    constexpr int64_t kMaxMicros = static_cast<int64_t>(3020399) * kMicrosPerSecond;
    constexpr int kMaxScale = 6;

    // An out-of-range scale must not reach the digit loops below: it would
    // otherwise write a bogus number of bytes into the caller's buffer.
    if (scale < 0) {
        scale = 0;
    } else if (scale > kMaxScale) {
        scale = kMaxScale;
    }

    char* const begin = buffer;
    if (time < 0) {
        time = -time;
        *buffer++ = '-';
    }
    // Saturate while still in floating point: converting a double that does
    // not fit into int64_t (or NaN) is undefined behaviour.
    if (!(time < static_cast<double>(kMaxMicros))) {
        time = static_cast<double>(kMaxMicros);
    }
    int64_t remaining = static_cast<int64_t>(time);

    const int64_t hour = remaining / kMicrosPerHour; // 0..838 after saturation
    remaining %= kMicrosPerHour;
    const int64_t minute = remaining / kMicrosPerMinute;
    remaining %= kMicrosPerMinute;
    const int64_t second = remaining / kMicrosPerSecond;
    int64_t fraction = remaining % kMicrosPerSecond;

    // Hours use at least two digits; a third is added only when needed.
    if (hour >= 100) {
        *buffer++ = static_cast<char>('0' + hour / 100);
    }
    *buffer++ = static_cast<char>('0' + (hour / 10) % 10);
    *buffer++ = static_cast<char>('0' + hour % 10);
    *buffer++ = ':';
    *buffer++ = static_cast<char>('0' + minute / 10);
    *buffer++ = static_cast<char>('0' + minute % 10);
    *buffer++ = ':';
    *buffer++ = static_cast<char>('0' + second / 10);
    *buffer++ = static_cast<char>('0' + second % 10);

    if (scale > 0) {
        *buffer++ = '.';
        // Keep only the `scale` most significant of the six microsecond
        // digits (truncating, not rounding).
        for (int dropped = scale; dropped < kMaxScale; ++dropped) {
            fraction /= 10;
        }
        // Emit exactly `scale` digits right-to-left so the field is zero-padded.
        for (char* digit = buffer + scale; digit != buffer;) {
            *--digit = static_cast<char>('0' + fraction % 10);
            fraction /= 10;
        }
        buffer += scale;
    }
    return static_cast<int32_t>(buffer - begin);
}

std::string time_to_buffer_from_double(double time) {
fmt::memory_buffer buffer;
if (time < 0) {
Expand All @@ -136,4 +192,52 @@ std::string time_to_buffer_from_double(double time) {
return fmt::to_string(buffer);
}

/// Formats a TIMEV2 value as "[-]HH:MM:SS[.f...]" and returns it as a string.
///
/// @param time  Time in microseconds
///              (hour * 3600e6 + minute * 60e6 + second * 1e6 + microsecond).
///              A negative value is written with a leading '-'. Magnitudes
///              above 838:59:59 are rendered as 838:59:59; NaN is saturated
///              the same way because it cannot be converted to an integer.
/// @param scale Number of fractional digits to print. Values outside [0, 6]
///              are clamped into that range.
/// @return The formatted text, at most 17 characters ("-838:59:59.000000").
std::string timev2_to_buffer_from_double(double time, int scale) {
    constexpr int64_t kMicrosPerSecond = 1000 * 1000;
    constexpr int64_t kMicrosPerMinute = 60 * kMicrosPerSecond;
    constexpr int64_t kMicrosPerHour = 60 * kMicrosPerMinute;
    // 838:59:59 expressed in microseconds.
    constexpr int64_t kMaxMicros = static_cast<int64_t>(3020399) * kMicrosPerSecond;
    constexpr int kMaxScale = 6;

    // Clamp instead of silently producing a truncated string (or reading
    // past a lookup table) when the caller hands in an invalid scale.
    if (scale < 0) {
        scale = 0;
    } else if (scale > kMaxScale) {
        scale = kMaxScale;
    }

    std::string text;
    text.reserve(17);
    if (time < 0) {
        time = -time;
        text.push_back('-');
    }
    // Saturate while still in floating point: converting a double that does
    // not fit into int64_t (or NaN) is undefined behaviour.
    if (!(time < static_cast<double>(kMaxMicros))) {
        time = static_cast<double>(kMaxMicros);
    }
    int64_t remaining = static_cast<int64_t>(time);

    const int64_t hour = remaining / kMicrosPerHour; // 0..838 after saturation
    remaining %= kMicrosPerHour;
    const int64_t minute = remaining / kMicrosPerMinute;
    remaining %= kMicrosPerMinute;
    const int64_t second = remaining / kMicrosPerSecond;
    int64_t fraction = remaining % kMicrosPerSecond;

    const auto append_two_digits = [&text](int64_t value) {
        text.push_back(static_cast<char>('0' + value / 10));
        text.push_back(static_cast<char>('0' + value % 10));
    };

    // Hours use at least two digits; a third is added only when needed.
    if (hour >= 100) {
        text.push_back(static_cast<char>('0' + hour / 100));
    }
    append_two_digits(hour % 100);
    text.push_back(':');
    append_two_digits(minute);
    text.push_back(':');
    append_two_digits(second);

    if (scale > 0) {
        // Keep only the `scale` most significant of the six microsecond
        // digits (truncating, not rounding).
        for (int dropped = scale; dropped < kMaxScale; ++dropped) {
            fraction /= 10;
        }
        // Fill right-to-left so the field is zero-padded to `scale` digits.
        char digits[kMaxScale];
        for (int i = scale - 1; i >= 0; --i) {
            digits[i] = static_cast<char>('0' + fraction % 10);
            fraction /= 10;
        }
        text.push_back('.');
        text.append(digits, static_cast<size_t>(scale));
    }
    return text;
}
} // namespace doris
3 changes: 2 additions & 1 deletion be/src/util/date_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ namespace doris {
uint64_t timestamp_from_datetime(const std::string& datetime_str);
uint32_t timestamp_from_date(const std::string& date_str);
int32_t time_to_buffer_from_double(double time, char* buffer);
int32_t timev2_to_buffer_from_double(double time, char* buffer, int scale);
uint32_t timestamp_from_date_v2(const std::string& date_str);
uint64_t timestamp_from_datetime_v2(const std::string& date_str);

std::string time_to_buffer_from_double(double time);

std::string timev2_to_buffer_from_double(double time, int scale);
} // namespace doris
26 changes: 26 additions & 0 deletions be/src/util/mysql_row_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ static char* add_time(double data, char* pos, bool dynamic_mode) {
return pos + length;
}

// Writes the textual form of a timev2 value at `pos` and returns the position
// just past what was written. Outside dynamic mode the text is preceded by a
// one-byte length stored with int1store; in dynamic mode only the text is
// written.
static char* add_timev2(double data, char* pos, bool dynamic_mode, int scale) {
    // Leave one byte in front of the text for the length when it is needed.
    char* const text_begin = dynamic_mode ? pos : pos + 1;
    const int text_len = timev2_to_buffer_from_double(data, text_begin, scale);
    if (!dynamic_mode) {
        int1store(pos, text_len);
    }
    return text_begin + text_len;
}
template <typename DateType>
static char* add_datetime(const DateType& data, char* pos, bool dynamic_mode) {
int length = data.to_buffer(pos + !dynamic_mode);
Expand Down Expand Up @@ -422,6 +429,25 @@ int MysqlRowBuffer<is_binary_format>::push_time(double data) {
return 0;
}

// Appends one timev2 cell to the row.
//
// With the binary format outside dynamic mode, `data` is stored through
// float8store into an 8-byte buffer and appended as-is. Otherwise the value is
// rendered as text with `scale` fractional digits via add_timev2.
//
// Returns 0 on success, or the non-zero result of append()/reserve().
template <bool is_binary_format>
int MysqlRowBuffer<is_binary_format>::push_timev2(double data, int scale) {
    if (is_binary_format && !_dynamic_mode) {
        char buff[8];
        _field_pos++;
        float8store(buff, data);
        return append(buff, 8);
    }
    // Unlike the second-resolution time text, timev2 may append '.' plus up
    // to 6 fractional digits. MAX_TIME_WIDTH is defined outside this function,
    // so reserve those bytes explicitly instead of assuming it covers them.
    constexpr int kMaxFractionWidth = 1 + 6;
    // 1 for string trail, 1 for length, other for time str
    int ret = reserve(2 + MAX_TIME_WIDTH + kMaxFractionWidth);

    if (0 != ret) {
        LOG(ERROR) << "mysql row buffer reserve failed.";
        return ret;
    }

    _pos = add_timev2(data, _pos, _dynamic_mode, scale);
    return 0;
}
template <bool is_binary_format>
template <typename DateType>
int MysqlRowBuffer<is_binary_format>::push_vec_datetime(DateType& data) {
Expand Down
1 change: 1 addition & 0 deletions be/src/util/mysql_row_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class MysqlRowBuffer {
int push_float(float data);
int push_double(double data);
int push_time(double data);
int push_timev2(double data, int scale);
template <typename DateType>
int push_datetime(const DateType& data);
int push_decimal(const DecimalV2Value& data, int round_scale);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/core/call_on_type_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ bool call_on_index_and_data_type(TypeIndex number, F&& f) {
return f(TypePair<DataTypeNumber<Float64>, T>());
case TypeIndex::Time:
return f(TypePair<DataTypeTime, T>());
case TypeIndex::TimeV2:
return f(TypePair<DataTypeTimeV2, T>());
case TypeIndex::Decimal32:
return f(TypePair<DataTypeDecimal<Decimal32>, T>());
case TypeIndex::Decimal64:
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/data_types/data_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const IDataType* data_type) {
return PGenericType::TIME;
case TypeIndex::AggState:
return PGenericType::AGG_STATE;
case TypeIndex::TimeV2:
return PGenericType::TIMEV2;
default:
return PGenericType::UNKNOWN;
}
Expand Down
13 changes: 6 additions & 7 deletions be/src/vec/data_types/data_type_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo
break;
case TYPE_TIME:
case TYPE_TIMEV2:
nested = std::make_shared<vectorized::DataTypeTime>();
nested = std::make_shared<vectorized::DataTypeTimeV2>(col_desc.scale);
break;
case TYPE_DOUBLE:
nested = std::make_shared<vectorized::DataTypeFloat64>();
Expand Down Expand Up @@ -277,9 +277,6 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeIndex& type_index, bool
case TypeIndex::DateV2:
nested = std::make_shared<vectorized::DataTypeDateV2>();
break;
case TypeIndex::Time:
nested = std::make_shared<DataTypeTime>();
break;
case TypeIndex::DateTimeV2:
nested = std::make_shared<DataTypeDateTimeV2>();
break;
Expand Down Expand Up @@ -316,7 +313,8 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeIndex& type_index, bool
nested = std::make_shared<vectorized::DataTypeQuantileStateDouble>();
break;
case TypeIndex::TimeV2:
nested = std::make_shared<vectorized::DataTypeTime>();
case TypeIndex::Time:
nested = std::make_shared<vectorized::DataTypeTimeV2>();
break;
default:
DCHECK(false) << "invalid typeindex:" << getTypeName(type_index);
Expand Down Expand Up @@ -516,8 +514,9 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
nested = std::make_shared<DataTypeQuantileStateDouble>();
break;
}
case PGenericType::TIME: {
nested = std::make_shared<DataTypeTime>();
case PGenericType::TIME:
case PGenericType::TIMEV2: {
nested = std::make_shared<DataTypeTimeV2>(pcolumn.decimal_param().scale());
break;
}
case PGenericType::AGG_STATE: {
Expand Down
28 changes: 28 additions & 0 deletions be/src/vec/data_types/data_type_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "vec/data_types/data_type_time.h"

#include <gen_cpp/data.pb.h>

#include <typeinfo>
#include <utility>

Expand Down Expand Up @@ -56,8 +58,34 @@ void DataTypeTime::to_string(const IColumn& column, size_t row_num, BufferWritab
ostr.write(value.data(), value.size());
}

// Fills `col_meta` through the base-class implementation, then records this
// type's scale in the message's decimal_param field.
void DataTypeTimeV2::to_pb_column_meta(PColumnMeta* col_meta) const {
    IDataType::to_pb_column_meta(col_meta);
    auto* decimal_param = col_meta->mutable_decimal_param();
    decimal_param->set_scale(_scale);
}

// Time values use the Float64 base type's column, so creation is delegated
// to the base class.
MutableColumnPtr DataTypeTime::create_column() const {
    using Base = DataTypeNumberBase<Float64>;
    return Base::create_column();
}

// Two types are equal when `rhs` has the same dynamic type as this object.
// Note that the scale is not part of the comparison.
bool DataTypeTimeV2::equals(const IDataType& rhs) const {
    const std::type_info& other_type = typeid(rhs);
    const std::type_info& this_type = typeid(*this);
    return other_type == this_type;
}

std::string DataTypeTimeV2::to_string(const IColumn& column, size_t row_num) const {
auto result = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = result.first;
row_num = result.second;

auto value = assert_cast<const ColumnFloat64&>(*ptr).get_element(row_num);
return timev2_to_buffer_from_double(value, _scale);
}

void DataTypeTimeV2::to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const {
std::string value = to_string(column, row_num);
ostr.write(value.data(), value.size());
}

// TimeV2 values use the Float64 base type's column, so creation is delegated
// to the base class.
MutableColumnPtr DataTypeTimeV2::create_column() const {
    using Base = DataTypeNumberBase<Float64>;
    return Base::create_column();
}
} // namespace doris::vectorized
39 changes: 39 additions & 0 deletions be/src/vec/data_types/data_type_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,45 @@ class DataTypeTime final : public DataTypeNumberBase<Float64> {

DataTypeSerDeSPtr get_serde() const override { return std::make_shared<DataTypeTimeSerDe>(); };
TypeIndex get_type_id() const override { return TypeIndex::Time; }
const char* get_family_name() const override { return "time"; }
};

// Data type for TIMEV2 values. Values live in a Float64 column; the type
// additionally carries a scale (number of fractional-second digits, 0..6)
// that is used when a value is rendered as text or written to MySQL.
class DataTypeTimeV2 final : public DataTypeNumberBase<Float64> {
public:
    // `scale` must be in [0, 6]. -1 is also accepted and stored as 0
    // (presumably the "not specified" sentinel -- confirm against callers).
    // Any other value is fatal: a negative scale would otherwise wrap to a
    // huge unsigned number and later be used to size the fractional part.
    DataTypeTimeV2(int scale = 0) {
        if (UNLIKELY(scale < -1 || scale > 6)) {
            LOG(FATAL) << fmt::format("Scale {} is out of bounds", scale);
        }
        _scale = (scale == -1) ? 0 : static_cast<UInt32>(scale);
    }

    bool equals(const IDataType& rhs) const override;

    // Text form of one value, printed with this type's scale.
    std::string to_string(const IColumn& column, size_t row_num) const override;
    void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;

    PrimitiveType get_type_as_primitive_type() const override { return TYPE_TIMEV2; }
    TPrimitiveType::type get_type_as_tprimitive_type() const override {
        return TPrimitiveType::TIMEV2;
    }

    MutableColumnPtr create_column() const override;

    bool is_summable() const override { return true; }
    bool can_be_used_in_bit_operations() const override { return true; }
    bool can_be_used_in_boolean_context() const override { return true; }
    bool can_be_inside_nullable() const override { return true; }

    // Writes the base metadata plus the scale into `col_meta`.
    void to_pb_column_meta(PColumnMeta* col_meta) const override;

    // The serde is built with this type's scale.
    DataTypeSerDeSPtr get_serde() const override {
        return std::make_shared<DataTypeTimeV2SerDe>(_scale);
    }

    TypeIndex get_type_id() const override { return TypeIndex::TimeV2; }
    const char* get_family_name() const override { return "timev2"; }
    UInt32 get_scale() const override { return _scale; }

private:
    // Number of fractional-second digits, always within [0, 6].
    UInt32 _scale = 0;
};
} // namespace doris::vectorized
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_time_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,26 @@ Status DataTypeTimeSerDe::write_column_to_mysql(const IColumn& column,
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
}

// Overload for MysqlRowBuffer<true>; forwards to the shared templated
// implementation.
Status DataTypeTimeV2SerDe::write_column_to_mysql(const IColumn& column,
                                                  MysqlRowBuffer<true>& row_buffer, int row_idx,
                                                  bool col_const) const {
    return _write_column_to_mysql<true>(column, row_buffer, row_idx, col_const);
}
// Overload for MysqlRowBuffer<false>; forwards to the shared templated
// implementation.
Status DataTypeTimeV2SerDe::write_column_to_mysql(const IColumn& column,
                                                  MysqlRowBuffer<false>& row_buffer, int row_idx,
                                                  bool col_const) const {
    return _write_column_to_mysql<false>(column, row_buffer, row_idx, col_const);
}
// Pushes one value of a Float64 column into `result` as a timev2 cell, using
// this serde's scale. The row actually read is index_check_const(row_idx,
// col_const). Returns InternalError when the row buffer reports a non-zero
// status, OK otherwise.
template <bool is_binary_format>
Status DataTypeTimeV2SerDe::_write_column_to_mysql(const IColumn& column,
                                                   MysqlRowBuffer<is_binary_format>& result,
                                                   int row_idx, bool col_const) const {
    const auto& values = assert_cast<const ColumnVector<Float64>&>(column).get_data();
    const auto value_row = index_check_const(row_idx, col_const);
    const int push_status = result.push_timev2(values[value_row], scale);
    if (UNLIKELY(push_status != 0)) {
        return Status::InternalError("pack mysql buffer failed.");
    }
    return Status::OK();
}
} // namespace vectorized
} // namespace doris
14 changes: 14 additions & 0 deletions be/src/vec/data_types/serde/data_type_time_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,19 @@ class DataTypeTimeSerDe : public DataTypeNumberSerDe<Float64> {
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const;
};
// SerDe for TIMEV2 columns. It extends the Float64 number serde and overrides
// the MySQL writers so values are pushed as timev2 cells with a fixed scale.
class DataTypeTimeV2SerDe : public DataTypeNumberSerDe<Float64> {
public:
    // `scale` is passed to the row buffer on every write; it is stored as
    // given, without validation.
    DataTypeTimeV2SerDe(int scale = 0) : scale(scale) {}

    // One overload per MysqlRowBuffer instantiation.
    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
                                 int row_idx, bool col_const) const override;
    Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
                                 int row_idx, bool col_const) const override;

private:
    // Shared implementation behind both public overloads.
    template <bool is_binary_format>
    Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
                                  int row_idx, bool col_const) const;

    // Scale handed to MysqlRowBuffer::push_timev2.
    int scale;
};
} // namespace vectorized
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logi
}
}
} else if (logicalType.__isset.TIME) {
type = TypeDescriptor(TYPE_TIMEV2);
type = TypeDescriptor(TYPE_TIME);
} else if (logicalType.__isset.TIMESTAMP) {
type = TypeDescriptor(TYPE_DATETIMEV2);
const auto& time_unit = logicalType.TIMESTAMP.unit;
Expand Down
Loading

0 comments on commit b7d6e8a

Please sign in to comment.