Skip to content

Commit

Permalink
[fix](ES Catalog)Only like on keyword can be applied to wildcard query
Browse files Browse the repository at this point in the history
Doris maps both the `text` and `keyword` ES field types to the `string` type.
When `like_push_down` is enabled, a `LIKE` predicate is translated into a wildcard query in ES, which leads to unexpected results on `text` fields.
Wildcard queries should therefore only be applied to `keyword` fields.
1. Add `column2typeMap` in `EsTable` to save the mapping of column_name to ES field data type.
2. Add new class `EsSchemaCacheValue` to hold the schema together with the column-to-type map.
3. Initialize `column2typeMap` when the schema cache is initialized and when building the query for an ES external table.
  • Loading branch information
qidaye committed Sep 23, 2024
1 parent 5214e6b commit 34f8d8a
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public class EsTable extends Table implements GsonPostProcessable {
// Periodically pull es metadata
private EsMetaStateTracker esMetaStateTracker;

// column name -> elasticsearch field data type
private Map<String, String> column2typeMap = new HashMap<>();

/**
 * Creates an empty ES table whose table type is {@code TableType.ELASTICSEARCH}.
 */
public EsTable() {
super(TableType.ELASTICSEARCH);
}
Expand Down Expand Up @@ -366,6 +369,6 @@ public void syncTableMetaData() {
}

public List<Column> genColumnsFromEs() {
return EsUtil.genColumnsFromEs(client, indexName, mappingType, false);
return EsUtil.genColumnsFromEs(client, indexName, mappingType, false, column2typeMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand Down Expand Up @@ -73,13 +75,20 @@ public TTableDescriptor toThrift() {
@Override
public Optional<SchemaCacheValue> initSchema() {
EsRestClient restClient = ((EsExternalCatalog) catalog).getEsRestClient();
return Optional.of(new SchemaCacheValue(
EsUtil.genColumnsFromEs(restClient, name, null,
((EsExternalCatalog) catalog).enableMappingEsId())));
Map<String, String> column2typeMap = new HashMap<>();
List<Column> columns = EsUtil.genColumnsFromEs(restClient, name, null,
((EsExternalCatalog) catalog).enableMappingEsId(), column2typeMap);
return Optional.of(new EsSchemaCacheValue(columns, column2typeMap));
}

/**
 * Returns the mapping from column name to Elasticsearch field data type held by the
 * schema cache value of this table.
 *
 * <p>When no schema cache value is available (or it carries no map), an empty map is
 * returned instead of {@code null}. The map is later handed to the ES table and to the
 * query builder, both of which call {@code put}/{@code get} on it without a null check,
 * so a {@code null} here would surface as a {@code NullPointerException} at query time.
 * With an empty map a column simply has no known ES type.
 *
 * @return the column-name to ES-field-type map; never {@code null}
 */
public Map<String, String> getColumn2type() {
    Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
    return schemaCacheValue
            .map(value -> ((EsSchemaCacheValue) value).getColumn2typeMap())
            // Fresh mutable map so downstream code that fills it in keeps working.
            .orElseGet(HashMap::new);
}

private EsTable toEsTable() {
List<Column> schema = getFullSchema();
Map<String, String> column2typeMap = getColumn2type();
EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
EsTable esTable = new EsTable(this.id, this.name, schema, TableType.ES_EXTERNAL_TABLE);
esTable.setIndexName(name);
Expand All @@ -95,6 +104,7 @@ private EsTable toEsTable() {
esTable.setHosts(String.join(",", esCatalog.getNodes()));
esTable.syncTableMetaData();
esTable.setIncludeHiddenIndex(esCatalog.enableIncludeHiddenIndex());
esTable.setColumn2typeMap(column2typeMap);
return esTable;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.es;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.SchemaCacheValue;

import java.util.List;
import java.util.Map;

/**
 * Schema cache value for an Elasticsearch table.
 *
 * <p>In addition to the column list handed to {@link SchemaCacheValue}, it carries a map
 * from column name to the Elasticsearch field data type of that column.
 */
public class EsSchemaCacheValue extends SchemaCacheValue {
// column name -> elasticsearch field data type
public Map<String, String> column2typeMap;

/**
 * Creates a cache value from the given columns and column-to-type map.
 *
 * @param columns the table columns, passed to the parent {@link SchemaCacheValue}
 * @param column2typeMap column name to Elasticsearch field data type; stored by reference,
 *                       not copied
 */
public EsSchemaCacheValue(List<Column> columns, Map<String, String> column2typeMap) {
super(columns);
this.column2typeMap = column2typeMap;
}

/**
 * Returns the column name to Elasticsearch field data type map given at construction.
 *
 * @return the stored map instance (no defensive copy is made)
 */
public Map<String, String> getColumn2typeMap() {
return column2typeMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,18 @@ public static ObjectNode getMappingProps(String sourceIndex, String indexMapping
* Add mappingEsId config in es external catalog.
**/
public static List<Column> genColumnsFromEs(EsRestClient client, String indexName, String mappingType,
boolean mappingEsId) {
boolean mappingEsId, Map<String, String> column2typeMap) {
String mapping = client.getMapping(indexName);
ObjectNode mappings = getMapping(mapping);
// Get array_fields while removing _meta property.
List<String> arrayFields = new ArrayList<>();
ObjectNode rootSchema = getRootSchema(mappings, mappingType, arrayFields);
return genColumnsFromEs(indexName, mappingType, rootSchema, mappingEsId, arrayFields);
return genColumnsFromEs(indexName, mappingType, rootSchema, mappingEsId, arrayFields, column2typeMap);
}

@VisibleForTesting
public static List<Column> genColumnsFromEs(String indexName, String mappingType, ObjectNode rootSchema,
boolean mappingEsId, List<String> arrayFields) {
boolean mappingEsId, List<String> arrayFields, Map<String, String> column2typeMap) {
List<Column> columns = new ArrayList<>();
if (mappingEsId) {
Column column = new Column();
Expand All @@ -220,7 +220,8 @@ public static List<Column> genColumnsFromEs(String indexName, String mappingType
while (iterator.hasNext()) {
String fieldName = iterator.next();
ObjectNode fieldValue = (ObjectNode) mappingProps.get(fieldName);
Column column = parseEsField(fieldName, replaceFieldAlias(mappingProps, fieldValue), arrayFields);
Column column = parseEsField(fieldName, replaceFieldAlias(mappingProps, fieldValue), arrayFields,
column2typeMap);
columns.add(column);
}
return columns;
Expand All @@ -245,7 +246,8 @@ private static ObjectNode replaceFieldAlias(ObjectNode mappingProps, ObjectNode
return fieldValue;
}

private static Column parseEsField(String fieldName, ObjectNode fieldValue, List<String> arrayFields) {
private static Column parseEsField(String fieldName, ObjectNode fieldValue, List<String> arrayFields,
Map<String, String> column2typeMap) {
Column column = new Column();
column.setName(fieldName);
column.setIsKey(true);
Expand All @@ -256,6 +258,7 @@ private static Column parseEsField(String fieldName, ObjectNode fieldValue, List
if (fieldValue.has("type")) {
String typeStr = fieldValue.get("type").asText();
column.setComment("Elasticsearch type is " + typeStr);
column2typeMap.put(fieldName, typeStr);
// reference https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
switch (typeStr) {
case "null":
Expand Down Expand Up @@ -307,6 +310,7 @@ private static Column parseEsField(String fieldName, ObjectNode fieldValue, List
} else {
type = Type.JSONB;
column.setComment("Elasticsearch no type");
column2typeMap.put(fieldName, "no_type");
}
if (arrayFields.contains(fieldName)) {
column.setType(ArrayType.create(type, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public final class QueryBuilders {
* Generate dsl from compound expr.
**/
private static QueryBuilder toCompoundEsDsl(Expr expr, List<Expr> notPushDownList,
Map<String, String> fieldsContext, BuilderOptions builderOptions) {
Map<String, String> fieldsContext, BuilderOptions builderOptions, Map<String, String> column2typeMap) {
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
switch (compoundPredicate.getOp()) {
case AND: {
QueryBuilder left = toEsDsl(compoundPredicate.getChild(0), notPushDownList, fieldsContext,
builderOptions);
builderOptions, column2typeMap);
QueryBuilder right = toEsDsl(compoundPredicate.getChild(1), notPushDownList, fieldsContext,
builderOptions);
builderOptions, column2typeMap);
if (left != null && right != null) {
return QueryBuilders.boolQuery().must(left).must(right);
}
Expand All @@ -86,9 +86,9 @@ private static QueryBuilder toCompoundEsDsl(Expr expr, List<Expr> notPushDownLis
case OR: {
int beforeSize = notPushDownList.size();
QueryBuilder left = toEsDsl(compoundPredicate.getChild(0), notPushDownList, fieldsContext,
builderOptions);
builderOptions, column2typeMap);
QueryBuilder right = toEsDsl(compoundPredicate.getChild(1), notPushDownList, fieldsContext,
builderOptions);
builderOptions, column2typeMap);
int afterSize = notPushDownList.size();
if (left != null && right != null) {
return QueryBuilders.boolQuery().should(left).should(right);
Expand All @@ -101,7 +101,7 @@ private static QueryBuilder toCompoundEsDsl(Expr expr, List<Expr> notPushDownLis
}
case NOT: {
QueryBuilder child = toEsDsl(compoundPredicate.getChild(0), notPushDownList, fieldsContext,
builderOptions);
builderOptions, column2typeMap);
if (child != null) {
return QueryBuilders.boolQuery().mustNot(child);
}
Expand All @@ -122,10 +122,10 @@ private static Expr exprWithoutCast(Expr expr) {
return expr;
}

public static QueryBuilder toEsDsl(Expr expr) {
public static QueryBuilder toEsDsl(Expr expr, Map<String, String> column2typeMap) {
return toEsDsl(expr, new ArrayList<>(), new HashMap<>(),
BuilderOptions.builder().likePushDown(Boolean.parseBoolean(EsResource.LIKE_PUSH_DOWN_DEFAULT_VALUE))
.build());
.build(), column2typeMap);
}

private static TExprOpcode flipOpCode(TExprOpcode opCode) {
Expand Down Expand Up @@ -257,18 +257,18 @@ private static String getColumnFromExpr(Expr expr) {
* Doris expr to es dsl.
**/
public static QueryBuilder toEsDsl(Expr expr, List<Expr> notPushDownList, Map<String, String> fieldsContext,
BuilderOptions builderOptions) {
BuilderOptions builderOptions, Map<String, String> column2typeMap) {
if (expr == null) {
return null;
}
// esquery functionCallExpr will be rewritten to castExpr in where clause rewriter,
// so we get the functionCallExpr here.
if (expr instanceof CastExpr) {
return toEsDsl(expr.getChild(0), notPushDownList, fieldsContext, builderOptions);
return toEsDsl(expr.getChild(0), notPushDownList, fieldsContext, builderOptions, column2typeMap);
}
// CompoundPredicate, `between` also converted to CompoundPredicate.
if (expr instanceof CompoundPredicate) {
return toCompoundEsDsl(expr, notPushDownList, fieldsContext, builderOptions);
return toCompoundEsDsl(expr, notPushDownList, fieldsContext, builderOptions, column2typeMap);
}
TExprOpcode opCode = expr.getOpcode();
boolean isFlip = false;
Expand All @@ -287,6 +287,7 @@ public static QueryBuilder toEsDsl(Expr expr, List<Expr> notPushDownList, Map<St
return null;
}

String type = column2typeMap.get(column);
// Check whether the date type need compat, it must before keyword replace.
List<String> needCompatDateFields = builderOptions.getNeedCompatDateFields();
boolean needDateCompat = needCompatDateFields != null && needCompatDateFields.contains(column);
Expand All @@ -313,10 +314,11 @@ public static QueryBuilder toEsDsl(Expr expr, List<Expr> notPushDownList, Map<St
return parseIsNullPredicate(expr, column);
}
if (expr instanceof LikePredicate) {
if (!builderOptions.isLikePushDown()) {
if (!builderOptions.isLikePushDown() || !"keyword".equals(type)) {
notPushDownList.add(expr);
return null;
} else {
// only keyword can apply wildcard query
return parseLikePredicate(expr, column);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,14 @@ private void buildQuery() throws UserException {
boolean hasFilter = false;
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
List<Expr> notPushDownList = new ArrayList<>();
if (table.getColumn2typeMap() == null) {
table.genColumnsFromEs();
}
for (Expr expr : conjuncts) {
QueryBuilder queryBuilder = QueryBuilders.toEsDsl(expr, notPushDownList, fieldsContext,
BuilderOptions.builder().likePushDown(table.isLikePushDown())
.needCompatDateFields(table.needCompatDateFields()).build());
.needCompatDateFields(table.needCompatDateFields()).build(),
table.getColumn2typeMap());
if (queryBuilder != null) {
hasFilter = true;
boolQueryBuilder.must(queryBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
Expand Down Expand Up @@ -226,7 +227,7 @@ public void testEs8Mapping() throws IOException, URISyntaxException {
public void testDateType() throws IOException, URISyntaxException {
ObjectNode testDateFormat = EsUtil.getRootSchema(
EsUtil.getMapping(loadJsonFromFile("data/es/test_date_format.json")), null, new ArrayList<>());
List<Column> parseColumns = EsUtil.genColumnsFromEs("test_date_format", null, testDateFormat, false, new ArrayList<>());
List<Column> parseColumns = EsUtil.genColumnsFromEs("test_date_format", null, testDateFormat, false, new ArrayList<>(), new HashMap<>());
Assertions.assertEquals(8, parseColumns.size());
for (Column column : parseColumns) {
String name = column.getName();
Expand Down Expand Up @@ -259,7 +260,7 @@ public void testDateType() throws IOException, URISyntaxException {
public void testFieldAlias() throws IOException, URISyntaxException {
ObjectNode testFieldAlias = EsUtil.getRootSchema(
EsUtil.getMapping(loadJsonFromFile("data/es/test_field_alias.json")), null, new ArrayList<>());
List<Column> parseColumns = EsUtil.genColumnsFromEs("test_field_alias", null, testFieldAlias, true, new ArrayList<>());
List<Column> parseColumns = EsUtil.genColumnsFromEs("test_field_alias", null, testFieldAlias, true, new ArrayList<>(), new HashMap<>());
Assertions.assertEquals("datetimev2(0)", parseColumns.get(2).getType().toSql());
Assertions.assertEquals("text", parseColumns.get(4).getType().toSql());
}
Expand All @@ -269,7 +270,7 @@ public void testComplexType() throws IOException, URISyntaxException {
ObjectNode testFieldAlias = EsUtil.getRootSchema(
EsUtil.getMapping(loadJsonFromFile("data/es/es6_dynamic_complex_type.json")), null, new ArrayList<>());
List<Column> columns = EsUtil.genColumnsFromEs("test_complex_type", "complex_type", testFieldAlias, true,
new ArrayList<>());
new ArrayList<>(), new HashMap<>());
Assertions.assertEquals(3, columns.size());
}

Expand Down
Loading

0 comments on commit 34f8d8a

Please sign in to comment.