Skip to content

Commit

Permalink
[Feature][catalog] Streampark start job ship catalogstore plugin and …
Browse files Browse the repository at this point in the history
…database crud.
  • Loading branch information
Mrart committed Aug 22, 2024
1 parent edfb69a commit 71cd408
Show file tree
Hide file tree
Showing 43 changed files with 863 additions and 134 deletions.
5 changes: 2 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@
<scala.xml.version>1.3.0</scala.xml.version>
<scalatest.version>3.2.9</scalatest.version>
<scala.binary.flink.version>_${scala.binary.version}</scala.binary.flink.version>

<flink.version>1.14.4</flink.version>
<flink.connector.version>3.2.0-1.18</flink.connector.version>
<flink.version>1.18.1</flink.version>
<flink.shaded.version>1.8.1</flink.shaded.version>
<streampark.shaded.version>1.0.0</streampark.shaded.version>
<streampark.flink.shims.version>1.14</streampark.flink.shims.version>
Expand Down Expand Up @@ -139,7 +139,6 @@
<owasp-dependency-check-maven.version>10.0.2</owasp-dependency-check-maven.version>
<build-helper-maven-plugin.version>3.3.0</build-helper-maven-plugin.version>
<streampark.shaded.package>org.apache.streampark.shaded</streampark.shaded.package>
<flink.table.uber.artifact.id>flink-table-uber_${scala.binary.version}</flink.table.uber.artifact.id>
<httpclient5.version>5.1</httpclient5.version>
<lombok.version>1.18.24</lombok.version>
<jupiter.version>5.9.1</jupiter.version>
Expand Down
7 changes: 7 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,13 @@
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-connector-plugin</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.module.scala.DefaultScalaModule;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Map;

/** Serialization utils */
public final class JacksonUtils {
Expand All @@ -41,6 +43,7 @@ private JacksonUtils() {
static {
MAPPER = new ObjectMapper();
MAPPER.registerModule(new DefaultScalaModule());
MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.KEBAB_CASE);
MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
Expand All @@ -67,4 +70,8 @@ public static boolean isValidJson(String jsonStr) {
return false;
}
}

public static Map<String, String> toMap(String jsonStr) throws JsonProcessingException {
return (Map<String, String>) MAPPER.readValue(jsonStr, Map.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,7 @@ public static File getAppClientDir() {
return getAppDir(CLIENT);
}

public static File getPluginDir() {
return getAppDir(PLUGINS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import lombok.Data;

import javax.validation.constraints.NotBlank;

@Data
public class DatabaseParam {

@NotBlank(message = "invalid.databaseName")
private String name;

@NotBlank(message = "invalid.catalogId")
private Long catalogId;

private String catalogName;
private boolean ignoreIfExits;
private boolean cascade;
private String description;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.controller;

import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.core.bean.DatabaseParam;
import org.apache.streampark.console.core.service.DatabaseService;

import org.apache.shiro.authz.annotation.RequiresPermissions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.List;

@Slf4j
@Validated
@RestController
@RequestMapping("flink/database")
public class DatabaseController {}
public class DatabaseController {

@Autowired
DatabaseService databaseService;

@PostMapping("create")
@RequiresPermissions("database:create")
public RestResponse create(DatabaseParam databaseParam) throws IOException {
boolean saved = databaseService.createDatabase(databaseParam);
return RestResponse.success(saved);
}

@PostMapping("list")
@RequiresPermissions("database:view")
public RestResponse list(DatabaseParam databaseParam, RestRequest request) {
List<DatabaseParam> databaseParamList =
databaseService.listDatabases(databaseParam.getCatalogId());
return RestResponse.success(databaseParamList);
}

@PostMapping("delete")
@RequiresPermissions("database:delete")
public RestResponse remove(DatabaseParam databaseParam) {
boolean deleted = databaseService.dropDatabase(databaseParam);
return RestResponse.success(deleted);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Database {

private String name;

private Integer catalogId;

private String catalogName;

private String description;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ public interface CatalogMapper extends BaseMapper<FlinkCatalog> {

boolean existsByCatalogName(@Param("catalogName") String catalogName);

FlinkCatalog selectByCatalogName(@Param("catalogName") String catalogName);

IPage<FlinkCatalog> selectPage(Page<FlinkCatalog> page, @Param("catalog") FlinkCatalog catalog);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.mapper;

import org.apache.streampark.console.core.entity.Database;

import org.apache.ibatis.annotations.Mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

@Mapper
public interface DatabaseMapper extends BaseMapper<Database> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface CatalogService extends IService<FlinkCatalog> {
*/
IPage<FlinkCatalogParams> page(FlinkCatalogParams catalog, RestRequest request);

FlinkCatalog getCatalog(Long catalogId);

FlinkCatalog getCatalog(String catalogName);
/**
* update Catalog
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.service;

import org.apache.streampark.console.core.bean.DatabaseParam;
import org.apache.streampark.console.core.entity.Database;

import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;

public interface DatabaseService extends IService<Database> {

/**
* Checks if the specified database exists.
*
* @param databaseParam The database to check
* @return true if the database exists, false otherwise
*/
boolean databaseExists(DatabaseParam databaseParam);

/**
* Creates a new database given {@link Database}.
*
* @param databaseParam The {@link DatabaseParam} object that contains the detail of the created
* database
* @return true if the operation is successful, false otherwise
*/
boolean createDatabase(DatabaseParam databaseParam);

/**
* Lists databases given catalog id.
*
* @return The list of databases of given catalog
*/
List<DatabaseParam> listDatabases(Long catalogId);

/**
* Drops database given database name.
*
* @param databaseParam The dropping database
* @return true if the operation is successful, false otherwise
*/
boolean dropDatabase(DatabaseParam databaseParam);
}
Loading

0 comments on commit 71cd408

Please sign in to comment.