diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java index 21a85126c9ea61..e2c7fa0b688913 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsRestClient.java @@ -24,8 +24,10 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.ImmutableList; import okhttp3.Credentials; +import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.RequestBody; import okhttp3.Response; import org.apache.http.HttpHeaders; import org.apache.logging.log4j.LogManager; @@ -126,12 +128,27 @@ public String getMapping(String indexName) throws DorisEsException { return indexMapping; } + /** + * Search specific index + */ + public String searchIndex(String indexName, String body) throws DorisEsException { + String path = indexName + "/_search"; + RequestBody requestBody = null; + if (Strings.isNotEmpty(body)) { + requestBody = RequestBody.create( + body, + MediaType.get("application/json") + ); + } + return executeWithRequestBody(path, requestBody); + } + /** * Check whether index exist. **/ public boolean existIndex(OkHttpClient httpClient, String indexName) { String path = indexName + "/_mapping"; - try (Response response = executeResponse(httpClient, path)) { + try (Response response = executeResponse(httpClient, path, null)) { if (response.isSuccessful()) { return true; } @@ -228,7 +245,7 @@ private synchronized OkHttpClient getOrCreateSslNetworkClient() { return sslNetworkClient; } - private Response executeResponse(OkHttpClient httpClient, String path) throws IOException { + private Response executeResponse(OkHttpClient httpClient, String path, RequestBody requestBody) throws IOException { currentNode = currentNode.trim(); if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) { currentNode = "http://" + currentNode; @@ -239,7 +256,12 @@ private Response executeResponse(OkHttpClient httpClient, String path) throws IO String url = currentNode + path; try { SecurityChecker.getInstance().startSSRFChecking(url); - Request request = builder.get().url(currentNode + path).build(); + Request request; + if (requestBody != null) { + request = builder.post(requestBody).url(currentNode + path).build(); + } else { + request = builder.get().url(currentNode + path).build(); + } if (LOG.isInfoEnabled()) { LOG.info("es rest client request URL: {}", request.url().toString()); } @@ -251,13 +273,17 @@ private Response executeResponse(OkHttpClient httpClient, String path) throws IO } } + private String execute(String path) throws DorisEsException { + return executeWithRequestBody(path, null); + } + /** * execute request for specific path,it will try again nodes.length times if it fails * * @param path the path must not leading with '/' * @return response */ - private String execute(String path) throws DorisEsException { + private String executeWithRequestBody(String path, RequestBody requestBody) throws DorisEsException { // try 3 times for every node int retrySize = nodes.length * 3; DorisEsException scratchExceptionForThrow = null; @@ -277,7 +303,7 @@ private String execute(String path) throws DorisEsException { if (LOG.isTraceEnabled()) { LOG.trace("es rest client request URL: {}", currentNode + "/" + path); } - try (Response response = executeResponse(httpClient, path)) { + try (Response response = executeResponse(httpClient, path, requestBody)) { if (response.isSuccessful()) { return response.body().string(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ESCatalogAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ESCatalogAction.java new file mode 100644 index 00000000000000..1a075c79782747 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ESCatalogAction.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.restv2; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.JsonUtil; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.es.EsExternalCatalog; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.httpv2.rest.RestBaseController; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +@RestController +@RequestMapping("/rest/v2/api/es_catalog") +public class ESCatalogAction extends RestBaseController { + + private static final Logger LOG = LogManager.getLogger(ESCatalogAction.class); + private static final String CATALOG = "catalog"; + private static final String TABLE = "table"; + + private Object handleRequest(HttpServletRequest request, HttpServletResponse response, + BiFunction action) { + if (Config.enable_all_http_auth) { + executeCheckPassword(request, response); + } + + try { + if (!Env.getCurrentEnv().isMaster()) { + return redirectToMasterOrException(request, response); + } + } catch (Exception e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + + Map resultMap = Maps.newHashMap(); + Env env = Env.getCurrentEnv(); + String catalogName = request.getParameter(CATALOG); + String tableName = request.getParameter(TABLE); + CatalogIf catalog = env.getCatalogMgr().getCatalog(catalogName); + if (!(catalog instanceof EsExternalCatalog)) { + return ResponseEntityBuilder.badRequest("unknown ES Catalog: " + catalogName); + } + EsExternalCatalog esExternalCatalog = (EsExternalCatalog) catalog; + esExternalCatalog.makeSureInitialized(); + String result = action.apply(esExternalCatalog, tableName); + ObjectNode jsonResult = JsonUtil.parseObject(result); + + resultMap.put("catalog", catalogName); + resultMap.put("table", tableName); + resultMap.put("result", jsonResult); + + return ResponseEntityBuilder.ok(resultMap); + } + + @RequestMapping(path = "/get_mapping", method = RequestMethod.GET) + public Object getMapping(HttpServletRequest request, HttpServletResponse response) { + return handleRequest(request, response, (esExternalCatalog, tableName) -> + esExternalCatalog.getEsRestClient().getMapping(tableName)); + } + + @RequestMapping(path = "/search", method = RequestMethod.POST) + public Object search(HttpServletRequest request, HttpServletResponse response) { + String body; + try { + body = getRequestBody(request); + } catch (IOException e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + return handleRequest(request, response, (esExternalCatalog, tableName) -> + esExternalCatalog.getEsRestClient().searchIndex(tableName, body)); + } + + private String getRequestBody(HttpServletRequest request) throws IOException { + BufferedReader reader = request.getReader(); + return reader.lines().collect(Collectors.joining(System.lineSeparator())); + } +} diff --git a/regression-test/plugins/plugin_curl_requester.groovy b/regression-test/plugins/plugin_curl_requester.groovy index 1c1e24a80ddeff..691ec1233c9563 100644 --- a/regression-test/plugins/plugin_curl_requester.groovy +++ b/regression-test/plugins/plugin_curl_requester.groovy @@ -111,14 +111,12 @@ Suite.metaClass.http_client = { String method, String url /* param */ -> logger.info("Added 'http_client' function to Suite") -Suite.metaClass.curl = { String method, String url /* param */-> +Suite.metaClass.curl = { String method, String url, String body = null /* param */-> Suite suite = delegate as Suite - if (method != "GET" && method != "POST") - { + if (method != "GET" && method != "POST") { throw new Exception(String.format("invalid curl method: %s", method)) } - if (url.isBlank()) - { + if (url.isBlank()) { throw new Exception("invalid curl url, blank") } @@ -127,7 +125,13 @@ Suite.metaClass.curl = { String method, String url /* param */-> Integer retryCount = 0; // Current retry count Integer sleepTime = 5000; // Sleep time in milliseconds - String cmd = String.format("curl --max-time %d -X %s %s", timeout, method, url).toString() + String cmd + if (method == "POST" && body != null) { + cmd = String.format("curl --max-time %d -X %s -H Content-Type:application/json -d %s %s", timeout, method, body, url).toString() + } else { + cmd = String.format("curl --max-time %d -X %s %s", timeout, method, url).toString() + } + logger.info("curl cmd: " + cmd) def process int code diff --git a/regression-test/suites/external_table_p0/es/test_es_catalog_http_open_api.groovy b/regression-test/suites/external_table_p0/es/test_es_catalog_http_open_api.groovy new file mode 100644 index 00000000000000..485cdd061869d7 --- /dev/null +++ b/regression-test/suites/external_table_p0/es/test_es_catalog_http_open_api.groovy @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_es_catalog_http_open_api", "p0,external,es,external_docker,external_docker_es") { + String enabled = context.config.otherConfigs.get("enableEsTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String es_5_port = context.config.otherConfigs.get("es_5_port") + String es_6_port = context.config.otherConfigs.get("es_6_port") + String es_7_port = context.config.otherConfigs.get("es_7_port") + String es_8_port = context.config.otherConfigs.get("es_8_port") + + // test old create-catalog syntax for compatibility + sql """ + create catalog if not exists test_es_query_es5 + properties ( + "type"="es", + "elasticsearch.hosts"="http://${externalEnvIp}:$es_5_port", + "elasticsearch.nodes_discovery"="false", + "elasticsearch.keyword_sniff"="true" + ); + """ + sql """ + create catalog if not exists test_es_query_es6 + properties ( + "type"="es", + "elasticsearch.hosts"="http://${externalEnvIp}:$es_6_port", + "elasticsearch.nodes_discovery"="false", + "elasticsearch.keyword_sniff"="true" + ); + """ + + // test new create catalog syntax + sql """create catalog if not exists test_es_query_es7 properties( + "type"="es", + "hosts"="http://${externalEnvIp}:$es_7_port", + "nodes_discovery"="false", + "enable_keyword_sniff"="true" + ); + """ + + sql """create catalog if not exists test_es_query_es8 properties( + "type"="es", + "hosts"="http://${externalEnvIp}:$es_8_port", + "nodes_discovery"="false", + "enable_keyword_sniff"="true" + ); + """ + + List feHosts = getFrontendIpHttpPort() + // for each catalog 5..8, send a request + for (int i = 5; i <= 8; i++) { + String catalog = String.format("test_es_query_es%s", i) + def (code, out, err) = curl("GET", String.format("http://%s/rest/v2/api/es_catalog/get_mapping?catalog=%s&table=test1", feHosts[0], catalog)) + logger.info("Get mapping response: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(code == 0) + assertTrue(out.toLowerCase().contains("success")) + assertTrue(out.toLowerCase().contains("mappings")) + assertTrue(out.toLowerCase().contains(catalog)) + + String body = '{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["test6"],"sort":["_doc"],"size":4064}'; + def (code1, out1, err1) = curl("POST", String.format("http://%s/rest/v2/api/es_catalog/search?catalog=%s&table=test1", feHosts[0], catalog), body) + logger.info("Search index response: code=" + code1 + ", out=" + out1 + ", err=" + err1) + assertTrue(code1 == 0) + assertTrue(out1.toLowerCase().contains("success")) + assertTrue(out1.toLowerCase().contains("hits")) + assertTrue(out1.toLowerCase().contains(catalog)) + } + } +}