From 23d56bca5b33bcd04e1adb5d09d4849cc93ea178 Mon Sep 17 00:00:00 2001 From: gongzhongqiang Date: Fri, 21 Jul 2023 17:42:14 +0800 Subject: [PATCH] Polish code --- .../flink/catalog/MysqlCatalog.java | 140 ++++++++++++------ 1 file changed, 94 insertions(+), 46 deletions(-) diff --git a/streampark-flink/streampark-flink-catalog-mysql/src/main/java/org/apache/streampark/flink/catalog/MysqlCatalog.java b/streampark-flink/streampark-flink-catalog-mysql/src/main/java/org/apache/streampark/flink/catalog/MysqlCatalog.java index d4ee04c606..62166da779 100644 --- a/streampark-flink/streampark-flink-catalog-mysql/src/main/java/org/apache/streampark/flink/catalog/MysqlCatalog.java +++ b/streampark-flink/streampark-flink-catalog-mysql/src/main/java/org/apache/streampark/flink/catalog/MysqlCatalog.java @@ -19,6 +19,8 @@ import org.apache.streampark.flink.catalog.utils.Constants; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -65,6 +67,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Wrapper; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -103,7 +106,6 @@ public MysqlCatalog(String name, String jdbcUrl, String username, String passwor @Override public void open() throws CatalogException { - dataSource = new HikariDataSource(hikariConfig); Integer defaultDbId = getDatabaseId(Constants.DEFAULT_DATABASE); // check default database, if not exist, create it. @@ -132,57 +134,61 @@ public void close() throws CatalogException { @Override public List listDatabases() throws CatalogException { List myDatabases = new ArrayList<>(); - String querySql = "SELECT database_name FROM metadata_database"; - try (Connection conn = dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement(querySql)) { - - ResultSet rs = ps.executeQuery(); - while (rs.next()) { - String dbName = rs.getString(1); - myDatabases.add(dbName); - } - - return myDatabases; + try { + String querySql = "SELECT database_name FROM metadata_database"; + jdbcQuery( + querySql, + null, + resultSet -> { + while (resultSet.next()) { + String dbName = resultSet.getString(1); + myDatabases.add(dbName); + } + }); } catch (Exception e) { throw new CatalogException( String.format("Failed listing database in catalog %s", getName()), e); } + return myDatabases; } @Override public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { - String querySql = - "SELECT id, database_name,comment " + " FROM metadata_database where database_name=?"; - try (Connection conn = dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement(querySql)) { - ps.setString(1, databaseName); - ResultSet rs = ps.executeQuery(); - if (rs.next()) { - int id = rs.getInt("id"); - String comment = rs.getString("comment"); + Map map = new HashMap<>(); + + String sql = "SELECT id,database_name,comment FROM metadata_database where database_name=?"; + try { + Tuple2 tuple2 = new Tuple2<>(); + jdbcQuery( + sql, + s -> s.setString(1, databaseName), + r -> { + if (r.next()) { + tuple2.setFields(r.getInt("id"), r.getString("comment")); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + }); - Map map = new HashMap<>(); + sql = "select `key`,`value` from metadata_database_property where database_id=? "; - String sql = - "select `key`,`value` " + "from metadata_database_property " + "where database_id=? "; - try (PreparedStatement pStat = conn.prepareStatement(sql)) { - pStat.setInt(1, id); - ResultSet prs = pStat.executeQuery(); - while (prs.next()) { - map.put(prs.getString("key"), prs.getString("value")); - } - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed get database properties in catalog %s", getName()), e); - } - - return new CatalogDatabaseImpl(map, comment); - } else { - throw new DatabaseNotExistException(getName(), databaseName); + try { + jdbcQuery( + sql, + s -> s.setInt(1, tuple2.f0), + r -> { + while (r.next()) { + map.put(r.getString("key"), r.getString("value")); + } + }); + return new CatalogDatabaseImpl(map, tuple2.f1); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed get database properties in catalog %s", getName()), e); } - } catch (SQLException e) { + } catch (Exception e) { throw new CatalogException(String.format("Failed get database in catalog %s", getName()), e); } } @@ -194,15 +200,17 @@ public boolean databaseExists(String databaseName) throws CatalogException { private Integer getDatabaseId(String databaseName) throws CatalogException { String querySql = "select id from metadata_database where database_name=?"; - try (Connection conn = dataSource.getConnection(); - PreparedStatement ps = conn.prepareStatement(querySql)) { - ps.setString(1, databaseName); - ResultSet rs = ps.executeQuery(); - return rs.next() ? rs.getInt("id") : null; - } catch (SQLException e) { + Tuple1 tuple1 = new Tuple1<>(); + try { + jdbcQuery( + querySql, + statement -> statement.setString(1, databaseName), + resultSet -> tuple1.setField(resultSet.next() ? resultSet.getInt("id") : null, 0)); + } catch (Exception e) { throw new CatalogException( String.format("Failed get database id in catalog %s", getName()), e); } + return tuple1.f0; } @Override @@ -217,7 +225,6 @@ public void createDatabase(String databaseName, CatalogDatabase db, boolean igno } } else { String insertSql = "insert into metadata_database(database_name, comment) values(?, ?)"; - try (Connection conn = dataSource.getConnection(); PreparedStatement stat = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) { @@ -982,4 +989,45 @@ public void alterPartitionColumnStatistics( throws PartitionNotExistException, CatalogException { throw new UnsupportedOperationException(); } + + private void jdbcQuery( + String sql, + JdbcCallBack statementCallBack, + JdbcCallBack resultSetCallBack) + throws Exception { + Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(sql); + if (statementCallBack != null) { + statementCallBack.call(statement); + } + ResultSet resultSet = statement.executeQuery(sql); + if (resultSet != null) { + resultSetCallBack.call(resultSet); + } + close(resultSet, statement, connection); + } + + private int jdbcUpdate(String sql, JdbcCallBack statementCallBack) + throws Exception { + Connection connection = dataSource.getConnection(); + PreparedStatement statement = connection.prepareStatement(sql); + if (statementCallBack != null) { + statementCallBack.call(statement); + } + int result = statement.executeUpdate(sql); + close(statement, connection); + return result; + } + + public interface JdbcCallBack { + void call(T wrapper) throws Exception; + } + + public void close(AutoCloseable... closes) throws Exception { + for (AutoCloseable closeable : closes) { + if (closeable != null) { + closeable.close(); + } + } + } }