Skip to content

Commit

Permalink
Implement flink gateway v2 api
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Jul 29, 2023
1 parent b71e094 commit 1ab1f0d
Show file tree
Hide file tree
Showing 12 changed files with 1,264 additions and 81 deletions.
1 change: 1 addition & 0 deletions streampark-flink/streampark-flink-sql-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<modules>
<module>streampark-flink-sql-gateway-base</module>
<module>streampark-flink-sql-gateway-flink-v1</module>
<module>streampark-flink-sql-gateway-flink-v2</module>
<module>streampark-flink-sql-gateway-kyuubi</module>
</modules>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.apache.streampark.gateway;

import java.io.Serializable;
import java.util.Objects;

public class CompleteStatementRequestBody implements Serializable {

private final Integer position;

private final String statement;

public CompleteStatementRequestBody(Integer position, String statement) {
this.position = position;
this.statement = statement;
}

public Integer getPosition() {
return position;
}

public String getStatement() {
return statement;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompleteStatementRequestBody that = (CompleteStatementRequestBody) o;
return Objects.equals(position, that.position) && Objects.equals(statement, that.statement);
}

@Override
public int hashCode() {
return Objects.hash(position, statement);
}

@Override
public String toString() {
return "CompleteStatementRequestBody{"
+ "position="
+ position
+ ", statement='"
+ statement
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.gateway.service;

import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.ExecutionConfiguration;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.OperationStatus;
Expand All @@ -29,6 +30,8 @@
import org.apache.streampark.gateway.session.SessionEnvironment;
import org.apache.streampark.gateway.session.SessionHandle;

import java.util.List;

/** A service of SQL gateway is responsible for handling requests from streampark console. */
public interface SqlGatewayService {

Expand Down Expand Up @@ -156,4 +159,15 @@ ResultSet fetchResults(
OperationHandle operationHandle,
ResultQueryCondition resultQueryCondition)
throws SqlGatewayException;

/**
* Get the completion hints for the given statement at the given position.
*
* @param sessionHandle handle to identify the session.
* @param completeStatementRequestBody completion hints request body.
* @return Returns the completion hints.
*/
List<String> completeStatement(
SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody)
throws SqlGatewayException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,6 @@ public void testCreateUnknownService() {
SqlGatewayServiceFactory.class.getCanonicalName()));
}

/* @Test
public void testCreateServiceWithMissingOptions() {
Map<String, String> config = getDefaultConfig();
config.remove("sql-gateway.Service.mocked.host");
validateException(
config,
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "host");
}*/

/* @Test
public void testCreateServiceWithUnconsumedOptions() {
Map<String, String> config = getDefaultConfig();
config.put("sql-gateway.Service.mocked.unconsumed-option", "error");
validateException(
config,
"Unsupported options found for 'mocked'.\n\n"
+ "Unsupported options:\n\n"
+ "unconsumed-option\n\n"
+ "Supported options:\n\n"
+ "description\n"
+ "host\n"
+ "id\n"
+ "port");
}*/

// --------------------------------------------------------------------------------------------

private void validateException(Map<String, String> config, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
Expand Down Expand Up @@ -60,7 +26,7 @@
</parent>

<artifactId>streampark-flink-sql-gateway-flink-v1</artifactId>
<name>StreamPark : Flink SQL Gateway 1.16</name>
<name>StreamPark : Flink SQL Gateway V1</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down Expand Up @@ -178,9 +144,9 @@
When use jdk11+, use the following line to generate native java code.
<library>native</library>
-->
<apiPackage>org.apache.streampark.gateway.flink.client.rest.v1</apiPackage>
<invokerPackage>org.apache.streampark.gateway.flink.client.rest</invokerPackage>
<modelPackage>org.apache.streampark.gateway.flink.client.dto</modelPackage>
<apiPackage>org.apache.streampark.gateway.flink.v1.client.rest</apiPackage>
<invokerPackage>org.apache.streampark.gateway.flink.v1.client.rest</invokerPackage>
<modelPackage>org.apache.streampark.gateway.flink.v1.client.dto</modelPackage>
<generateApiTests>false</generateApiTests>
<generateModelTests>false</generateModelTests>
<configOptions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@

package org.apache.streampark.gateway.flink;

import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.ExecutionConfiguration;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.OperationStatus;
import org.apache.streampark.gateway.exception.SqlGatewayException;
import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody;
import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody;
import org.apache.streampark.gateway.flink.client.dto.GetInfoResponseBody;
import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
import org.apache.streampark.gateway.flink.client.dto.OperationStatusResponseBody;
import org.apache.streampark.gateway.flink.client.dto.ResultSetColumnsInner;
import org.apache.streampark.gateway.flink.client.dto.ResultSetDataInner;
import org.apache.streampark.gateway.flink.client.rest.ApiClient;
import org.apache.streampark.gateway.flink.client.rest.ApiException;
import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
import org.apache.streampark.gateway.flink.v1.client.dto.ExecuteStatementRequestBody;
import org.apache.streampark.gateway.flink.v1.client.dto.FetchResultsResponseBody;
import org.apache.streampark.gateway.flink.v1.client.dto.GetInfoResponseBody;
import org.apache.streampark.gateway.flink.v1.client.dto.OpenSessionRequestBody;
import org.apache.streampark.gateway.flink.v1.client.dto.OperationStatusResponseBody;
import org.apache.streampark.gateway.flink.v1.client.dto.ResultSetColumnsInner;
import org.apache.streampark.gateway.flink.v1.client.dto.ResultSetDataInner;
import org.apache.streampark.gateway.flink.v1.client.rest.ApiClient;
import org.apache.streampark.gateway.flink.v1.client.rest.ApiException;
import org.apache.streampark.gateway.flink.v1.client.rest.DefaultApi;
import org.apache.streampark.gateway.results.Column;
import org.apache.streampark.gateway.results.GatewayInfo;
import org.apache.streampark.gateway.results.OperationInfo;
Expand Down Expand Up @@ -95,7 +96,7 @@ public SessionHandle openSession(SessionEnvironment environment) throws SqlGatew
public void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException {
try {
defaultApi.triggerSession(
new org.apache.streampark.gateway.flink.client.dto.SessionHandle()
new org.apache.streampark.gateway.flink.v1.client.dto.SessionHandle()
.identifier(UUID.fromString(sessionHandle.getIdentifier())));
} catch (ApiException e) {
throw new SqlGatewayException("Flink native SqlGateWay heartbeat failed!", e);
Expand All @@ -116,9 +117,9 @@ public void cancelOperation(SessionHandle sessionHandle, OperationHandle operati
throws SqlGatewayException {
try {
defaultApi.cancelOperation(
new org.apache.streampark.gateway.flink.client.dto.SessionHandle()
new org.apache.streampark.gateway.flink.v1.client.dto.SessionHandle()
.identifier(UUID.fromString(sessionHandle.getIdentifier())),
new org.apache.streampark.gateway.flink.client.dto.OperationHandle()
new org.apache.streampark.gateway.flink.v1.client.dto.OperationHandle()
.identifier(UUID.fromString(operationHandle.getIdentifier())));
} catch (ApiException e) {
throw new SqlGatewayException("Flink native SqlGateWay cancelOperation failed!", e);
Expand Down Expand Up @@ -205,7 +206,7 @@ public ResultSet fetchResults(
nextToken = Long.valueOf(nextResultUri.substring(nextResultUri.lastIndexOf("/") + 1));
}

org.apache.streampark.gateway.flink.client.dto.ResultSet results =
org.apache.streampark.gateway.flink.v1.client.dto.ResultSet results =
fetchResultsResponseBody.getResults();

List<ResultSetColumnsInner> resultsColumns = results.getColumns();
Expand Down Expand Up @@ -236,4 +237,12 @@ public ResultSet fetchResults(
throw new SqlGatewayException("Flink native SqlGateWay fetchResults failed!", e);
}
}

@Override
public List<String> completeStatement(
SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody)
throws SqlGatewayException {
throw new SqlGatewayException(
"Flink native SqlGateWay don`t support operation:completeStatement!");
}
}
Loading

0 comments on commit 1ab1f0d

Please sign in to comment.