diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml b/streampark-flink/streampark-flink-sql-gateway/pom.xml index 20c14fd7b6..9ac1d9acba 100644 --- a/streampark-flink/streampark-flink-sql-gateway/pom.xml +++ b/streampark-flink/streampark-flink-sql-gateway/pom.xml @@ -30,6 +30,7 @@ streampark-flink-sql-gateway-base streampark-flink-sql-gateway-flink-v1 + streampark-flink-sql-gateway-flink-v2 streampark-flink-sql-gateway-kyuubi diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/CompleteStatementRequestBody.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/CompleteStatementRequestBody.java new file mode 100644 index 0000000000..23938fbc0f --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/CompleteStatementRequestBody.java @@ -0,0 +1,52 @@ +package org.apache.streampark.gateway; + +import java.io.Serializable; +import java.util.Objects; + +public class CompleteStatementRequestBody implements Serializable { + + private final Integer position; + + private final String statement; + + public CompleteStatementRequestBody(Integer position, String statement) { + this.position = position; + this.statement = statement; + } + + public Integer getPosition() { + return position; + } + + public String getStatement() { + return statement; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompleteStatementRequestBody that = (CompleteStatementRequestBody) o; + return Objects.equals(position, that.position) && Objects.equals(statement, that.statement); + } + + @Override + public int hashCode() { + return Objects.hash(position, statement); + } + + @Override + public String toString() { + return "CompleteStatementRequestBody{" + + "position=" + + position + + ", statement='" + + statement + + '\'' + + '}'; + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java index b355ac9dc4..6c1ef14c38 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java @@ -17,6 +17,7 @@ package org.apache.streampark.gateway.service; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.ExecutionConfiguration; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.OperationStatus; @@ -29,6 +30,8 @@ import org.apache.streampark.gateway.session.SessionEnvironment; import org.apache.streampark.gateway.session.SessionHandle; +import java.util.List; + /** A service of SQL gateway is responsible for handling requests from streampark console. */ public interface SqlGatewayService { @@ -156,4 +159,15 @@ ResultSet fetchResults( OperationHandle operationHandle, ResultQueryCondition resultQueryCondition) throws SqlGatewayException; + + /** + * Get the completion hints for the given statement at the given position. + * + * @param sessionHandle handle to identify the session. + * @param completeStatementRequestBody completion hints request body. + * @return Returns the completion hints. + */ + List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException; } diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java index 475676c0f6..e6f4062cd3 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java @@ -90,35 +90,6 @@ public void testCreateUnknownService() { SqlGatewayServiceFactory.class.getCanonicalName())); } - /* @Test - public void testCreateServiceWithMissingOptions() { - Map config = getDefaultConfig(); - config.remove("sql-gateway.Service.mocked.host"); - - validateException( - config, - "One or more required options are missing.\n\n" - + "Missing required options are:\n\n" - + "host"); - }*/ - - /* @Test - public void testCreateServiceWithUnconsumedOptions() { - Map config = getDefaultConfig(); - config.put("sql-gateway.Service.mocked.unconsumed-option", "error"); - - validateException( - config, - "Unsupported options found for 'mocked'.\n\n" - + "Unsupported options:\n\n" - + "unconsumed-option\n\n" - + "Supported options:\n\n" - + "description\n" - + "host\n" - + "id\n" - + "port"); - }*/ - // -------------------------------------------------------------------------------------------- private void validateException(Map config, String errorMessage) { diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml index b437f577d5..15d0da8c93 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml @@ -1,38 +1,4 @@ - - - - - org.apache.streampark.gateway.flink.client.rest.v1 - org.apache.streampark.gateway.flink.client.rest - org.apache.streampark.gateway.flink.client.dto + org.apache.streampark.gateway.flink.v1.client.rest + org.apache.streampark.gateway.flink.v1.client.rest + org.apache.streampark.gateway.flink.v1.client.dto false false diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java index 263cb73ab9..aeee982ebd 100644 --- a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java @@ -17,20 +17,21 @@ package org.apache.streampark.gateway.flink; +import org.apache.streampark.gateway.CompleteStatementRequestBody; import org.apache.streampark.gateway.ExecutionConfiguration; import org.apache.streampark.gateway.OperationHandle; import org.apache.streampark.gateway.OperationStatus; import org.apache.streampark.gateway.exception.SqlGatewayException; -import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody; -import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody; -import org.apache.streampark.gateway.flink.client.dto.GetInfoResponseBody; -import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody; -import org.apache.streampark.gateway.flink.client.dto.OperationStatusResponseBody; -import org.apache.streampark.gateway.flink.client.dto.ResultSetColumnsInner; -import org.apache.streampark.gateway.flink.client.dto.ResultSetDataInner; -import org.apache.streampark.gateway.flink.client.rest.ApiClient; -import org.apache.streampark.gateway.flink.client.rest.ApiException; -import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi; +import org.apache.streampark.gateway.flink.v1.client.dto.ExecuteStatementRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.FetchResultsResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.GetInfoResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v1.client.dto.OperationStatusResponseBody; +import org.apache.streampark.gateway.flink.v1.client.dto.ResultSetColumnsInner; +import org.apache.streampark.gateway.flink.v1.client.dto.ResultSetDataInner; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v1.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v1.client.rest.DefaultApi; import org.apache.streampark.gateway.results.Column; import org.apache.streampark.gateway.results.GatewayInfo; import org.apache.streampark.gateway.results.OperationInfo; @@ -95,7 +96,7 @@ public SessionHandle openSession(SessionEnvironment environment) throws SqlGatew public void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException { try { defaultApi.triggerSession( - new org.apache.streampark.gateway.flink.client.dto.SessionHandle() + new org.apache.streampark.gateway.flink.v1.client.dto.SessionHandle() .identifier(UUID.fromString(sessionHandle.getIdentifier()))); } catch (ApiException e) { throw new SqlGatewayException("Flink native SqlGateWay heartbeat failed!", e); @@ -116,9 +117,9 @@ public void cancelOperation(SessionHandle sessionHandle, OperationHandle operati throws SqlGatewayException { try { defaultApi.cancelOperation( - new org.apache.streampark.gateway.flink.client.dto.SessionHandle() + new org.apache.streampark.gateway.flink.v1.client.dto.SessionHandle() .identifier(UUID.fromString(sessionHandle.getIdentifier())), - new org.apache.streampark.gateway.flink.client.dto.OperationHandle() + new org.apache.streampark.gateway.flink.v1.client.dto.OperationHandle() .identifier(UUID.fromString(operationHandle.getIdentifier()))); } catch (ApiException e) { throw new SqlGatewayException("Flink native SqlGateWay cancelOperation failed!", e); @@ -205,7 +206,7 @@ public ResultSet fetchResults( nextToken = Long.valueOf(nextResultUri.substring(nextResultUri.lastIndexOf("/") + 1)); } - org.apache.streampark.gateway.flink.client.dto.ResultSet results = + org.apache.streampark.gateway.flink.v1.client.dto.ResultSet results = fetchResultsResponseBody.getResults(); List resultsColumns = results.getColumns(); @@ -236,4 +237,12 @@ public ResultSet fetchResults( throw new SqlGatewayException("Flink native SqlGateWay fetchResults failed!", e); } } + + @Override + public List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException { + throw new SqlGatewayException( + "Flink native SqlGateWay don`t support operation:completeStatement!"); + } } diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/pom.xml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/pom.xml new file mode 100644 index 0000000000..e74f8e07ff --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/pom.xml @@ -0,0 +1,162 @@ + + + + 4.0.0 + + + org.apache.streampark + streampark-flink-sql-gateway + 2.2.0-SNAPSHOT + + + streampark-flink-sql-gateway-flink-v2 + StreamPark : Flink SQL Gateway V2 + + + UTF-8 + 1.8.5 + 1.6.5 + 4.10.0 + 2.9.1 + 3.12.0 + 0.2.4 + 1.3.5 + 5.9.1 + 1.9.1 + 3.12.4 + 2.1.1 + 1.1.1 + + + + + + org.apache.streampark + streampark-flink-sql-gateway-base + ${project.version} + + + + + io.swagger + swagger-annotations + ${swagger-core-version} + + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.squareup.okhttp3 + okhttp + ${okhttp-version} + + + com.squareup.okhttp3 + logging-interceptor + ${okhttp-version} + + + com.google.code.gson + gson + ${gson-version} + + + io.gsonfire + gson-fire + ${gson-fire-version} + + + org.apache.commons + commons-lang3 + ${commons-lang3-version} + + + jakarta.annotation + jakarta.annotation-api + ${jakarta-annotation-version} + provided + + + org.openapitools + jackson-databind-nullable + ${jackson-databind-nullable-version} + + + javax.ws.rs + jsr311-api + ${jsr311-api-version} + + + javax.ws.rs + javax.ws.rs-api + ${javax.ws.rs-api-version} + + + + org.junit.platform + junit-platform-runner + ${junit-platform-runner.version} + test + + + org.mockito + mockito-core + ${mockito-core-version} + test + + + + + + + + org.openapitools + openapi-generator-maven-plugin + 6.5.0 + + + + generate + + + ${project.basedir}/src/main/resources/flink_sql_gateway_rest_v2.yml + java + + org.apache.streampark.gateway.flink.v2.client.rest + org.apache.streampark.gateway.flink.v2.client.rest + org.apache.streampark.gateway.flink.v2.client.dto + false + false + + src/gen/java/main + + + + + + + + + diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java new file mode 100644 index 0000000000..602b0c2d48 --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink; + +import org.apache.streampark.gateway.CompleteStatementRequestBody; +import org.apache.streampark.gateway.ExecutionConfiguration; +import org.apache.streampark.gateway.OperationHandle; +import org.apache.streampark.gateway.OperationStatus; +import org.apache.streampark.gateway.exception.SqlGatewayException; +import org.apache.streampark.gateway.flink.v2.client.dto.ColumnInfo; +import org.apache.streampark.gateway.flink.v2.client.dto.ExecuteStatementRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.FetchResultsResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.GetInfoResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OperationStatusResponseBody; +import org.apache.streampark.gateway.flink.v2.client.dto.ResultType; +import org.apache.streampark.gateway.flink.v2.client.dto.RowFormat; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v2.client.rest.DefaultApi; +import org.apache.streampark.gateway.results.Column; +import org.apache.streampark.gateway.results.GatewayInfo; +import org.apache.streampark.gateway.results.OperationInfo; +import org.apache.streampark.gateway.results.ResultKind; +import org.apache.streampark.gateway.results.ResultQueryCondition; +import org.apache.streampark.gateway.results.ResultSet; +import org.apache.streampark.gateway.results.RowData; +import org.apache.streampark.gateway.service.SqlGatewayService; +import org.apache.streampark.gateway.session.SessionEnvironment; +import org.apache.streampark.gateway.session.SessionHandle; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +/** Implement {@link SqlGatewayService} with Flink native SqlGateway. */ +public class FlinkSqlGatewayImpl implements SqlGatewayService { + + private final DefaultApi defaultApi; + + public FlinkSqlGatewayImpl(String baseUri) { + ApiClient client = new ApiClient(); + client.setBasePath(baseUri); + defaultApi = new DefaultApi(client); + } + + @Override + public boolean check(String flinkMajorVersion) { + // flink gateway v1 api is supported from flink 1.16 + return Double.parseDouble(flinkMajorVersion) >= 1.16; + } + + @Override + public GatewayInfo getGatewayInfo() throws SqlGatewayException { + GetInfoResponseBody info = null; + try { + info = defaultApi.getInfo(); + return new GatewayInfo(info.getProductName(), info.getVersion()); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay getGatewayInfo failed!", e); + } + } + + @Override + public SessionHandle openSession(SessionEnvironment environment) throws SqlGatewayException { + try { + return new SessionHandle( + Objects.requireNonNull( + defaultApi + .openSession( + new OpenSessionRequestBody() + .sessionName(environment.getSessionName()) + .properties(environment.getSessionConfig())) + .getSessionHandle())); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay openSession failed!", e); + } + } + + @Override + public void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException { + try { + defaultApi.triggerSession( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier()))); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay heartbeat failed!", e); + } + } + + @Override + public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException { + try { + defaultApi.closeSession( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier()))); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay closeSession failed!", e); + } + } + + @Override + public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + try { + defaultApi.cancelOperation( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier())), + new org.apache.streampark.gateway.flink.v2.client.dto.OperationHandle() + .identifier(UUID.fromString(operationHandle.getIdentifier()))); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay cancelOperation failed!", e); + } + } + + @Override + public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + try { + defaultApi.closeOperation( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier())), + new org.apache.streampark.gateway.flink.v2.client.dto.OperationHandle() + .identifier(UUID.fromString(operationHandle.getIdentifier()))); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay closeOperation failed!", e); + } + } + + @Override + public OperationInfo getOperationInfo( + SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + + try { + OperationStatusResponseBody operationStatus = + defaultApi.getOperationStatus( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier())), + new org.apache.streampark.gateway.flink.v2.client.dto.OperationHandle() + .identifier(UUID.fromString(operationHandle.getIdentifier()))); + return new OperationInfo(OperationStatus.valueOf(operationStatus.getStatus()), null); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay closeOperation failed!", e); + } + } + + @Override + public Column getOperationResultSchema( + SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException { + throw new SqlGatewayException( + "Flink native SqlGateWay don`t support operation:getOperationResultSchema!"); + } + + @Override + public OperationHandle executeStatement( + SessionHandle sessionHandle, + String statement, + long executionTimeoutMs, + ExecutionConfiguration executionConfig) + throws SqlGatewayException { + try { + return new OperationHandle( + Objects.requireNonNull( + defaultApi + .executeStatement( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier())), + new ExecuteStatementRequestBody() + .statement(statement) + // currently, sql gateway don't support execution timeout + // .executionTimeout(executionTimeoutMs) + .executionConfig(null)) + .getOperationHandle())); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay executeStatement failed!", e); + } + } + + @Override + public ResultSet fetchResults( + SessionHandle sessionHandle, + OperationHandle operationHandle, + ResultQueryCondition resultQueryCondition) + throws SqlGatewayException { + try { + + List data = new ArrayList<>(); + List columns = new ArrayList<>(); + FetchResultsResponseBody fetchResultsResponseBody = + defaultApi.fetchResults( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier())), + new org.apache.streampark.gateway.flink.v2.client.dto.OperationHandle() + .identifier(UUID.fromString(operationHandle.getIdentifier())), + resultQueryCondition.getToken(), + RowFormat.JSON); + ResultType resultType = fetchResultsResponseBody.getResultType(); + Long nextToken = null; + if (fetchResultsResponseBody.getNextResultUri() != null) { + String nextResultUri = fetchResultsResponseBody.getNextResultUri(); + nextToken = Long.valueOf(nextResultUri.substring(nextResultUri.lastIndexOf("/") + 1)); + } + + org.apache.streampark.gateway.flink.v2.client.dto.ResultInfo results = + fetchResultsResponseBody.getResults(); + + List resultsColumns = results.getColumnInfos(); + List resultsData = + results.getData(); + + resultsColumns.forEach( + column -> + columns.add( + new Column( + column.getName(), column.getLogicalType().toJson(), column.getComment()))); + + // todo: currently, sql gateway don't support result data type + resultsData.forEach(row -> data.add(new RowData(row.getRowKind().getValue(), null))); + + ResultKind resultKind = + columns.size() == 1 && columns.get(0).getName().equals("result") + ? ResultKind.SUCCESS + : ResultKind.SUCCESS_WITH_CONTENT; + + return new ResultSet( + ResultSet.ResultType.valueOf(resultType.getValue()), + nextToken, + columns, + data, + true, + null, + resultKind); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay fetchResults failed!", e); + } + } + + @Override + public List completeStatement( + SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody) + throws SqlGatewayException { + org.apache.streampark.gateway.flink.v2.client.dto.CompleteStatementRequestBody + completeStatementRequestBody1 = + new org.apache.streampark.gateway.flink.v2.client.dto.CompleteStatementRequestBody(); + try { + return defaultApi + .completeStatement( + new org.apache.streampark.gateway.flink.v2.client.dto.SessionHandle() + .identifier(UUID.fromString(sessionHandle.getIdentifier())), + completeStatementRequestBody1) + .getCandidates(); + } catch (ApiException e) { + throw new SqlGatewayException("Flink native SqlGateWay completeStatement failed!", e); + } + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java new file mode 100644 index 0000000000..25b9a5e65d --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink; + +import org.apache.streampark.gateway.ConfigOption; +import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory; +import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils; +import org.apache.streampark.gateway.service.SqlGatewayService; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** Flink sql gateway's Factory for {@link SqlGatewayService}. */ +public class FlinkSqlGatewayServiceFactory implements SqlGatewayServiceFactory { + + public static final ConfigOption BASE_URI = + ConfigOption.key("base-uri") + .stringType() + .noDefaultValue() + .withDescription("The base uri of the flink cluster."); + + @Override + public String factoryIdentifier() { + return "flink-v2"; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(BASE_URI); + return options; + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public SqlGatewayService createSqlGatewayService(Context context) { + SqlGatewayServiceFactoryUtils.EndpointFactoryHelper helper = + SqlGatewayServiceFactoryUtils.createEndpointFactoryHelper(this, context); + helper.validate(); + String baseUri = context.getGateWayServiceOptions().get(BASE_URI.getKey()); + return new FlinkSqlGatewayImpl(baseUri); + } +} diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory new file mode 100644 index 0000000000..bbefc2c25a --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/flink_sql_gateway_rest_v2.yml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/flink_sql_gateway_rest_v2.yml new file mode 100644 index 0000000000..b48d2f4f9a --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/main/resources/flink_sql_gateway_rest_v2.yml @@ -0,0 +1,608 @@ +openapi: 3.0.1 +info: + title: Flink SQL Gateway REST API + contact: + email: user@flink.apache.org + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html + version: v2/1.17-SNAPSHOT +paths: + /api_versions: + get: + description: Get the current available versions for the Rest Endpoint. The client + can choose one of the return version as the protocol for later communicate. + operationId: getApiVersion + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/GetApiVersionResponseBody' + /info: + get: + description: Get meta data for this cluster. + operationId: getInfo + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/GetInfoResponseBody' + /sessions: + post: + description: Opens a new session with specific properties. Specific properties + can be given for current session which will override the default properties + of gateway. + operationId: openSession + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/OpenSessionRequestBody' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OpenSessionResponseBody' + /sessions/{session_handle}: + get: + description: Get the session configuration. + operationId: getSessionConfig + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/GetSessionConfigResponseBody' + delete: + description: Closes the specific session. + operationId: closeSession + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/CloseSessionResponseBody' + /sessions/{session_handle}/complete-statement: + get: + description: Get the completion hints for the given statement at the given position. + operationId: completeStatement + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CompleteStatementRequestBody' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/CompleteStatementResponseBody' + /sessions/{session_handle}/configure-session: + post: + description: |- + Configures the session with the statement which could be: + CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR. + operationId: configureSession + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConfigureSessionRequestBody' + responses: + "200": + description: The request was successful. + /sessions/{session_handle}/heartbeat: + post: + description: "Trigger heartbeat to tell the server that the client is active,\ + \ and to keep the session alive as long as configured timeout value." + operationId: triggerSession + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + responses: + "200": + description: The request was successful. + /sessions/{session_handle}/operations/{operation_handle}/cancel: + post: + description: Cancel the operation. + operationId: cancelOperation + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OperationStatusResponseBody' + /sessions/{session_handle}/operations/{operation_handle}/close: + delete: + description: Close the operation. + operationId: closeOperation + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OperationStatusResponseBody' + /sessions/{session_handle}/operations/{operation_handle}/result/{token}: + get: + description: Fetch results of Operation. + operationId: fetchResults + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + - name: token + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + type: integer + format: int64 + - name: rowFormat + in: query + description: The row format to serialize the RowData. + required: true + style: form + schema: + $ref: '#/components/schemas/RowFormat' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/FetchResultsResponseBody' + /sessions/{session_handle}/operations/{operation_handle}/status: + get: + description: Get the status of operation. + operationId: getOperationStatus + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + - name: operation_handle + in: path + description: The OperationHandle that identifies a operation. + required: true + schema: + $ref: '#/components/schemas/OperationHandle' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/OperationStatusResponseBody' + /sessions/{session_handle}/statements: + post: + description: Execute a statement. + operationId: executeStatement + parameters: + - name: session_handle + in: path + description: The SessionHandle that identifies a session. + required: true + schema: + $ref: '#/components/schemas/SessionHandle' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ExecuteStatementRequestBody' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/ExecuteStatementResponseBody' +components: + schemas: + CloseSessionResponseBody: + type: object + properties: + status: + type: string + Column: + type: object + properties: + comment: + type: string + dataType: + $ref: '#/components/schemas/DataType' + name: + type: string + persisted: + type: boolean + physical: + type: boolean + ColumnInfo: + type: object + properties: + comment: + type: string + logicalType: + $ref: '#/components/schemas/LogicalType' + name: + type: string + CompleteStatementRequestBody: + type: object + properties: + position: + type: integer + format: int32 + statement: + type: string + CompleteStatementResponseBody: + type: object + properties: + candidates: + type: array + items: + type: string + ConfigureSessionRequestBody: + type: object + properties: + executionTimeout: + type: integer + format: int64 + statement: + type: string + ConstraintType: + type: string + enum: + - PRIMARY_KEY + - UNIQUE_KEY + DataType: + type: object + properties: + children: + type: array + items: + $ref: '#/components/schemas/DataType' + logicalType: + $ref: '#/components/schemas/LogicalType' + ExecuteStatementRequestBody: + type: object + properties: + executionConfig: + type: object + additionalProperties: + type: string + executionTimeout: + type: integer + format: int64 + statement: + type: string + ExecuteStatementResponseBody: + type: object + properties: + operationHandle: + type: string + Expression: + type: object + FetchResultsResponseBody: + type: object + properties: + jobID: + $ref: '#/components/schemas/JobID' + nextResultUri: + type: string + queryResult: + type: boolean + resultKind: + $ref: '#/components/schemas/ResultKind' + resultType: + $ref: '#/components/schemas/ResultType' + results: + $ref: '#/components/schemas/ResultInfo' + FieldGetter: + type: object + GetApiVersionResponseBody: + type: object + properties: + versions: + type: array + items: + type: string + GetInfoResponseBody: + type: object + properties: + productName: + type: string + version: + type: string + GetSessionConfigResponseBody: + type: object + properties: + properties: + type: object + additionalProperties: + type: string + IntermediateDataSetID: + pattern: "[0-9a-f]{32}" + type: string + JobID: + pattern: "[0-9a-f]{32}" + type: string + JobVertexID: + pattern: "[0-9a-f]{32}" + type: string + LogicalType: + type: object + properties: + children: + type: array + items: + $ref: '#/components/schemas/LogicalType' + nullable: + type: boolean + typeRoot: + $ref: '#/components/schemas/LogicalTypeRoot' + LogicalTypeRoot: + type: string + enum: + - CHAR + - VARCHAR + - BOOLEAN + - BINARY + - VARBINARY + - DECIMAL + - TINYINT + - SMALLINT + - INTEGER + - BIGINT + - FLOAT + - DOUBLE + - DATE + - TIME_WITHOUT_TIME_ZONE + - TIMESTAMP_WITHOUT_TIME_ZONE + - TIMESTAMP_WITH_TIME_ZONE + - TIMESTAMP_WITH_LOCAL_TIME_ZONE + - INTERVAL_YEAR_MONTH + - INTERVAL_DAY_TIME + - ARRAY + - MULTISET + - MAP + - ROW + - DISTINCT_TYPE + - STRUCTURED_TYPE + - "NULL" + - RAW + - SYMBOL + - UNRESOLVED + OpenSessionRequestBody: + type: object + properties: + properties: + type: object + additionalProperties: + type: string + sessionName: + type: string + OpenSessionResponseBody: + type: object + properties: + sessionHandle: + type: string + OperationHandle: + type: object + properties: + identifier: + type: string + format: uuid + OperationStatusResponseBody: + type: object + properties: + status: + type: string + ResolvedExpression: + type: object + properties: + children: + type: array + items: + $ref: '#/components/schemas/Expression' + outputDataType: + $ref: '#/components/schemas/DataType' + resolvedChildren: + type: array + items: + $ref: '#/components/schemas/ResolvedExpression' + ResolvedSchema: + type: object + properties: + columnCount: + type: integer + format: int32 + columnDataTypes: + type: array + items: + $ref: '#/components/schemas/DataType' + columnNames: + type: array + items: + type: string + columns: + type: array + items: + $ref: '#/components/schemas/Column' + primaryKey: + $ref: '#/components/schemas/UniqueConstraint' + primaryKeyIndexes: + type: array + items: + type: integer + format: int32 + watermarkSpecs: + type: array + items: + $ref: '#/components/schemas/WatermarkSpec' + ResourceID: + pattern: "[0-9a-f]{32}" + type: string + ResultInfo: + type: object + properties: + columnInfos: + type: array + items: + $ref: '#/components/schemas/ColumnInfo' + data: + type: array + items: + $ref: '#/components/schemas/RowData' + fieldGetters: + type: array + items: + $ref: '#/components/schemas/FieldGetter' + resultSchema: + $ref: '#/components/schemas/ResolvedSchema' + rowFormat: + $ref: '#/components/schemas/RowFormat' + ResultKind: + type: string + enum: + - SUCCESS + - SUCCESS_WITH_CONTENT + ResultType: + type: string + enum: + - NOT_READY + - PAYLOAD + - EOS + RowData: + type: object + properties: + arity: + type: integer + format: int32 + rowKind: + $ref: '#/components/schemas/RowKind' + RowFormat: + type: string + enum: + - JSON + - PLAIN_TEXT + RowKind: + type: string + enum: + - INSERT + - UPDATE_BEFORE + - UPDATE_AFTER + - DELETE + SerializedThrowable: + type: object + properties: + serialized-throwable: + type: string + format: binary + SessionHandle: + type: object + properties: + identifier: + type: string + format: uuid + TriggerId: + pattern: "[0-9a-f]{32}" + type: string + UniqueConstraint: + type: object + properties: + columns: + type: array + items: + type: string + enforced: + type: boolean + name: + type: string + type: + $ref: '#/components/schemas/ConstraintType' + WatermarkSpec: + type: object + properties: + rowtimeAttribute: + type: string + watermarkExpression: + $ref: '#/components/schemas/ResolvedExpression' diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java new file mode 100644 index 0000000000..debe60862a --- /dev/null +++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v2/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.gateway.flink; + +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionRequestBody; +import org.apache.streampark.gateway.flink.v2.client.dto.OpenSessionResponseBody; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiClient; +import org.apache.streampark.gateway.flink.v2.client.rest.ApiException; +import org.apache.streampark.gateway.flink.v2.client.rest.DefaultApi; + +import java.util.Collections; + +public class FlinkSqlGateway { + + private FlinkSqlGateway() {} + + public static DefaultApi sqlGatewayApi(String basePath) { + ApiClient client = new ApiClient(); + client.setBasePath(basePath); + return new DefaultApi(client); + } + + public static void main(String[] args) throws ApiException { + DefaultApi api = new DefaultApi(new ApiClient()); + OpenSessionResponseBody openSessionResponseBody = + api.openSession( + new OpenSessionRequestBody() + .sessionName("example") + .properties(Collections.singletonMap("foo", "bar"))); + } +}