[Enhancement](MaxCompute) Refactoring maxCompute catalog using Storage API. (#40225)

## Proposed changes
This PR refactors the MaxCompute catalog to read data through the MaxCompute Storage API.

Storage API: https://help.aliyun.com/zh/maxcompute/user-guide/open-storage-sample-java-sdk?spm=a2c4g.11186623.0.i0

```
The following properties are required:

CREATE CATALOG mc PROPERTIES (
    "type" = "max_compute",
    "mc.default.project" = "xxx",
    "mc.access_key" = "xxx",
    "mc.secret_key" = "xxxx",
    "mc.endpoint" = "xxxx"
);

Optional properties and their defaults:

"mc.quota" = "pay-as-you-go"

"mc.split_strategy" = "byte_size"    -- split by data size
"mc.split_byte_size" = "268435456"   -- bytes read by each split

"mc.split_strategy" = "row_count"    -- split by number of rows
"mc.split_row_count" = "1048576"     -- rows read by each split
```
hubgeter authored Sep 11, 2024
1 parent c8b5386 commit 55bde8c
Showing 30 changed files with 2,663 additions and 965 deletions.
16 changes: 15 additions & 1 deletion be/src/runtime/descriptors.cpp
@@ -189,7 +189,21 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
           _tunnel_url(tdesc.mcTable.tunnel_url),
           _access_key(tdesc.mcTable.access_key),
           _secret_key(tdesc.mcTable.secret_key),
-          _public_access(tdesc.mcTable.public_access) {}
+          _public_access(tdesc.mcTable.public_access) {
+    if (tdesc.mcTable.__isset.endpoint) {
+        _endpoint = tdesc.mcTable.endpoint;
+    } else {
+        _init_status = Status::InvalidArgument(
+                "fail to init MaxComputeTableDescriptor, missing endpoint.");
+    }
+
+    if (tdesc.mcTable.__isset.quota) {
+        _quota = tdesc.mcTable.quota;
+    } else {
+        _init_status =
+                Status::InvalidArgument("fail to init MaxComputeTableDescriptor, missing quota.");
+    }
+}
 
 MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;

14 changes: 10 additions & 4 deletions be/src/runtime/descriptors.h
@@ -215,16 +215,22 @@ class MaxComputeTableDescriptor : public TableDescriptor {
     std::string access_key() const { return _access_key; }
     std::string secret_key() const { return _secret_key; }
     std::string public_access() const { return _public_access; }
+    std::string endpoint() const { return _endpoint; }
+    std::string quota() const { return _quota; }
+    Status init_status() const { return _init_status; }
 
 private:
-    std::string _region;
+    std::string _region; // deprecated
     std::string _project;
     std::string _table;
-    std::string _odps_url;
-    std::string _tunnel_url;
+    std::string _odps_url;   // deprecated
+    std::string _tunnel_url; // deprecated
     std::string _access_key;
     std::string _secret_key;
-    std::string _public_access;
+    std::string _public_access; // deprecated
+    std::string _endpoint;
+    std::string _quota;
+    Status _init_status = Status::OK();
 };
 
 class TrinoConnectorTableDescriptor : public TableDescriptor {
28 changes: 15 additions & 13 deletions be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -63,19 +63,21 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
         }
         index++;
     }
-    std::map<String, String> params = {{"region", _table_desc->region()},
-                                       {"odps_url", _table_desc->odps_url()},
-                                       {"tunnel_url", _table_desc->tunnel_url()},
-                                       {"access_key", _table_desc->access_key()},
-                                       {"secret_key", _table_desc->secret_key()},
-                                       {"project", _table_desc->project()},
-                                       {"partition_spec", _max_compute_params.partition_spec},
-                                       {"table", _table_desc->table()},
-                                       {"public_access", _table_desc->public_access()},
-                                       {"start_offset", std::to_string(_range.start_offset)},
-                                       {"split_size", std::to_string(_range.size)},
-                                       {"required_fields", required_fields.str()},
-                                       {"columns_types", columns_types.str()}};
+    std::map<String, String> params = {
+            {"access_key", _table_desc->access_key()},
+            {"secret_key", _table_desc->secret_key()},
+            {"endpoint", _table_desc->endpoint()},
+            {"quota", _table_desc->quota()},
+            {"project", _table_desc->project()},
+            {"table", _table_desc->table()},
+
+            {"session_id", _max_compute_params.session_id},
+            {"scan_serializer", _max_compute_params.table_batch_read_session},
+
+            {"start_offset", std::to_string(_range.start_offset)},
+            {"split_size", std::to_string(_range.size)},
+            {"required_fields", required_fields.str()},
+            {"columns_types", columns_types.str()}};
     _jni_connector = std::make_unique<JniConnector>(
             "org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
 }
3 changes: 3 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -752,6 +752,9 @@ Status VFileScanner::_get_next_reader() {
                    range.table_format_params.table_format_type == "max_compute") {
             const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
                     _real_tuple_desc->table_desc());
+            if (!mc_desc->init_status()) {
+                return mc_desc->init_status();
+            }
             std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
                     mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
                     range, _state, _profile);
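
Note: the descriptor change above makes a missing `endpoint` or `quota` non-fatal at construction time: the error is stored in `_init_status`, and `VFileScanner::_get_next_reader()` returns it before building the JNI reader. A minimal Java sketch of this deferred-validation pattern (illustrative names, not the Doris classes):

```
import java.util.Map;
import java.util.Optional;

// Deferred validation: constructing the descriptor always succeeds; any
// error is stored and surfaced only when the descriptor is first used.
class McTableDesc {
    private final String endpoint;
    private final Optional<String> initError;

    McTableDesc(Map<String, String> props) {
        this.endpoint = props.get("endpoint");
        this.initError = (endpoint == null)
                ? Optional.of("fail to init MaxComputeTableDescriptor, missing endpoint.")
                : Optional.empty();
    }

    Optional<String> initError() {
        return initError;
    }
}

class ScannerSketch {
    // Mirrors the check added in vfile_scanner.cpp: bail out before
    // creating the reader if the descriptor failed to initialize.
    static void open(McTableDesc desc) {
        desc.initError().ifPresent(err -> {
            throw new IllegalStateException(err);
        });
        // ... create the reader ...
    }
}
```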
@@ -59,6 +59,18 @@ public interface ColumnValue {
 
     LocalDate getDate();
 
+    default String getChar() {
+        return getString();
+    }
+
+    default byte[] getCharAsBytes() {
+        return getStringAsBytes();
+    }
+
+    default boolean canGetCharAsBytes() {
+        return canGetStringAsBytes();
+    }
+
     LocalDateTime getDateTime();
 
     byte[] getBytes();
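
Note: the new CHAR accessors are `default` methods that fall back to the existing string path, so every current `ColumnValue` implementation keeps compiling and behaving as before; only readers that need CHAR-specific handling (such as the MaxCompute reader below) override them. A standalone sketch of the pattern (a minimal interface, not the real `ColumnValue`):

```
// Minimal illustration of the default-method pattern used in ColumnValue.
interface Value {
    String getString();

    // New accessor: delegates to the string path unless overridden,
    // so existing implementations need no changes.
    default String getChar() {
        return getString();
    }
}

class PlainValue implements Value {
    public String getString() {
        return "abc  ";
    }
    // getChar() is inherited and returns "abc  " via getString().
}

class CharAwareValue implements Value {
    public String getString() {
        return "abc  ";
    }

    @Override
    public String getChar() {
        return getString().stripTrailing(); // CHAR values drop trailing padding
    }
}
```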
@@ -1540,6 +1540,12 @@ public void appendValue(ColumnValue o) {
                 appendDateTime(o.getDateTime());
                 break;
             case CHAR:
+                if (o.canGetCharAsBytes()) {
+                    appendBytesAndOffset(o.getCharAsBytes());
+                } else {
+                    appendStringAndOffset(o.getChar());
+                }
+                break;
             case VARCHAR:
             case STRING:
                 if (o.canGetStringAsBytes()) {
6 changes: 6 additions & 0 deletions fe/be-java-extensions/max-compute-scanner/pom.xml
@@ -43,6 +43,7 @@ under the License.
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-core</artifactId>
+            <version>${maxcompute.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.codehaus.jackson</groupId>
@@ -54,6 +55,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-table-api</artifactId>
+            <version>${maxcompute.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-vector</artifactId>
@@ -23,28 +23,36 @@
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DateMilliVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
 import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecTZVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.holders.NullableTimeStampNanoHolder;
+import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.log4j.Logger;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteOrder;
+import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -193,6 +201,33 @@ public String getString() {
         return v == null ? new String(new byte[0]) : v;
     }
 
+    public String getChar() {
+        skippedIfNull();
+        VarCharVector varcharCol = (VarCharVector) column;
+        return varcharCol.getObject(idx++).toString().stripTrailing();
+    }
+
+    // Maybe `appendBytesAndOffset(byte[] src, int offset, int length)` could be used here
+    // to reduce the creation of byte[], but a clean way to write that has not been found yet.
+    public byte[] getCharAsBytes() {
+        skippedIfNull();
+        VarCharVector varcharCol = (VarCharVector) column;
+        byte[] v = varcharCol.getObject(idx++).getBytes();
+
+        if (v == null) {
+            return new byte[0];
+        }
+
+        int end = v.length - 1;
+        while (end >= 0 && v[end] == ' ') {
+            end--;
+        }
+        return (end == -1) ? new byte[0] : Arrays.copyOfRange(v, 0, end + 1);
+    }
+
     @Override
     public byte[] getStringAsBytes() {
         skippedIfNull();
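
Note: `getChar()` trims padding with `String.stripTrailing()`, while `getCharAsBytes()` trims trailing ASCII spaces by hand to avoid a byte[] -> String -> byte[] round trip. A standalone check of the byte-path trimming (a sketch, not part of the commit):

```
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharTrimCheck {
    // Mirrors the trailing-space trimming in getCharAsBytes().
    static byte[] stripTrailingSpaces(byte[] v) {
        int end = v.length - 1;
        while (end >= 0 && v[end] == ' ') {
            end--;
        }
        return (end == -1) ? new byte[0] : Arrays.copyOfRange(v, 0, end + 1);
    }

    public static void main(String[] args) {
        byte[] padded = "abc  ".getBytes(StandardCharsets.UTF_8); // CHAR(5)-style padding
        System.out.println(new String(stripTrailingSpaces(padded), StandardCharsets.UTF_8)); // "abc"
        System.out.println(stripTrailingSpaces("   ".getBytes(StandardCharsets.UTF_8)).length); // 0
    }
}
```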
@@ -213,14 +248,52 @@ public LocalDate getDate() {
     public LocalDateTime getDateTime() {
         skippedIfNull();
         LocalDateTime result;
-        if (column instanceof DateMilliVector) {
-            DateMilliVector datetimeCol = (DateMilliVector) column;
-            result = datetimeCol.getObject(idx++);
+
+        ArrowType.Timestamp timestampType =
+                (ArrowType.Timestamp) column.getField().getFieldType().getType();
+        if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) {
+            result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx++);
         } else {
-            TimeStampNanoVector datetimeCol = (TimeStampNanoVector) column;
-            result = datetimeCol.getObject(idx++);
+            NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder();
+            ((TimeStampNanoVector) column).get(idx++, valueHoder);
+            long timestampNanos = valueHoder.value;
+
+            result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000,
+                    (int) (timestampNanos % 1_000_000_000), java.time.ZoneOffset.UTC);
         }
-        return result == null ? LocalDateTime.MIN : result;
+
+        /*
+        In principle this could switch on timestampType.getUnit():
+            result = switch (timestampType.getUnit()) {
+                case SECOND -> convertToLocalDateTime((TimeStampSecTZVector) column, idx++);
+                case MILLISECOND -> convertToLocalDateTime((TimeStampMilliTZVector) column, idx++);
+                case MICROSECOND -> convertToLocalDateTime((TimeStampMicroTZVector) column, idx++);
+                case NANOSECOND -> convertToLocalDateTime((TimeStampNanoTZVector) column, idx++);
+            };
+        But because the type mapping is
+            MaxCompute type => Doris type
+            DATETIME        => ScalarType.createDatetimeV2Type(3)
+            TIMESTAMP_NTZ   => ScalarType.createDatetimeV2Type(6)
+        and the read session is created with
+            TableBatchReadSession.withArrowOptions(
+                    ArrowOptions.newBuilder()
+                            .withDatetimeUnit(TimestampUnit.MILLI)
+                            .withTimestampUnit(TimestampUnit.NANO)
+                            .build()),
+        and TIMESTAMP_NTZ carries no time zone (the column arrives as a TimeStampNanoVector),
+        the SECOND, MICROSECOND, and NANOSECOND TZ branches may never be taken.
+        */
+
+        return result;
     }
 
     @Override
@@ -248,9 +321,10 @@ public void unpackArray(List<ColumnValue> values) {
     public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
         skippedIfNull();
         MapVector mapCol = (MapVector) column;
-        int elemSize = mapCol.getObject(idx).size();
-        FieldVector keyList = mapCol.getDataVector().getChildrenFromFields().get(0);
-        FieldVector valList = mapCol.getDataVector().getChildrenFromFields().get(1);
+        int elemSize = mapCol.getElementEndIndex(idx) - mapCol.getElementStartIndex(idx);
+        List<FieldVector> innerCols = ((StructVector) mapCol.getDataVector()).getChildrenFromFields();
+        FieldVector keyList = innerCols.get(0);
+        FieldVector valList = innerCols.get(1);
         for (int i = 0; i < elemSize; i++) {
             MaxComputeColumnValue key = new MaxComputeColumnValue(keyList, offset);
             keys.add(key);
@@ -265,10 +339,35 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
     public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
         skippedIfNull();
         StructVector structCol = (StructVector) column;
+        List<FieldVector> innerCols = structCol.getChildrenFromFields();
         for (Integer fieldIndex : structFieldIndex) {
-            MaxComputeColumnValue val = new MaxComputeColumnValue(structCol.getChildByOrdinal(fieldIndex), idx);
+            MaxComputeColumnValue val = new MaxComputeColumnValue(innerCols.get(fieldIndex), idx);
             values.add(val);
         }
         idx++;
     }
+
+    public static LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector, int index) {
+        long timestampMillis = milliTZVector.get(index);
+        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), ZoneId.systemDefault());
+    }
+
+    public static LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
+        long timestampNanos = nanoTZVector.get(index);
+        return LocalDateTime.ofInstant(Instant.ofEpochSecond(timestampNanos / 1_000_000_000,
+                timestampNanos % 1_000_000_000), ZoneId.systemDefault());
+    }
+
+    public static LocalDateTime convertToLocalDateTime(TimeStampSecTZVector secTZVector, int index) {
+        long timestampSeconds = secTZVector.get(index);
+        return LocalDateTime.ofInstant(Instant.ofEpochSecond(timestampSeconds), ZoneId.systemDefault());
+    }
+
+    public static LocalDateTime convertToLocalDateTime(TimeStampMicroTZVector microTZVector, int index) {
+        long timestampMicros = microTZVector.get(index);
+        long seconds = timestampMicros / 1_000_000;
+        long nanos = (timestampMicros % 1_000_000) * 1_000;
+
+        return LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos), ZoneId.systemDefault());
+    }
 }
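
Note: each helper splits the raw epoch value into whole seconds plus a sub-second remainder before building the `LocalDateTime`. A quick standalone check of the microsecond arithmetic (hypothetical sample value):

```
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class MicroConversionCheck {
    public static void main(String[] args) {
        long timestampMicros = 1_726_000_000_123_456L;       // hypothetical sample
        long seconds = timestampMicros / 1_000_000;          // 1_726_000_000 s
        long nanos = (timestampMicros % 1_000_000) * 1_000;  // 123_456_000 ns
        LocalDateTime t = LocalDateTime.ofInstant(
                Instant.ofEpochSecond(seconds, nanos), ZoneId.systemDefault());
        System.out.println(t); // wall-clock time with a .123456 fraction, zone-dependent
    }
}
```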
