Skip to content

Commit

Permalink
Polish code
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 committed Jul 21, 2023
1 parent f4bf1b4 commit 23d56bc
Showing 1 changed file with 94 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.streampark.flink.catalog.utils.Constants;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down Expand Up @@ -65,6 +67,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Wrapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -103,7 +106,6 @@ public MysqlCatalog(String name, String jdbcUrl, String username, String passwor

@Override
public void open() throws CatalogException {

dataSource = new HikariDataSource(hikariConfig);
Integer defaultDbId = getDatabaseId(Constants.DEFAULT_DATABASE);
// check default database, if not exist, create it.
Expand Down Expand Up @@ -132,57 +134,61 @@ public void close() throws CatalogException {
@Override
public List<String> listDatabases() throws CatalogException {
List<String> myDatabases = new ArrayList<>();
String querySql = "SELECT database_name FROM metadata_database";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(querySql)) {

ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
myDatabases.add(dbName);
}

return myDatabases;
try {
String querySql = "SELECT database_name FROM metadata_database";
jdbcQuery(
querySql,
null,
resultSet -> {
while (resultSet.next()) {
String dbName = resultSet.getString(1);
myDatabases.add(dbName);
}
});
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
return myDatabases;
}

@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
String querySql =
"SELECT id, database_name,comment " + " FROM metadata_database where database_name=?";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();

if (rs.next()) {
int id = rs.getInt("id");
String comment = rs.getString("comment");
Map<String, String> map = new HashMap<>();

String sql = "SELECT id,database_name,comment FROM metadata_database where database_name=?";
try {
Tuple2<Integer, String> tuple2 = new Tuple2<>();
jdbcQuery(
sql,
s -> s.setString(1, databaseName),
r -> {
if (r.next()) {
tuple2.setFields(r.getInt("id"), r.getString("comment"));
} else {
throw new DatabaseNotExistException(getName(), databaseName);
}
});

Map<String, String> map = new HashMap<>();
sql = "select `key`,`value` from metadata_database_property where database_id=? ";

String sql =
"select `key`,`value` " + "from metadata_database_property " + "where database_id=? ";
try (PreparedStatement pStat = conn.prepareStatement(sql)) {
pStat.setInt(1, id);
ResultSet prs = pStat.executeQuery();
while (prs.next()) {
map.put(prs.getString("key"), prs.getString("value"));
}
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}

return new CatalogDatabaseImpl(map, comment);
} else {
throw new DatabaseNotExistException(getName(), databaseName);
try {
jdbcQuery(
sql,
s -> s.setInt(1, tuple2.f0),
r -> {
while (r.next()) {
map.put(r.getString("key"), r.getString("value"));
}
});
return new CatalogDatabaseImpl(map, tuple2.f1);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed get database properties in catalog %s", getName()), e);
}
} catch (SQLException e) {
} catch (Exception e) {
throw new CatalogException(String.format("Failed get database in catalog %s", getName()), e);
}
}
Expand All @@ -194,15 +200,17 @@ public boolean databaseExists(String databaseName) throws CatalogException {

private Integer getDatabaseId(String databaseName) throws CatalogException {
String querySql = "select id from metadata_database where database_name=?";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(querySql)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
return rs.next() ? rs.getInt("id") : null;
} catch (SQLException e) {
Tuple1<Integer> tuple1 = new Tuple1<>();
try {
jdbcQuery(
querySql,
statement -> statement.setString(1, databaseName),
resultSet -> tuple1.setField(resultSet.next() ? resultSet.getInt("id") : null, 0));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed get database id in catalog %s", getName()), e);
}
return tuple1.f0;
}

@Override
Expand All @@ -217,7 +225,6 @@ public void createDatabase(String databaseName, CatalogDatabase db, boolean igno
}
} else {
String insertSql = "insert into metadata_database(database_name, comment) values(?, ?)";

try (Connection conn = dataSource.getConnection();
PreparedStatement stat =
conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
Expand Down Expand Up @@ -982,4 +989,45 @@ public void alterPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}

private void jdbcQuery(
String sql,
JdbcCallBack<PreparedStatement> statementCallBack,
JdbcCallBack<ResultSet> resultSetCallBack)
throws Exception {
Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sql);
if (statementCallBack != null) {
statementCallBack.call(statement);
}
ResultSet resultSet = statement.executeQuery(sql);
if (resultSet != null) {
resultSetCallBack.call(resultSet);
}
close(resultSet, statement, connection);
}

private int jdbcUpdate(String sql, JdbcCallBack<PreparedStatement> statementCallBack)
throws Exception {
Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sql);
if (statementCallBack != null) {
statementCallBack.call(statement);
}
int result = statement.executeUpdate(sql);
close(statement, connection);
return result;
}

public interface JdbcCallBack<T extends Wrapper> {
void call(T wrapper) throws Exception;
}

public void close(AutoCloseable... closes) throws Exception {
for (AutoCloseable closeable : closes) {
if (closeable != null) {
closeable.close();
}
}
}
}

0 comments on commit 23d56bc

Please sign in to comment.