Skip to content

Commit

Permalink
Implement flink gateway v2 api
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Jul 31, 2023
1 parent b71e094 commit 01fd44d
Show file tree
Hide file tree
Showing 26 changed files with 1,483 additions and 113 deletions.
8 changes: 8 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@
<version>${project.version}</version>
</dependency>

<!--region Gateway dependency-->
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-sql-gateway-base</artifactId>
Expand All @@ -383,6 +384,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-sql-gateway-flink-v2</artifactId>
<version>${project.version}</version>
</dependency>
<!--endregion-->

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.module.scala.DefaultScalaModule;
Expand Down Expand Up @@ -57,4 +58,8 @@ public static <T> T read(String json, TypeReference<T> typeReference)
public static String write(Object object) throws JsonProcessingException {
return MAPPER.writeValueAsString(object);
}

public static JsonNode readTree(String json) throws JsonProcessingException {
return MAPPER.readTree(json);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.annotation.ApiAccess;
import org.apache.streampark.console.core.service.SqlWorkBenchService;
import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.results.ResultQueryCondition;
import org.apache.streampark.gateway.session.SessionHandle;

Expand Down Expand Up @@ -186,6 +187,21 @@ public RestResponse fetchResults(
flinkGatewayId, sessionHandle, operationHandle, resultQueryCondition));
}

@ApiAccess
@ApiOperation(
value = "Complete statement",
notes = "Complete statement",
tags = "FLINK_GATEWAY_TAG")
@PostMapping("sessions/{sessionHandle}/statements/complete")
public RestResponse completeStatement(
@PathVariable Long flinkGatewayId,
@PathVariable String sessionHandle,
@RequestBody CompleteStatementRequestBody completeStatementRequestBody) {
return RestResponse.success(
sqlWorkBenchService.completeStatement(
flinkGatewayId, sessionHandle, completeStatementRequestBody));
}

// -------------------------------------------------------------------------------------------
// Catalog API
// -------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.results.Column;
import org.apache.streampark.gateway.results.GatewayInfo;
Expand All @@ -25,6 +26,8 @@
import org.apache.streampark.gateway.results.ResultSet;
import org.apache.streampark.gateway.session.SessionHandle;

import java.util.List;

