diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index bbc761000f..7fdc4387dd 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -371,6 +371,7 @@ ${project.version} + org.apache.streampark streampark-flink-sql-gateway-base @@ -383,6 +384,13 @@ ${project.version} + + org.apache.streampark + streampark-flink-sql-gateway-flink-v2 + ${project.version} + + + com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java index 2d99c7bbf5..45a34c2f55 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.module.scala.DefaultScalaModule; @@ -57,4 +58,8 @@ public static T read(String json, TypeReference typeReference) public static String write(Object object) throws JsonProcessingException { return MAPPER.writeValueAsString(object); } + + public static JsonNode readTree(String json) throws JsonProcessingException { + return MAPPER.readTree(json); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java index c0d7aa77bd..ea99856be7 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java @@ -20,6 +20,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.annotation.ApiAccess; import org.apache.streampark.console.core.service.SqlWorkBenchService; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.results.ResultQueryCondition; import org.apache.streampark.gateway.session.SessionHandle; @@ -186,6 +187,21 @@ public RestResponse fetchResults( flinkGatewayId, sessionHandle, operationHandle, resultQueryCondition)); } + @ApiAccess + @ApiOperation( + value = "Complete statement", + notes = "Complete statement", + tags = "FLINK_GATEWAY_TAG") + @PostMapping("sessions/{sessionHandle}/statements/complete") + public RestResponse completeStatement( + @PathVariable Long flinkGatewayId, + @PathVariable String sessionHandle, + @RequestBody CompleteStatementRequestBody completeStatementRequestBody) { + return RestResponse.success( + sqlWorkBenchService.completeStatement( + flinkGatewayId, sessionHandle, completeStatementRequestBody)); + } + // ------------------------------------------------------------------------------------------- // Catalog API // ------------------------------------------------------------------------------------------- diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java index 40ded03f29..6ddff63ba5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.results.Column; import org.apache.streampark.gateway.results.GatewayInfo; @@ -25,6 +26,8 @@ import org.apache.streampark.gateway.results.ResultSet; import org.apache.streampark.gateway.session.SessionHandle; +import java.util.List; + public interface SqlWorkBenchService { /** @@ -103,6 +106,19 @@ Column getOperationResultSchema( OperationHandle executeStatement( Long flinkGatewayId, String sessionHandleUUIDStr, String statement); + /** + * Get the completion hints for the given statement at the given position. + * + * @param flinkGatewayId flink gateway id + * @param sessionHandleUUIDStr session handle uuid string + * @param completeStatementRequestBody complete statement request body + * @return completion hints + */ + List completeStatement( + Long flinkGatewayId, + String sessionHandleUUIDStr, + CompleteStatementRequestBody completeStatementRequestBody); + /** * Fetch results * diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java index f14280b84c..2d17c11d7a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java @@ -24,17 +24,21 @@ import org.apache.streampark.console.core.enums.GatewayTypeEnum; import org.apache.streampark.console.core.mapper.FlinkGateWayMapper; import org.apache.streampark.console.core.service.FlinkGateWayService; -import org.apache.streampark.gateway.flink.client.dto.GetApiVersionResponseBody; import org.apache.hc.client5.http.config.RequestConfig; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; @Slf4j @@ -72,6 +76,7 @@ public boolean existsByGatewayName(String name) { @Override public GatewayTypeEnum getGatewayVersion(String address) { + // change to use SqlGatewayService to get version in future String restUrl = address + "/api_versions"; try { String result = @@ -79,9 +84,19 @@ public GatewayTypeEnum getGatewayVersion(String address) { restUrl, RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build()); if (result != null) { - String versionStr = - JacksonUtils.read(result, GetApiVersionResponseBody.class).getVersions().get(0); - return "V1".equals(versionStr) ? GatewayTypeEnum.FLINK_V1 : GatewayTypeEnum.FLINK_V2; + List versions = new ArrayList<>(); + JsonNode jsonNode = JacksonUtils.readTree(result); + Optional.ofNullable(jsonNode.get("versions")) + .filter(JsonNode::isArray) + .map(ArrayNode.class::cast) + .ifPresent( + arrayNode -> arrayNode.elements().forEachRemaining(e -> versions.add(e.asText()))); + // Currently, we only support V1 and V2. Flink 1.17 will return both V1 and V2, so we need + // to get the last one. + if (versions.size() > 0) { + String versionStr = versions.get(versions.size() - 1); + return "V1".equals(versionStr) ? GatewayTypeEnum.FLINK_V1 : GatewayTypeEnum.FLINK_V2; + } } } catch (Exception e) { log.error("get gateway version failed", e); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java index 3b7be04aeb..317205a912 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java @@ -29,10 +29,10 @@ import org.apache.streampark.flink.kubernetes.KubernetesRetriever; import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode; import org.apache.streampark.flink.kubernetes.ingress.IngressController; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.factories.FactoryUtil; import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils; -import org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory; import org.apache.streampark.gateway.results.Column; import org.apache.streampark.gateway.results.GatewayInfo; import org.apache.streampark.gateway.results.OperationInfo; @@ -55,10 +55,6 @@ import java.util.Objects; import java.util.UUID; -import static org.apache.streampark.common.enums.ExecutionMode.KUBERNETES_NATIVE_SESSION; -import static org.apache.streampark.common.enums.ExecutionMode.REMOTE; -import static org.apache.streampark.common.enums.ExecutionMode.YARN_SESSION; - @Slf4j @Service @RequiredArgsConstructor @@ -79,7 +75,7 @@ private SqlGatewayService getSqlGateWayService(Long flinkGatewayId) { config.put( FactoryUtil.SQL_GATEWAY_SERVICE_TYPE.getKey(), flinkGateWay.getGatewayType().getIdentifier()); - config.put(FlinkSqlGatewayServiceFactory.BASE_URI.getKey(), flinkGateWay.getAddress()); + config.put("base-uri", flinkGateWay.getAddress()); List actual = SqlGatewayServiceFactoryUtils.createSqlGatewayService(config); if (actual.size() > 1) { log.warn("There are more than one SqlGatewayService instance, please check your config"); @@ -187,6 +183,15 @@ public OperationHandle executeStatement( .executeStatement(new SessionHandle(sessionHandleUUIDStr), statement, 10000L, null); } + @Override + public List completeStatement( + Long flinkGatewayId, + String sessionHandleUUIDStr, + CompleteStatementRequestBody completeStatementRequestBody) { + return getSqlGateWayService(flinkGatewayId) + .completeStatement(new SessionHandle(sessionHandleUUIDStr), completeStatementRequestBody); + } + @Override public ResultSet fetchResults( Long flinkGatewayId, diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml b/streampark-flink/streampark-flink-sql-gateway/pom.xml index 20c14fd7b6..9ac1d9acba 100644 --- a/streampark-flink/streampark-flink-sql-gateway/pom.xml +++ b/streampark-flink/streampark-flink-sql-gateway/pom.xml @@ -30,6 +30,7 @@ streampark-flink-sql-gateway-base streampark-flink-sql-gateway-flink-v1 + streampark-flink-sql-gateway-flink-v2 streampark-flink-sql-gateway-kyuubi diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/CompleteStatementRequestBody.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/CompleteStatementRequestBody.java new file mode 100644 index 0000000000..d89b8898b1 --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/CompleteStatementRequestBody.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway; + +import java.io.Serializable; +import java.util.Objects; + +public class CompleteStatementRequestBody implements Serializable { + + private Integer position; + + private String statement; + + public CompleteStatementRequestBody() {} + + public CompleteStatementRequestBody(Integer position, String statement) { + this.position = position; + this.statement = statement; + } + + public Integer getPosition() { + return position; + } + + public String getStatement() { + return statement; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompleteStatementRequestBody that = (CompleteStatementRequestBody) o; + return Objects.equals(position, that.position) && Objects.equals(statement, that.statement); + } + + @Override + public int hashCode() { + return Objects.hash(position, statement); + } + + @Override + public String toString() { + return "CompleteStatementRequestBody{" + + "position=" + + position + + ", statement='" + + statement + + '\'' + + '}'; + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java index b355ac9dc4..6c1ef14c38 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java @@ -17,6 +17,7 @@ package org.apache.streampark.gateway.service; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.ExecutionConfiguration; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.OperationStatus; @@ -29,6 +30,8 @@ import org.apache.streampark.gateway.session.SessionEnvironment; import org.apache.streampark.gateway.session.SessionHandle; +import java.util.List; + /** A service of SQL gateway is responsible for handling requests from streampark console. */ public interface SqlGatewayService { @@ -156,4 +159,15 @@ ResultSet fetchResults( OperationHandle operationHandle, ResultQueryCondition resultQueryCondition) throws SqlGatewayException; + + /** + * Get the completion hints for the given statement at the given position. + * + * @param sessionHandle handle to identify the session. + * @param completeStatementRequestBody completion hints request body. + * @return Returns the completion hints. + */ + List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException; } diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java index 475676c0f6..e6f4062cd3 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java @@ -90,35 +90,6 @@ public void testCreateUnknownService() { SqlGatewayServiceFactory.class.getCanonicalName())); } - /* @Test - public void testCreateServiceWithMissingOptions() { - Map config = getDefaultConfig(); - config.remove("sql-gateway.Service.mocked.host"); - - validateException( - config, - "One or more required options are missing.\n\n" - + "Missing required options are:\n\n" - + "host"); - }*/ - - /* @Test - public void testCreateServiceWithUnconsumedOptions() { - Map config = getDefaultConfig(); - config.put("sql-gateway.Service.mocked.unconsumed-option", "error"); - - validateException( - config, - "Unsupported options found for 'mocked'.\n\n" - + "Unsupported options:\n\n" - + "unconsumed-option\n\n" - + "Supported options:\n\n" - + "description\n" - + "host\n" - + "id\n" - + "port"); - }*/ - // -------------------------------------------------------------------------------------------- private void validateException(Map config, String errorMessage) { diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java index a9242597d6..5ded1c189e 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java @@ -17,6 +17,7 @@ package org.apache.streampark.gateway.utils; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.ExecutionConfiguration; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.exception.SqlGatewayException; @@ -29,6 +30,8 @@ import org.apache.streampark.gateway.session.SessionEnvironment; import org.apache.streampark.gateway.session.SessionHandle; +import java.util.List; + /** Mocked implementation of {@link SqlGatewayService}. */ public class FakeSqlGatewayService implements SqlGatewayService { @@ -103,4 +106,11 @@ public ResultSet fetchResults( throws SqlGatewayException { throw new UnsupportedOperationException(); } + + @Override + public List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException { + throw new UnsupportedOperationException(); + } } diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java index defdea367c..e6e6862d44 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java @@ -17,6 +17,7 @@ package org.apache.streampark.gateway.utils; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.ExecutionConfiguration; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.exception.SqlGatewayException; @@ -29,6 +30,7 @@ import org.apache.streampark.gateway.session.SessionEnvironment; import org.apache.streampark.gateway.session.SessionHandle; +import java.util.List; import java.util.Objects; /** Mocked implementation of {@link SqlGatewayService}. */ @@ -113,6 +115,13 @@ public ResultSet fetchResults( throw new UnsupportedOperationException(); } + @Override + public List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException { + throw new UnsupportedOperationException(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml index b437f577d5..15d0da8c93 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml @@ -1,38 +1,4 @@ - - - - - org.apache.streampark.gateway.flink.client.rest.v1 - org.apache.streampark.gateway.flink.client.rest - org.apache.streampark.gateway.flink.client.dto + org.apache.streampark.gateway.flink.v1.client.rest + org.apache.streampark.gateway.flink.v1.client.rest + org.apache.streampark.gateway.flink.v1.client.dto false false diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/v1/FlinkSqlGatewayImpl.java similarity index 85% rename from streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java rename to streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/v1/FlinkSqlGatewayImpl.java index 263cb73ab9..70c9024ae8 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/v1/FlinkSqlGatewayImpl.java @@ -15,22 +15,23 @@ * limitations under the License. */ -package org.apache.streampark.gateway.flink; +package org.apache.streampark.gateway.flink.v1; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.ExecutionConfiguration; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.OperationStatus; import org.apache.streampark.gateway.exception.SqlGatewayException; -import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody; -import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody; -import org.apache.streampark.gateway.flink.client.dto.GetInfoResponseBody; -import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody; -import org.apache.streampark.gateway.flink.client.dto.OperationStatusResponseBody; -import org.apache.streampark.gateway.flink.client.dto.ResultSetColumnsInner; -import org.apache.streampark.gateway.flink.client.dto.ResultSetDataInner; -import org.apache.streampark.gateway.flink.client.rest.ApiClient; -import org.apache.streampark.gateway.flink.client.rest.ApiException; -import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi; +import org.apache.streampark.gateway.flink.v1.client.dto.ExecuteStatementRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.FetchResultsResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.GetInfoResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OperationStatusResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.ResultSetColumnsInner; +import org.apache.streampark.gateway.flink.v1.client.dto.ResultSetDataInner; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v1.client.rest.DefaultApi; import org.apache.streampark.gateway.results.Column; import org.apache.streampark.gateway.results.GatewayInfo; import org.apache.streampark.gateway.results.OperationInfo; @@ -94,9 +95,7 @@ public SessionHandle openSession(SessionEnvironment environment) throws SqlGatew @Override public void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException { try { - defaultApi.triggerSession( - new org.apache.streampark.gateway.flink.client.dto.SessionHandle() - .identifier(UUID.fromString(sessionHandle.getIdentifier()))); + defaultApi.triggerSession(sessionHandle.getIdentifier()); } catch (ApiException e) { throw new SqlGatewayException("Flink native SqlGateWay heartbeat failed!", e); } @@ -115,11 +114,7 @@ public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { try { - defaultApi.cancelOperation( - new org.apache.streampark.gateway.flink.client.dto.SessionHandle() - .identifier(UUID.fromString(sessionHandle.getIdentifier())), - new org.apache.streampark.gateway.flink.client.dto.OperationHandle() - .identifier(UUID.fromString(operationHandle.getIdentifier()))); + defaultApi.cancelOperation(sessionHandle.getIdentifier(), operationHandle.getIdentifier()); } catch (ApiException e) { throw new SqlGatewayException("Flink native SqlGateWay cancelOperation failed!", e); } @@ -205,7 +200,7 @@ public ResultSet fetchResults( nextToken = Long.valueOf(nextResultUri.substring(nextResultUri.lastIndexOf("/") + 1)); } - org.apache.streampark.gateway.flink.client.dto.ResultSet results = + org.apache.streampark.gateway.flink.v1.client.dto.ResultSet results = fetchResultsResponseBody.getResults(); List resultsColumns = results.getColumns(); @@ -236,4 +231,12 @@ public ResultSet fetchResults( throw new SqlGatewayException("Flink native SqlGateWay fetchResults failed!", e); } } + + @Override + public List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException { + throw new SqlGatewayException( + "Flink native SqlGateWay don`t support operation:completeStatement!"); + } } diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/v1/FlinkSqlGatewayServiceFactory.java similarity index 97% rename from streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java rename to streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/v1/FlinkSqlGatewayServiceFactory.java index 348c738457..a1aa970786 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/v1/FlinkSqlGatewayServiceFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.streampark.gateway.flink; +package org.apache.streampark.gateway.flink.v1; import org.apache.streampark.gateway.ConfigOption; import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory; diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory index bbefc2c25a..5d627359d8 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory @@ -16,4 +16,4 @@ # limitations under the License. # -org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory +org.apache.streampark.gateway.flink.v1.FlinkSqlGatewayServiceFactory diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml index 3b9614a435..4ee1484491 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml @@ -356,7 +356,7 @@ components: - SYMBOL - UNRESOLVED OperationHandle: - type: object + type: string properties: identifier: type: string @@ -535,7 +535,7 @@ components: status: type: string SessionHandle: - type: object + type: string properties: identifier: type: string diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java index a18f593357..125043cf97 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java @@ -17,11 +17,11 @@ package org.apache.streampark.gateway.flink; -import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody; -import org.apache.streampark.gateway.flink.client.dto.OpenSessionResponseBody; -import org.apache.streampark.gateway.flink.client.rest.ApiClient; -import org.apache.streampark.gateway.flink.client.rest.ApiException; -import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi; +import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionResponseBody; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v1.client.rest.DefaultApi; import java.util.Collections; diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java index 51943270bb..900a734f5c 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java @@ -17,13 +17,13 @@ package org.apache.streampark.gateway.flink; -import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody; -import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementResponseBody; -import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody; -import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody; -import org.apache.streampark.gateway.flink.client.dto.OpenSessionResponseBody; -import org.apache.streampark.gateway.flink.client.rest.ApiException; -import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi; +import org.apache.streampark.gateway.flink.v1.client.dto.ExecuteStatementRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.ExecuteStatementResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.FetchResultsResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionResponseBody; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v1.client.rest.DefaultApi; import java.util.UUID; diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/pom.xml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/pom.xml new file mode 100644 index 0000000000..e74f8e07ff --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/pom.xml @@ -0,0 +1,162 @@ + + + + 4.0.0 + + + org.apache.streampark + streampark-flink-sql-gateway + 2.2.0-SNAPSHOT + + + streampark-flink-sql-gateway-flink-v2 + StreamPark : Flink SQL Gateway V2 + + + UTF-8 + 1.8.5 + 1.6.5 + 4.10.0 + 2.9.1 + 3.12.0 + 0.2.4 + 1.3.5 + 5.9.1 + 1.9.1 + 3.12.4 + 2.1.1 + 1.1.1 + + + + + + org.apache.streampark + streampark-flink-sql-gateway-base + ${project.version} + + + + + io.swagger + swagger-annotations + ${swagger-core-version} + + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.squareup.okhttp3 + okhttp + ${okhttp-version} + + + com.squareup.okhttp3 + logging-interceptor + ${okhttp-version} + + + com.google.code.gson + gson + ${gson-version} + + + io.gsonfire + gson-fire + ${gson-fire-version} + + + org.apache.commons + commons-lang3 + ${commons-lang3-version} + + + jakarta.annotation + jakarta.annotation-api + ${jakarta-annotation-version} + provided + + + org.openapitools + jackson-databind-nullable + ${jackson-databind-nullable-version} + + + javax.ws.rs + jsr311-api + ${jsr311-api-version} + + + javax.ws.rs + javax.ws.rs-api + ${javax.ws.rs-api-version} + + + + org.junit.platform + junit-platform-runner + ${junit-platform-runner.version} + test + + + org.mockito + mockito-core + ${mockito-core-version} + test + + + + + + + + org.openapitools + openapi-generator-maven-plugin + 6.5.0 + + + + generate + + + ${project.basedir}/src/main/resources/flink_sql_gateway_rest_v2.yml + java + + org.apache.streampark.gateway.flink.v2.client.rest + org.apache.streampark.gateway.flink.v2.client.rest + org.apache.streampark.gateway.flink.v2.client.dto + false + false + + src/gen/java/main + + + + + + + + + diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/v2/FlinkSqlGatewayImpl.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/v2/FlinkSqlGatewayImpl.java new file mode 100644 index 0000000000..fad33f9221 --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/v2/FlinkSqlGatewayImpl.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink.v2; + +import org.apache.streampark.gateway.CompleteStatementRequestBody; +import org.apache.streampark.gateway.ExecutionConfiguration; +import org.apache.streampark.gateway.OperationHandle; +import org.apache.streampark.gateway.OperationStatus; +import org.apache.streampark.gateway.exception.SqlGatewayException; +import org.apache.streampark.gateway.flink.v2.client.dto.ColumnInfo; +import org.apache.streampark.gateway.flink.v2.client.dto.ExecuteStatementRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.FetchResultsResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.GetInfoResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OperationStatusResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.ResultType; +import org.apache.streampark.gateway.flink.v2.client.dto.RowFormat; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v2.client.rest.DefaultApi; +import org.apache.streampark.gateway.results.Column; +import org.apache.streampark.gateway.results.GatewayInfo; +import org.apache.streampark.gateway.results.OperationInfo; +import org.apache.streampark.gateway.results.ResultKind; +import org.apache.streampark.gateway.results.ResultQueryCondition; +import org.apache.streampark.gateway.results.ResultSet; +import org.apache.streampark.gateway.results.RowData; +import org.apache.streampark.gateway.service.SqlGatewayService; +import org.apache.streampark.gateway.session.SessionEnvironment; +import org.apache.streampark.gateway.session.SessionHandle; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** Implement {@link SqlGatewayService} with Flink native SqlGateway. */ +public class FlinkSqlGatewayImpl implements SqlGatewayService { + + private final DefaultApi defaultApi; + + public FlinkSqlGatewayImpl(String baseUri) { + ApiClient client = new ApiClient(); + client.setBasePath(baseUri); + defaultApi = new DefaultApi(client); + } + + @Override + public boolean check(String flinkMajorVersion) { + // flink gateway v1 api is supported from flink 1.16 + return Double.parseDouble(flinkMajorVersion) >= 1.16; + } + + @Override + public GatewayInfo getGatewayInfo() throws SqlGatewayException { + GetInfoResponseBody info = null; + try { + info = defaultApi.getInfo(); + return new GatewayInfo(info.getProductName(), info.getVersion()); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay getGatewayInfo failed!", e); + } + } + + @Override + public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException { + try { + return new SessionHandle( + Objects.requireNonNull( + defaultApi + .openSession( + new OpenSessionRequestBody() + .sessionName(environment.getSessionName()) + .properties(environment.getSessionConfig())) + .getSessionHandle())); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay openSession failed!", e); + } + } + + @Override + public void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException { + try { + defaultApi.triggerSession(sessionHandle.getIdentifier()); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay heartbeat failed!", e); + } + } + + @Override + public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException { + try { + defaultApi.closeSession(sessionHandle.getIdentifier()); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay closeSession failed!", e); + } + } + + @Override + public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + try { + defaultApi.cancelOperation(sessionHandle.getIdentifier(), operationHandle.getIdentifier()); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay cancelOperation failed!", e); + } + } + + @Override + public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + try { + defaultApi.closeOperation(sessionHandle.getIdentifier(), operationHandle.getIdentifier()); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay closeOperation failed!", e); + } + } + + @Override + public OperationInfo getOperationInfo( + SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + + try { + OperationStatusResponseBody operationStatus = + defaultApi.getOperationStatus( + sessionHandle.getIdentifier(), operationHandle.getIdentifier()); + return new OperationInfo(OperationStatus.valueOf(operationStatus.getStatus()), null); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay closeOperation failed!", e); + } + } + + @Override + public Column getOperationResultSchema( + SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + throw new SqlGatewayException( + "Flink native SqlGateWay don`t support operation:getOperationResultSchema!"); + } + + @Override + public OperationHandle executeStatement( + SessionHandle sessionHandle, + String statement, + long executionTimeoutMs, + ExecutionConfiguration executionConfig) + throws SqlGatewayException { + try { + return new OperationHandle( + Objects.requireNonNull( + defaultApi + .executeStatement( + sessionHandle.getIdentifier(), + new ExecuteStatementRequestBody() + .statement(statement) + // currently, sql gateway don't support execution timeout + // .executionTimeout(executionTimeoutMs) + .putExecutionConfigItem( + "pipeline.name", "Flink SQL Gateway SDK on flink cluster Example")) + .getOperationHandle())); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay executeStatement failed!", e); + } + } + + @Override + public ResultSet fetchResults( + SessionHandle sessionHandle, + OperationHandle operationHandle, + ResultQueryCondition resultQueryCondition) + throws SqlGatewayException { + try { + + List data = new ArrayList<>(); + List columns = new ArrayList<>(); + FetchResultsResponseBody fetchResultsResponseBody = + defaultApi.fetchResults( + sessionHandle.getIdentifier(), + operationHandle.getIdentifier(), + resultQueryCondition.getToken(), + RowFormat.JSON); + ResultType resultType = fetchResultsResponseBody.getResultType(); + Long nextToken = null; + if (fetchResultsResponseBody.getNextResultUri() != null) { + String nextResultUri = fetchResultsResponseBody.getNextResultUri(); + nextToken = + Long.valueOf( + nextResultUri.substring( + nextResultUri.lastIndexOf("/") + 1, nextResultUri.lastIndexOf("?"))); + } + + org.apache.streampark.gateway.flink.v2.client.dto.ResultInfo results = + fetchResultsResponseBody.getResults(); + + List resultsColumns = results.getColumns(); + List resultsData = + results.getData(); + + resultsColumns.forEach( + column -> + columns.add( + new Column( + column.getName(), column.getLogicalType().toString(), column.getComment()))); + resultsData.forEach(row -> data.add(new RowData(row.getKind().getValue(), row.getFields()))); + + ResultKind resultKind = + columns.size() == 1 && columns.get(0).getName().equals("result") + ? ResultKind.SUCCESS + : ResultKind.SUCCESS_WITH_CONTENT; + + return new ResultSet( + ResultSet.ResultType.valueOf(resultType.getValue()), + nextToken, + columns, + data, + true, + null, + resultKind); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay fetchResults failed!", e); + } + } + + @Override + public List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException { + + try { + return defaultApi + .completeStatement( + sessionHandle.getIdentifier(), + new org.apache.streampark.gateway.flink.v2.client.dto.CompleteStatementRequestBody() + .statement(completeStatementRequestBody.getStatement()) + .position(completeStatementRequestBody.getPosition())) + .getCandidates(); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay completeStatement failed!", e); + } + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/v2/FlinkSqlGatewayServiceFactory.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/v2/FlinkSqlGatewayServiceFactory.java new file mode 100644 index 0000000000..9c18d6bc6d --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/v2/FlinkSqlGatewayServiceFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink.v2; + +import org.apache.streampark.gateway.ConfigOption; +import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory; +import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils; +import org.apache.streampark.gateway.service.SqlGatewayService; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** Flink sql gateway's Factory for {@link SqlGatewayService}. */ +public class FlinkSqlGatewayServiceFactory implements SqlGatewayServiceFactory { + + public static final ConfigOption BASE_URI = + ConfigOption.key("base-uri") + .stringType() + .noDefaultValue() + .withDescription("The base uri of the flink cluster."); + + @Override + public String factoryIdentifier() { + return "flink-v2"; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(BASE_URI); + return options; + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public SqlGatewayService createSqlGatewayService(Context context) { + SqlGatewayServiceFactoryUtils.EndpointFactoryHelper helper = + SqlGatewayServiceFactoryUtils.createEndpointFactoryHelper(this, context); + helper.validate(); + String baseUri = context.getGateWayServiceOptions().get(BASE_URI.getKey()); + return new FlinkSqlGatewayImpl(baseUri); + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory new file mode 100644 index 0000000000..d77670023d --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.streampark.gateway.flink.v2.FlinkSqlGatewayServiceFactory diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/flink_sql_gateway_rest_v2.yml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/flink_sql_gateway_rest_v2.yml new file mode 100644 index 0000000000..f48975fb25 --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/flink_sql_gateway_rest_v2.yml @@ -0,0 +1,618 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +openapi: 3.0.1 +info: + title: Flink SQL Gateway REST API + contact: + email: user@flink.apache.org + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html + version: v2/1.17-SNAPSHOT +paths: + /api_versions: + get: + description: Get the current available versions for the Rest Endpoint. The client + can choose one of the return version as the protocol for later communicate. + operationId: getApiVersion + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/GetApiVersionResponseBody' + /info: + get: + description: Get meta data for this cluster. + operationId: getInfo + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/GetInfoResponseBody' + /sessions: + post: + description: Opens a new session with specific properties. Specific properties + can be given for current session which will override the default properties + of gateway. + operationId: openSession + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/OpenSessionRequestBody' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OpenSessionResponseBody' + /sessions/{session_handle}: + get: + description: Get the session configuration. + operationId: getSessionConfig + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/GetSessionConfigResponseBody' + delete: + description: Closes the specific session. + operationId: closeSession + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/CloseSessionResponseBody' + /sessions/{session_handle}/complete-statement: + get: + description: Get the completion hints for the given statement at the given position. + operationId: completeStatement + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CompleteStatementRequestBody' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/CompleteStatementResponseBody' + /sessions/{session_handle}/configure-session: + post: + description: |- + Configures the session with the statement which could be: + CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR. + operationId: configureSession + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConfigureSessionRequestBody' + responses: + "200": + description: The request was successful. + /sessions/{session_handle}/heartbeat: + post: + description: "Trigger heartbeat to tell the server that the client is active,\ + \ and to keep the session alive as long as configured timeout value." + operationId: triggerSession + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + responses: + "200": + description: The request was successful. + /sessions/{session_handle}/operations/{operation_handle}/cancel: + post: + description: Cancel the operation. + operationId: cancelOperation + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OperationStatusResponseBody' + /sessions/{session_handle}/operations/{operation_handle}/close: + delete: + description: Close the operation. + operationId: closeOperation + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OperationStatusResponseBody' + /sessions/{session_handle}/operations/{operation_handle}/result/{token}: + get: + description: Fetch results of Operation. + operationId: fetchResults + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + - name: token + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + type: integer + format: int64 + - name: rowFormat + in: query + description: The row format to serialize the RowData. + required: true + style: form + schema: + $ref: '#/components/schemas/RowFormat' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/FetchResultsResponseBody' + /sessions/{session_handle}/operations/{operation_handle}/status: + get: + description: Get the status of operation. + operationId: getOperationStatus + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OperationStatusResponseBody' + /sessions/{session_handle}/statements: + post: + description: Execute a statement. + operationId: executeStatement + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ExecuteStatementRequestBody' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/ExecuteStatementResponseBody' +components: + schemas: + CloseSessionResponseBody: + type: object + properties: + status: + type: string + Column: + type: object + properties: + comment: + type: string + dataType: + $ref: '#/components/schemas/DataType' + name: + type: string + persisted: + type: boolean + physical: + type: boolean + ColumnInfo: + type: object + properties: + comment: + type: string + logicalType: +# $ref: '#/components/schemas/LogicalType' + type: object + name: + type: string + CompleteStatementRequestBody: + type: object + properties: + position: + type: integer + format: int32 + statement: + type: string + CompleteStatementResponseBody: + type: object + properties: + candidates: + type: array + items: + type: string + ConfigureSessionRequestBody: + type: object + properties: + executionTimeout: + type: integer + format: int64 + statement: + type: string + ConstraintType: + type: string + enum: + - PRIMARY_KEY + - UNIQUE_KEY + DataType: + type: object + properties: + children: + type: array + items: + $ref: '#/components/schemas/DataType' + logicalType: + $ref: '#/components/schemas/LogicalType' + ExecuteStatementRequestBody: + type: object + properties: + executionConfig: + type: object + additionalProperties: + type: string + executionTimeout: + type: integer + format: int64 + statement: + type: string + ExecuteStatementResponseBody: + type: object + properties: + operationHandle: + type: string + Expression: + type: object + FetchResultsResponseBody: + type: object + properties: + jobID: + $ref: '#/components/schemas/JobID' + nextResultUri: + type: string + isQueryResult: + type: boolean + resultKind: + $ref: '#/components/schemas/ResultKind' + resultType: + $ref: '#/components/schemas/ResultType' + results: + $ref: '#/components/schemas/ResultInfo' + FieldGetter: + type: object + GetApiVersionResponseBody: + type: object + properties: + versions: + type: array + items: + type: string + GetInfoResponseBody: + type: object + properties: + productName: + type: string + version: + type: string + GetSessionConfigResponseBody: + type: object + properties: + properties: + type: object + additionalProperties: + type: string + IntermediateDataSetID: + pattern: "[0-9a-f]{32}" + type: string + JobID: + pattern: "[0-9a-f]{32}" + type: string + JobVertexID: + pattern: "[0-9a-f]{32}" + type: string + LogicalType: + type: object + properties: + nullable: + type: boolean + type: + $ref: '#/components/schemas/LogicalTypeRoot' + LogicalTypeRoot: + type: string + enum: + - CHAR + - VARCHAR + - BOOLEAN + - BINARY + - VARBINARY + - DECIMAL + - TINYINT + - SMALLINT + - INTEGER + - BIGINT + - FLOAT + - DOUBLE + - DATE + - TIME_WITHOUT_TIME_ZONE + - TIMESTAMP_WITHOUT_TIME_ZONE + - TIMESTAMP_WITH_TIME_ZONE + - TIMESTAMP_WITH_LOCAL_TIME_ZONE + - INTERVAL_YEAR_MONTH + - INTERVAL_DAY_TIME + - ARRAY + - MULTISET + - MAP + - ROW + - DISTINCT_TYPE + - STRUCTURED_TYPE + - "NULL" + - RAW + - SYMBOL + - UNRESOLVED + OpenSessionRequestBody: + type: object + properties: + properties: + type: object + additionalProperties: + type: string + sessionName: + type: string + OpenSessionResponseBody: + type: object + properties: + sessionHandle: + type: string + OperationHandle: + type: string + properties: + identifier: + type: string + format: uuid + OperationStatusResponseBody: + type: object + properties: + status: + type: string + ResolvedExpression: + type: object + properties: + children: + type: array + items: + $ref: '#/components/schemas/Expression' + outputDataType: + $ref: '#/components/schemas/DataType' + resolvedChildren: + type: array + items: + $ref: '#/components/schemas/ResolvedExpression' + ResolvedSchema: + type: object + properties: + columnCount: + type: integer + format: int32 + columnDataTypes: + type: array + items: + $ref: '#/components/schemas/DataType' + columnNames: + type: array + items: + type: string + columns: + type: array + items: + $ref: '#/components/schemas/Column' + primaryKey: + $ref: '#/components/schemas/UniqueConstraint' + primaryKeyIndexes: + type: array + items: + type: integer + format: int32 + watermarkSpecs: + type: array + items: + $ref: '#/components/schemas/WatermarkSpec' + ResourceID: + pattern: "[0-9a-f]{32}" + type: string + ResultInfo: + type: object + properties: + columns: + type: array + items: + $ref: '#/components/schemas/ColumnInfo' + data: + type: array + items: + $ref: '#/components/schemas/RowData' + resultSchema: + $ref: '#/components/schemas/ResolvedSchema' + rowFormat: + $ref: '#/components/schemas/RowFormat' + ResultKind: + type: string + enum: + - SUCCESS + - SUCCESS_WITH_CONTENT + ResultType: + type: string + enum: + - NOT_READY + - PAYLOAD + - EOS + RowData: + type: object + properties: + fields: + type: array + items: + type: object + kind: + $ref: '#/components/schemas/RowKind' + RowFormat: + type: string + enum: + - JSON + - PLAIN_TEXT + RowKind: + type: string + enum: + - INSERT + - UPDATE_BEFORE + - UPDATE_AFTER + - DELETE + SerializedThrowable: + type: object + properties: + serialized-throwable: + type: string + format: binary + SessionHandle: + type: string + properties: + identifier: + type: string + format: uuid + TriggerId: + pattern: "[0-9a-f]{32}" + type: string + UniqueConstraint: + type: object + properties: + columns: + type: array + items: + type: string + enforced: + type: boolean + name: + type: string + type: + $ref: '#/components/schemas/ConstraintType' + WatermarkSpec: + type: object + properties: + rowtimeAttribute: + type: string + watermarkExpression: + $ref: '#/components/schemas/ResolvedExpression' diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java new file mode 100644 index 0000000000..debe60862a --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink; + +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionResponseBody; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v2.client.rest.DefaultApi; + +import java.util.Collections; + +public class FlinkSqlGateway { + + private FlinkSqlGateway() {} + + public static DefaultApi sqlGatewayApi(String basePath) { + ApiClient client = new ApiClient(); + client.setBasePath(basePath); + return new DefaultApi(client); + } + + public static void main(String[] args) throws ApiException { + DefaultApi api = new DefaultApi(new ApiClient()); + OpenSessionResponseBody openSessionResponseBody = + api.openSession( + new OpenSessionRequestBody() + .sessionName("example") + .properties(Collections.singletonMap("foo", "bar"))); + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java new file mode 100644 index 0000000000..dbfb88a958 --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink; + +import org.apache.streampark.gateway.flink.v2.client.dto.ExecuteStatementRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.ExecuteStatementResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.FetchResultsResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.RowFormat; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v2.client.rest.DefaultApi; + +public class FlinkSqlGatewayExample { + + private FlinkSqlGatewayExample() {} + + public static void main(String[] args) throws Exception { + DefaultApi api = FlinkSqlGateway.sqlGatewayApi("http://192.168.20.239:8083"); + runOnRemote(api); + // runOnYarn(api); + // runOnKubernetes(api); + } + + public static void runOnRemote(DefaultApi api) throws ApiException, InterruptedException { + OpenSessionResponseBody response = + api.openSession( + new OpenSessionRequestBody() + .putPropertiesItem("rest.address", "192.168.20.239") + .putPropertiesItem("rest.port", "8081") + .putPropertiesItem("execution.target", "remote")); + String sessionHandle = response.getSessionHandle(); + System.out.println("SessionHandle: " + sessionHandle); + + ExecuteStatementResponseBody statement1 = + api.executeStatement( + sessionHandle, + new ExecuteStatementRequestBody() + .statement( + "CREATE TABLE Orders (\n" + + " order_number BIGINT,\n" + + " price DECIMAL(32,2),\n" + + " buyer ROW,\n" + + " order_time TIMESTAMP(3)\n" + + ") WITH (\n" + + " 'connector' = 'datagen',\n" + + " 'number-of-rows' = '2'\n" + + ")") + .putExecutionConfigItem( + "pipeline.name", "Flink SQL Gateway SDK on flink cluster Example")); + + System.out.println("create table: " + statement1.getOperationHandle()); + + ExecuteStatementResponseBody statement2 = + api.executeStatement( + sessionHandle, + new ExecuteStatementRequestBody() + .statement("select * from Orders;") + .putExecutionConfigItem( + "pipeline.name", "Flink SQL Gateway SDK on flink cluster Example")); + + System.out.println("select * from Orders: " + statement2.getOperationHandle()); + + Thread.sleep(1000 * 10); + + FetchResultsResponseBody fetchResultsResponseBody = + api.fetchResults(sessionHandle, statement2.getOperationHandle(), 0L, RowFormat.JSON); + System.out.println(fetchResultsResponseBody.getResults()); + } + + // public static void main(String[] args) { + // System.out.println( + // "CREATE TABLE Orders (\n" + // + " order_number BIGINT,\n" + // + " price DECIMAL(32,2),\n" + // + " buyer ROW,\n" + // + " order_time TIMESTAMP(3)\n" + // + ") WITH (\n" + // + " 'connector' = 'datagen',\n" + // + " 'number-of-rows' = '2'\n" + // + ")"); + // } +}