[feature](insert)add hive insert plan ut and remove redundant fields (#33051)

add hive insert sink plan UT case
remove some deprecated code
wsjz authored Mar 30, 2024
1 parent 2b94cd1 commit 6df957d
Showing 7 changed files with 161 additions and 63 deletions.
DistributionSpecTableSinkHashPartitioned.java
@@ -26,9 +26,6 @@
*/
public class DistributionSpecTableSinkHashPartitioned extends DistributionSpec {

-    public static final DistributionSpecTableSinkHashPartitioned INSTANCE =
-            new DistributionSpecTableSinkHashPartitioned();
-
private List<ExprId> outputColExprIds;

public DistributionSpecTableSinkHashPartitioned() {
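With the shared INSTANCE constant removed, each sink now builds its own spec so that outputColExprIds is scoped to one sink rather than shared mutable state. A minimal sketch of the resulting usage, assuming a conventional setter for the outputColExprIds field (the setter is not shown in this diff):

    // Hedged sketch: construct a fresh spec per sink instead of reusing the
    // removed shared INSTANCE; setOutputColExprIds is assumed from the field.
    DistributionSpecTableSinkHashPartitioned spec =
            new DistributionSpecTableSinkHashPartitioned();
    spec.setOutputColExprIds(partitionColExprIds); // List<ExprId> for the sink's partition columns
    PhysicalProperties required = new PhysicalProperties(spec);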
BindSink.java
@@ -68,12 +68,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;

import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;

/**
@@ -389,15 +387,10 @@ private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx)
return column;
}).collect(ImmutableList.toImmutableList());
}
-        Set<String> hivePartitionKeys = table.getRemoteTable()
-                .getPartitionKeys().stream()
-                .map(FieldSchema::getName)
-                .collect(Collectors.toSet());
LogicalHiveTableSink<?> boundSink = new LogicalHiveTableSink<>(
database,
table,
bindColumns,
-                hivePartitionKeys,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
LogicalHiveTableSinkToPhysicalHiveTableSink.java
@@ -42,8 +42,7 @@ public Rule build() {
sink.getLogicalProperties(),
null,
null,
-                        sink.child(),
-                        sink.getHivePartitionKeys());
+                        sink.child());
}).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE);
}
}
InsertIntoTableCommand.java
@@ -76,7 +76,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
* constructor
*/
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
-                    Optional<InsertCommandContext> insertCtx) {
+            Optional<InsertCommandContext> insertCtx) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
LogicalHiveTableSink.java
@@ -37,7 +37,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;

/**
* logical hive table sink for insert command
@@ -47,7 +46,6 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalTableSink
// bound data sink
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
-    private final Set<String> hivePartitionKeys;
private final DMLCommandType dmlCommandType;

/**
@@ -56,7 +54,6 @@
public LogicalHiveTableSink(HMSExternalDatabase database,
HMSExternalTable targetTable,
List<Column> cols,
-                                Set<String> hivePartitionKeys,
List<NamedExpression> outputExprs,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
@@ -66,26 +63,25 @@ public LogicalHiveTableSink(HMSExternalDatabase database,
this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink");
this.dmlCommandType = dmlCommandType;
-        this.hivePartitionKeys = hivePartitionKeys;
}

public Plan withChildAndUpdateOutput(Plan child) {
List<NamedExpression> output = child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
-        return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, output,
+        return new LogicalHiveTableSink<>(database, targetTable, cols, output,
dmlCommandType, Optional.empty(), Optional.empty(), child);
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child");
-        return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
+        return new LogicalHiveTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}

public LogicalHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
-        return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
+        return new LogicalHiveTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), child());
}

@@ -97,10 +93,6 @@ public HMSExternalTable getTargetTable() {
return targetTable;
}

-    public Set<String> getHivePartitionKeys() {
-        return hivePartitionKeys;
-    }
-
public DMLCommandType getDmlCommandType() {
return dmlCommandType;
}
@@ -134,7 +126,6 @@ public String toString() {
"database", database.getFullName(),
"targetTable", targetTable.getName(),
"cols", cols,
"hivePartitionKeys", hivePartitionKeys,
"dmlCommandType", dmlCommandType
);
}
@@ -146,14 +137,14 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
+        return new LogicalHiveTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
-        return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
+        return new LogicalHiveTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}
PhysicalHiveTableSink.java
@@ -35,7 +35,6 @@
import org.apache.doris.statistics.Statistics;

import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;

import java.util.ArrayList;
import java.util.List;
@@ -50,7 +49,6 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalTableSink
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
private final List<Column> cols;
-    private final Set<String> hivePartitionKeys;

/**
* constructor
@@ -61,10 +59,9 @@ public PhysicalHiveTableSink(HMSExternalDatabase database,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
-                                 CHILD_TYPE child,
-                                 Set<String> hivePartitionKeys) {
+                                 CHILD_TYPE child) {
this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties,
-                PhysicalProperties.GATHER, null, child, hivePartitionKeys);
+                PhysicalProperties.GATHER, null, child);
}

/**
@@ -78,14 +75,12 @@ public PhysicalHiveTableSink(HMSExternalDatabase database,
LogicalProperties logicalProperties,
PhysicalProperties physicalProperties,
Statistics statistics,
-                                 CHILD_TYPE child,
-                                 Set<String> hivePartitionKeys) {
+                                 CHILD_TYPE child) {
super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
logicalProperties, physicalProperties, statistics, child);
this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink");
this.cols = Utils.copyRequiredList(cols);
-        this.hivePartitionKeys = hivePartitionKeys;
}

public HMSExternalDatabase getDatabase() {
@@ -103,7 +98,7 @@ public List<Column> getCols() {
@Override
public Plan withChildren(List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, groupExpression,
-                getLogicalProperties(), physicalProperties, statistics, children.get(0), hivePartitionKeys);
+                getLogicalProperties(), physicalProperties, statistics, children.get(0));
}

@Override
@@ -119,31 +114,28 @@ public List<? extends Expression> getExpressions() {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
-                groupExpression, getLogicalProperties(), child(), hivePartitionKeys);
+                groupExpression, getLogicalProperties(), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
-                groupExpression, logicalProperties.get(), children.get(0), hivePartitionKeys);
+                groupExpression, logicalProperties.get(), children.get(0));
}

@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
-                groupExpression, getLogicalProperties(), physicalProperties, statistics, child(), hivePartitionKeys);
+                groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}

/**
* get output physical properties
*/
@Override
public PhysicalProperties getRequirePhysicalProperties() {
-        Set<String> hivePartitionKeys = targetTable.getRemoteTable()
-                .getPartitionKeys().stream()
-                .map(FieldSchema::getName)
-                .collect(Collectors.toSet());
+        Set<String> hivePartitionKeys = targetTable.getPartitionColumnNames();
if (!hivePartitionKeys.isEmpty()) {
List<Integer> columnIdx = new ArrayList<>();
List<Column> fullSchema = targetTable.getFullSchema();
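The hunk above is the core of the cleanup: instead of threading a Set<String> of hive partition key names through every sink constructor, the physical sink now derives them on demand when computing required properties. Both derivations appear in this diff; a short sketch of the equivalence, assuming HMSExternalTable.getPartitionColumnNames() reads the same metadata:

    // Old (removed): walk the remote HMS schema for partition key names.
    Set<String> viaRemote = targetTable.getRemoteTable().getPartitionKeys().stream()
            .map(FieldSchema::getName)
            .collect(Collectors.toSet());
    // New (added): ask the external table directly.
    Set<String> viaTable = targetTable.getPartitionColumnNames();
    // Assumption: both yield the same set of hive partition column names.
    assert viaRemote.equals(viaTable);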
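The seventh changed file, the new hive insert sink plan UT case, does not render in this view. As a rough sketch only: a Nereids plan UT of this kind typically drives PlanChecker over an INSERT statement and matches the resulting sink. The class name, catalog/table names, and pattern method below are illustrative assumptions, not the committed test.

    import org.apache.doris.nereids.util.PlanChecker;
    import org.apache.doris.utframe.TestWithFeService;
    import org.junit.jupiter.api.Test;

    // Hedged sketch, not the committed file; assumes a mocked hive catalog
    // registered in the test FE as "hive_ctl".
    public class HiveTableSinkPlanTest extends TestWithFeService {
        @Test
        public void testInsertIntoHiveTable() {
            PlanChecker.from(connectContext)
                    .analyze("insert into hive_ctl.test_db.test_tbl select * from src")
                    .rewrite()
                    // logicalHiveTableSink() is an assumed generated pattern method.
                    .matches(logicalHiveTableSink());
        }
    }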