public interface SqlWorkBenchService {

/**
Expand Down Expand Up @@ -103,6 +106,19 @@ Column getOperationResultSchema(
OperationHandle executeStatement(
Long flinkGatewayId, String sessionHandleUUIDStr, String statement);

/**
* Get the completion hints for the given statement at the given position.
*
* @param flinkGatewayId flink gateway id
* @param sessionHandleUUIDStr session handle uuid string
* @param completeStatementRequestBody complete statement request body
* @return completion hints
*/
List<String> completeStatement(
Long flinkGatewayId,
String sessionHandleUUIDStr,
CompleteStatementRequestBody completeStatementRequestBody);

/**
* Fetch results
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@
import org.apache.streampark.console.core.enums.GatewayTypeEnum;
import org.apache.streampark.console.core.mapper.FlinkGateWayMapper;
import org.apache.streampark.console.core.service.FlinkGateWayService;
import org.apache.streampark.gateway.flink.client.dto.GetApiVersionResponseBody;

import org.apache.hc.client5.http.config.RequestConfig;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand Down Expand Up @@ -72,16 +76,27 @@ public boolean existsByGatewayName(String name) {

@Override
public GatewayTypeEnum getGatewayVersion(String address) {
// change to use SqlGatewayService to get version in future
String restUrl = address + "/api_versions";
try {
String result =
HttpClientUtils.httpGetRequest(
restUrl,
RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
if (result != null) {
String versionStr =
JacksonUtils.read(result, GetApiVersionResponseBody.class).getVersions().get(0);
return "V1".equals(versionStr) ? GatewayTypeEnum.FLINK_V1 : GatewayTypeEnum.FLINK_V2;
List<String> versions = new ArrayList<>();
JsonNode jsonNode = JacksonUtils.readTree(result);
Optional.ofNullable(jsonNode.get("versions"))
.filter(JsonNode::isArray)
.map(ArrayNode.class::cast)
.ifPresent(
arrayNode -> arrayNode.elements().forEachRemaining(e -> versions.add(e.asText())));
// Currently, we only support V1 and V2. Flink 1.17 will return both V1 and V2, so we need
// to get the last one.
if (versions.size() > 0) {
String versionStr = versions.get(versions.size() - 1);
return "V1".equals(versionStr) ? GatewayTypeEnum.FLINK_V1 : GatewayTypeEnum.FLINK_V2;
}
}
} catch (Exception e) {
log.error("get gateway version failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import org.apache.streampark.flink.kubernetes.ingress.IngressController;
import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.factories.FactoryUtil;
import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
import org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory;
import org.apache.streampark.gateway.results.Column;
import org.apache.streampark.gateway.results.GatewayInfo;
import org.apache.streampark.gateway.results.OperationInfo;
Expand All @@ -55,10 +55,6 @@
import java.util.Objects;
import java.util.UUID;

import static org.apache.streampark.common.enums.ExecutionMode.KUBERNETES_NATIVE_SESSION;
import static org.apache.streampark.common.enums.ExecutionMode.REMOTE;
import static org.apache.streampark.common.enums.ExecutionMode.YARN_SESSION;

@Slf4j
@Service
@RequiredArgsConstructor
Expand All @@ -79,7 +75,7 @@ private SqlGatewayService getSqlGateWayService(Long flinkGatewayId) {
config.put(
FactoryUtil.SQL_GATEWAY_SERVICE_TYPE.getKey(),
flinkGateWay.getGatewayType().getIdentifier());
config.put(FlinkSqlGatewayServiceFactory.BASE_URI.getKey(), flinkGateWay.getAddress());
config.put("base-uri", flinkGateWay.getAddress());
List<SqlGatewayService> actual = SqlGatewayServiceFactoryUtils.createSqlGatewayService(config);
if (actual.size() > 1) {
log.warn("There are more than one SqlGatewayService instance, please check your config");
Expand Down Expand Up @@ -187,6 +183,15 @@ public OperationHandle executeStatement(
.executeStatement(new SessionHandle(sessionHandleUUIDStr), statement, 10000L, null);
}

@Override
public List<String> completeStatement(
Long flinkGatewayId,
String sessionHandleUUIDStr,
CompleteStatementRequestBody completeStatementRequestBody) {
return getSqlGateWayService(flinkGatewayId)
.completeStatement(new SessionHandle(sessionHandleUUIDStr), completeStatementRequestBody);
}

@Override
public ResultSet fetchResults(
Long flinkGatewayId,
Expand Down
1 change: 1 addition & 0 deletions streampark-flink/streampark-flink-sql-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<modules>
<module>streampark-flink-sql-gateway-base</module>
<module>streampark-flink-sql-gateway-flink-v1</module>
<module>streampark-flink-sql-gateway-flink-v2</module>
<module>streampark-flink-sql-gateway-kyuubi</module>
</modules>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.gateway;

import java.io.Serializable;
import java.util.Objects;

public class CompleteStatementRequestBody implements Serializable {

private Integer position;

private String statement;

public CompleteStatementRequestBody() {}

public CompleteStatementRequestBody(Integer position, String statement) {
this.position = position;
this.statement = statement;
}

public Integer getPosition() {
return position;
}

public String getStatement() {
return statement;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompleteStatementRequestBody that = (CompleteStatementRequestBody) o;
return Objects.equals(position, that.position) && Objects.equals(statement, that.statement);
}

@Override
public int hashCode() {
return Objects.hash(position, statement);
}

@Override
public String toString() {
return "CompleteStatementRequestBody{"
+ "position="
+ position
+ ", statement='"
+ statement
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.gateway.service;

import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.ExecutionConfiguration;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.OperationStatus;
Expand All @@ -29,6 +30,8 @@
import org.apache.streampark.gateway.session.SessionEnvironment;
import org.apache.streampark.gateway.session.SessionHandle;

import java.util.List;

/** A service of SQL gateway is responsible for handling requests from streampark console. */
public interface SqlGatewayService {

Expand Down Expand Up @@ -156,4 +159,15 @@ ResultSet fetchResults(
OperationHandle operationHandle,
ResultQueryCondition resultQueryCondition)
throws SqlGatewayException;

/**
* Get the completion hints for the given statement at the given position.
*
* @param sessionHandle handle to identify the session.
* @param completeStatementRequestBody completion hints request body.
* @return Returns the completion hints.
*/
List<String> completeStatement(
SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody)
throws SqlGatewayException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,6 @@ public void testCreateUnknownService() {
SqlGatewayServiceFactory.class.getCanonicalName()));
}

/* @Test
public void testCreateServiceWithMissingOptions() {
Map<String, String> config = getDefaultConfig();
config.remove("sql-gateway.Service.mocked.host");
validateException(
config,
"One or more required options are missing.\n\n"
+ "Missing required options are:\n\n"
+ "host");
}*/

/* @Test
public void testCreateServiceWithUnconsumedOptions() {
Map<String, String> config = getDefaultConfig();
config.put("sql-gateway.Service.mocked.unconsumed-option", "error");
validateException(
config,
"Unsupported options found for 'mocked'.\n\n"
+ "Unsupported options:\n\n"
+ "unconsumed-option\n\n"
+ "Supported options:\n\n"
+ "description\n"
+ "host\n"
+ "id\n"
+ "port");
}*/

// --------------------------------------------------------------------------------------------

private void validateException(Map<String, String> config, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.gateway.utils;

import org.apache.streampark.gateway.CompleteStatementRequestBody;
import org.apache.streampark.gateway.ExecutionConfiguration;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.exception.SqlGatewayException;
Expand All @@ -29,6 +30,8 @@
import org.apache.streampark.gateway.session.SessionEnvironment;
import org.apache.streampark.gateway.session.SessionHandle;

import java.util.List;

/** Mocked implementation of {@link SqlGatewayService}. */
public class FakeSqlGatewayService implements SqlGatewayService {

Expand Down Expand Up @@ -103,4 +106,11 @@ public ResultSet fetchResults(
throws SqlGatewayException {
throw new UnsupportedOperationException();
}

@Override
public List<String> completeStatement(
SessionHandle sessionHandle, CompleteStatementRequestBody completeStatementRequestBody)
throws SqlGatewayException {
throw new UnsupportedOperationException();
}
}
Loading

0 comments on commit 01fd44d

Please sign in to comment.