From 03750fe064c54ecea63c272e31af2e49f67e2e96 Mon Sep 17 00:00:00 2001 From: Jianliang Qi Date: Thu, 5 Sep 2024 20:07:14 +0800 Subject: [PATCH] [feature](ES Catalog)Add FE open API for Elasticsearch 1. get index mapping api 2. search index api --- .../doris/datasource/es/EsRestClient.java | 36 +++++- .../httpv2/restv2/ElasticsearchAction.java | 108 ++++++++++++++++++ 2 files changed, 139 insertions(+), 5 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ElasticsearchAction.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java index 21a85126c9ea61..e2c7fa0b688913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java @@ -24,8 +24,10 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.ImmutableList; import okhttp3.Credentials; +import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.RequestBody; import okhttp3.Response; import org.apache.http.HttpHeaders; import org.apache.logging.log4j.LogManager; @@ -126,12 +128,27 @@ public String getMapping(String indexName) throws DorisEsException { return indexMapping; } + /** + * Search specific index + */ + public String searchIndex(String indexName, String body) throws DorisEsException { + String path = indexName + "/_search"; + RequestBody requestBody = null; + if (Strings.isNotEmpty(body)) { + requestBody = RequestBody.create( + body, + MediaType.get("application/json") + ); + } + return executeWithRequestBody(path, requestBody); + } + /** * Check whether index exist. **/ public boolean existIndex(OkHttpClient httpClient, String indexName) { String path = indexName + "/_mapping"; - try (Response response = executeResponse(httpClient, path)) { + try (Response response = executeResponse(httpClient, path, null)) { if (response.isSuccessful()) { return true; } @@ -228,7 +245,7 @@ private synchronized OkHttpClient getOrCreateSslNetworkClient() { return sslNetworkClient; } - private Response executeResponse(OkHttpClient httpClient, String path) throws IOException { + private Response executeResponse(OkHttpClient httpClient, String path, RequestBody requestBody) throws IOException { currentNode = currentNode.trim(); if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) { currentNode = "http://" + currentNode; @@ -239,7 +256,12 @@ private Response executeResponse(OkHttpClient httpClient, String path) throws IO String url = currentNode + path; try { SecurityChecker.getInstance().startSSRFChecking(url); - Request request = builder.get().url(currentNode + path).build(); + Request request; + if (requestBody != null) { + request = builder.post(requestBody).url(currentNode + path).build(); + } else { + request = builder.get().url(currentNode + path).build(); + } if (LOG.isInfoEnabled()) { LOG.info("es rest client request URL: {}", request.url().toString()); } @@ -251,13 +273,17 @@ private Response executeResponse(OkHttpClient httpClient, String path) throws IO } } + private String execute(String path) throws DorisEsException { + return executeWithRequestBody(path, null); + } + /** * execute request for specific path,it will try again nodes.length times if it fails * * @param path the path must not leading with '/' * @return response */ - private String execute(String path) throws DorisEsException { + private String executeWithRequestBody(String path, RequestBody requestBody) throws DorisEsException { // try 3 times for every node int retrySize = nodes.length * 3; DorisEsException scratchExceptionForThrow = null; @@ -277,7 +303,7 @@ private String execute(String path) throws DorisEsException { if (LOG.isTraceEnabled()) { LOG.trace("es rest client request URL: {}", currentNode + "/" + path); } - try (Response response = executeResponse(httpClient, path)) { + try (Response response = executeResponse(httpClient, path, requestBody)) { if (response.isSuccessful()) { return response.body().string(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ElasticsearchAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ElasticsearchAction.java new file mode 100644 index 00000000000000..15655239c4ef9b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ElasticsearchAction.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.restv2; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.JsonUtil; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.es.EsExternalCatalog; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.httpv2.rest.RestBaseController; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +@RestController +@RequestMapping("/rest/v2/api/es") +public class ElasticsearchAction extends RestBaseController { + + private static final Logger LOG = LogManager.getLogger(ElasticsearchAction.class); + private static final String CATALOG = "catalog"; + private static final String TABLE = "table"; + + private Object handleRequest(HttpServletRequest request, HttpServletResponse response, + BiFunction action) { + if (Config.enable_all_http_auth) { + executeCheckPassword(request, response); + } + + try { + if (!Env.getCurrentEnv().isMaster()) { + return redirectToMasterOrException(request, response); + } + } catch (Exception e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + + Map resultMap = Maps.newHashMap(); + Env env = Env.getCurrentEnv(); + String catalogName = request.getParameter(CATALOG); + String tableName = request.getParameter(TABLE); + CatalogIf catalog = env.getCatalogMgr().getCatalog(catalogName); + if (!(catalog instanceof EsExternalCatalog)) { + return ResponseEntityBuilder.badRequest("unknown ES Catalog: " + catalogName); + } + EsExternalCatalog esExternalCatalog = (EsExternalCatalog) catalog; + esExternalCatalog.makeSureInitialized(); + String result = action.apply(esExternalCatalog, tableName); + ObjectNode jsonResult = JsonUtil.parseObject(result); + + resultMap.put("catalog", catalogName); + resultMap.put("table", tableName); + resultMap.put("result", jsonResult); + + return ResponseEntityBuilder.ok(resultMap); + } + + @RequestMapping(path = "/get_mapping", method = RequestMethod.GET) + public Object getMapping(HttpServletRequest request, HttpServletResponse response) { + return handleRequest(request, response, (esExternalCatalog, tableName) -> + esExternalCatalog.getEsRestClient().getMapping(tableName)); + } + + @RequestMapping(path = "/search", method = RequestMethod.POST) + public Object search(HttpServletRequest request, HttpServletResponse response) { + String body; + try { + body = getRequestBody(request); + } catch (IOException e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + return handleRequest(request, response, (esExternalCatalog, tableName) -> + esExternalCatalog.getEsRestClient().searchIndex(tableName, body)); + } + + private String getRequestBody(HttpServletRequest request) throws IOException { + BufferedReader reader = request.getReader(); + return reader.lines().collect(Collectors.joining(System.lineSeparator())); + } +}