Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect snapshotTimeMillis when doing optimized count #1307

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -598,25 +598,37 @@ public static String fullTableName(TableId tableId) {
}
}

public long calculateTableSize(TableId tableId, Optional<String> filter) {
return calculateTableSize(getTable(tableId), filter);
public long calculateTableSize(
TableId tableId, Optional<String> filter, OptionalLong snapshotTimeMillis) {
return calculateTableSize(getTable(tableId), filter, snapshotTimeMillis);
}

public long calculateTableSize(TableInfo tableInfo, Optional<String> filter) {
public long calculateTableSize(
TableInfo tableInfo, Optional<String> filter, OptionalLong snapshotTimeMillis) {
TableDefinition.Type type = tableInfo.getDefinition().getType();
if (type == TableDefinition.Type.TABLE && !filter.isPresent()) {
if (type == TableDefinition.Type.TABLE
&& !filter.isPresent()
&& !snapshotTimeMillis.isPresent()) {
return tableInfo.getNumRows().longValue();
} else if (type == TableDefinition.Type.EXTERNAL && !filter.isPresent()) {
} else if (type == TableDefinition.Type.EXTERNAL
&& !filter.isPresent()
&& !snapshotTimeMillis.isPresent()) {
String table = fullTableName(tableInfo.getTableId());
return getNumberOfRows(String.format("SELECT COUNT(*) from `%s`", table));
} else if (type == TableDefinition.Type.VIEW
|| type == TableDefinition.Type.MATERIALIZED_VIEW
|| ((type == TableDefinition.Type.TABLE || type == TableDefinition.Type.EXTERNAL)
&& filter.isPresent())) {
&& (filter.isPresent() || snapshotTimeMillis.isPresent()))) {
// run a query
String table = fullTableName(tableInfo.getTableId());
// BigQuery spells the time-travel clause FOR SYSTEM_TIME AS OF (with an underscore);
// "FOR SYSTEM TIME AS OF" is rejected as a syntax error. The clause is left empty when
// no snapshot time was requested, so the query then counts the table's current state.
String timeTravelClause =
snapshotTimeMillis.isPresent()
? String.format(
"FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(%d)", snapshotTimeMillis.getAsLong())
: "";
String whereClause = filter.map(f -> "WHERE " + f).orElse("");
return getNumberOfRows(String.format("SELECT COUNT(*) from `%s` %s", table, whereClause));
return getNumberOfRows(
String.format("SELECT COUNT(*) from `%s` %s %s", table, timeTravelClause, whereClause));
} else {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,14 +99,16 @@ public ReadSessionResponse create(
log.info(
"|creation a read session for table {}, parameters: "
+ "|selectedFields=[{}],"
+ "|filter=[{}]"
+ "|snapshotTimeMillis[{}]",
+ "|filter=[{}],"
+ "|snapshotTimeMillis[{}],"
+ "|view=[{}]",
actualTable.getFriendlyName(),
String.join(",", selectedFields),
filter.orElse("None"),
config.getSnapshotTimeMillis().isPresent()
? String.valueOf(config.getSnapshotTimeMillis().getAsLong())
: "None");
: "None",
isInputTableAView(tableDetails));

String tablePath = toTablePath(actualTable.getTableId());
CreateReadSessionRequest request =
Expand Down Expand Up @@ -264,7 +267,7 @@ TableInfo getActualTable(
// get it from the view
String querySql =
bigQueryClient.createSql(
table.getTableId(), requiredColumns, filters, config.getSnapshotTimeMillis());
table.getTableId(), requiredColumns, filters, OptionalLong.empty());
log.debug("querySql is {}", querySql);
return bigQueryClient.materializeViewToTable(
querySql, table.getTableId(), config.getMaterializationExpirationTimeInMinutes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -130,7 +131,11 @@ public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
BigQueryUtil.emptyIfNeeded(compiledFilter));
return (RDD<Row>)
generateEmptyRowRDD(
actualTable, readSessionCreator.isInputTableAView(table) ? "" : compiledFilter);
actualTable,
readSessionCreator.isInputTableAView(table) ? "" : compiledFilter,
readSessionCreator.isInputTableAView(table)
? OptionalLong.empty()
: options.getSnapshotTimeMillis());
} else if (requiredColumns.length == 0) {
log.debug("Not using optimized empty projection");
}
Expand Down Expand Up @@ -182,11 +187,13 @@ String getCompiledFilter(Filter[] filters) {
}
}

private RDD<?> generateEmptyRowRDD(TableInfo tableInfo, String filter) {
private RDD<?> generateEmptyRowRDD(
TableInfo tableInfo, String filter, OptionalLong snapshotTimeMillis) {
emptyRowRDDsCreated += 1;
Optional<String> optionalFilter =
(filter.length() == 0) ? Optional.empty() : Optional.of(filter);
long numberOfRows = bigQueryClient.calculateTableSize(tableInfo, optionalFilter);
long numberOfRows =
bigQueryClient.calculateTableSize(tableInfo, optionalFilter, snapshotTimeMillis);

Function1<Object, InternalRow> objectToInternalRowConverter =
new ObjectToInternalRowConverter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,9 @@ private ReadSessionResponse createReadSession() {

Stream<InputPartitionContext<InternalRow>> createEmptyProjectionPartitions() {
Optional<String> filter = getCombinedFilter();
long rowCount = bigQueryClient.calculateTableSize(tableId, filter);
long rowCount =
bigQueryClient.calculateTableSize(
tableId, filter, readSessionCreatorConfig.getSnapshotTimeMillis());
logger.info("Used optimized BQ count(*) path. Count: " + rowCount);
int partitionsCount = readSessionCreatorConfig.getDefaultParallelism();
int partitionSize = (int) (rowCount / partitionsCount);
Expand Down
Loading