From bbf47735baabcd1efe64fdcfaf2f3a9d3c9379f8 Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Tue, 24 Sep 2024 15:12:52 +0800 Subject: [PATCH] [#4832] feat(core): Add basic framework to support multiple JDBC backend (#4998) ### What changes were proposed in this pull request? 1. Add a framework to support multiple JDBC backend 2. Modify all the mapper classes related to metalake, catalog, schema, filesetset, table, topic and use new framework, others will use another PR to avoid a large PR. ### Why are the changes needed? To support more JDBC storage backend. Fix: #4832 ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested? Existing tests. --- .../storage/relational/JDBCBackend.java | 20 +- .../mapper/CatalogMetaBaseSQLProvider.java | 173 +++++++++++++++ .../relational/mapper/CatalogMetaMapper.java | 151 +++---------- .../mapper/CatalogMetaSQLProviderFactory.java | 97 +++++++++ .../mapper/FilesetMetaBaseSQLProvider.java | 200 ++++++++++++++++++ .../relational/mapper/FilesetMetaMapper.java | 178 +++------------- .../mapper/FilesetMetaSQLProviderFactory.java | 104 +++++++++ .../mapper/FilesetVersionBaseSQLProvider.java | 135 ++++++++++++ .../mapper/FilesetVersionMapper.java | 118 +++-------- .../FilesetVersionSQLProviderFactory.java | 93 ++++++++ .../mapper/MetalakeMetaBaseSQLProvider.java | 152 +++++++++++++ .../relational/mapper/MetalakeMetaMapper.java | 127 ++--------- .../MetalakeMetaSQLProviderFactory.java | 91 ++++++++ .../mapper/SchemaMetaBaseSQLProvider.java | 174 +++++++++++++++ .../relational/mapper/SchemaMetaMapper.java | 155 +++----------- .../mapper/SchemaMetaSQLProviderFactory.java | 98 +++++++++ .../mapper/TableMetaBaseSQLProvider.java | 178 ++++++++++++++++ .../relational/mapper/TableMetaMapper.java | 159 +++----------- .../mapper/TableMetaSQLProviderFactory.java | 102 +++++++++ .../mapper/TopicMetaBaseSQLProvider.java | 189 +++++++++++++++++ .../relational/mapper/TopicMetaMapper.java | 169 +++------------ .../mapper/TopicMetaSQLProviderFactory.java | 103 +++++++++ .../session/SqlSessionFactoryHelper.java | 6 +- 23 files changed, 2113 insertions(+), 859 deletions(-) create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java create mode 100644 core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java index 81e111a289b..b23c7667388 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java @@ -417,9 +417,10 @@ public void insertRelation( } } - enum JDBCBackendType { + public enum JDBCBackendType { H2(true), - MYSQL(false); + MYSQL(false), + POSTGRESQL(false); private final boolean embedded; @@ -432,10 +433,25 @@ public static JDBCBackendType fromURI(String jdbcURI) { return JDBCBackendType.H2; } else if (jdbcURI.startsWith("jdbc:mysql")) { return JDBCBackendType.MYSQL; + } else if (jdbcURI.startsWith("jdbc:postgresql")) { + return JDBCBackendType.POSTGRESQL; } else { throw new IllegalArgumentException("Unknown JDBC URI: " + jdbcURI); } } + + public static JDBCBackendType fromString(String jdbcType) { + switch (jdbcType) { + case "h2": + return JDBCBackendType.H2; + case "mysql": + return JDBCBackendType.MYSQL; + case "postgresql": + return JDBCBackendType.POSTGRESQL; + default: + throw new IllegalArgumentException("Unknown JDBC type: " + jdbcType); + } + } } /** Start JDBC database if necessary. For example, start the H2 database if the backend is H2. */ diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java new file mode 100644 index 00000000000..cce5783898f --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.CatalogPO; +import org.apache.ibatis.annotations.Param; + +public class CatalogMetaBaseSQLProvider { + public String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String selectCatalogIdByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return "SELECT catalog_id as catalogId FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0"; + } + + public String selectCatalogMetaByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0"; + } + + public String selectCatalogMetaById(@Param("catalogId") Long catalogId) { + return "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO) { + return "INSERT INTO " + + TABLE_NAME + + "(catalog_id, catalog_name, metalake_id," + + " type, provider, catalog_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{catalogMeta.catalogId}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}," + + " #{catalogMeta.currentVersion}," + + " #{catalogMeta.lastVersion}," + + " #{catalogMeta.deletedAt}" + + " )"; + } + + public String insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO) { + return "INSERT INTO " + + TABLE_NAME + + "(catalog_id, catalog_name, metalake_id," + + " type, provider, catalog_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{catalogMeta.catalogId}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}," + + " #{catalogMeta.currentVersion}," + + " #{catalogMeta.lastVersion}," + + " #{catalogMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " catalog_name = #{catalogMeta.catalogName}," + + " metalake_id = #{catalogMeta.metalakeId}," + + " type = #{catalogMeta.type}," + + " provider = #{catalogMeta.provider}," + + " catalog_comment = #{catalogMeta.catalogComment}," + + " properties = #{catalogMeta.properties}," + + " audit_info = #{catalogMeta.auditInfo}," + + " current_version = #{catalogMeta.currentVersion}," + + " last_version = #{catalogMeta.lastVersion}," + + " deleted_at = #{catalogMeta.deletedAt}"; + } + + public String updateCatalogMeta( + @Param("newCatalogMeta") CatalogPO newCatalogPO, + @Param("oldCatalogMeta") CatalogPO oldCatalogPO) { + return "UPDATE " + + TABLE_NAME + + " SET catalog_name = #{newCatalogMeta.catalogName}," + + " metalake_id = #{newCatalogMeta.metalakeId}," + + " type = #{newCatalogMeta.type}," + + " provider = #{newCatalogMeta.provider}," + + " catalog_comment = #{newCatalogMeta.catalogComment}," + + " properties = #{newCatalogMeta.properties}," + + " audit_info = #{newCatalogMeta.auditInfo}," + + " current_version = #{newCatalogMeta.currentVersion}," + + " last_version = #{newCatalogMeta.lastVersion}," + + " deleted_at = #{newCatalogMeta.deletedAt}" + + " WHERE catalog_id = #{oldCatalogMeta.catalogId}" + + " AND catalog_name = #{oldCatalogMeta.catalogName}" + + " AND metalake_id = #{oldCatalogMeta.metalakeId}" + + " AND type = #{oldCatalogMeta.type}" + + " AND provider = #{oldCatalogMeta.provider}" + + " AND (catalog_comment = #{oldCatalogMeta.catalogComment} " + + " OR (catalog_comment IS NULL and #{oldCatalogMeta.catalogComment} IS NULL))" + + " AND properties = #{oldCatalogMeta.properties}" + + " AND audit_info = #{oldCatalogMeta.auditInfo}" + + " AND current_version = #{oldCatalogMeta.currentVersion}" + + " AND last_version = #{oldCatalogMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String deleteCatalogMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java index faedbcd9642..01cadbb6e8b 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.CatalogPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for catalog meta operation SQLs. @@ -38,149 +38,50 @@ public interface CatalogMetaMapper { String TABLE_NAME = "catalog_meta"; - @Select( - "SELECT catalog_id as catalogId, catalog_name as catalogName," - + " metalake_id as metalakeId, type, provider," - + " catalog_comment as catalogComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "listCatalogPOsByMetalakeId") List listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId); - @Select( - "SELECT catalog_id as catalogId FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0") + @SelectProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "selectCatalogIdByMetalakeIdAndName") Long selectCatalogIdByMetalakeIdAndName( @Param("metalakeId") Long metalakeId, @Param("catalogName") String name); - @Select( - "SELECT catalog_id as catalogId, catalog_name as catalogName," - + " metalake_id as metalakeId, type, provider," - + " catalog_comment as catalogComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0") + @SelectProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "selectCatalogMetaByMetalakeIdAndName") CatalogPO selectCatalogMetaByMetalakeIdAndName( @Param("metalakeId") Long metalakeId, @Param("catalogName") String name); - @Select( - "SELECT catalog_id as catalogId, catalog_name as catalogName," - + " metalake_id as metalakeId, type, provider," - + " catalog_comment as catalogComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogMetaById") CatalogPO selectCatalogMetaById(@Param("catalogId") Long catalogId); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(catalog_id, catalog_name, metalake_id," - + " type, provider, catalog_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{catalogMeta.catalogId}," - + " #{catalogMeta.catalogName}," - + " #{catalogMeta.metalakeId}," - + " #{catalogMeta.type}," - + " #{catalogMeta.provider}," - + " #{catalogMeta.catalogComment}," - + " #{catalogMeta.properties}," - + " #{catalogMeta.auditInfo}," - + " #{catalogMeta.currentVersion}," - + " #{catalogMeta.lastVersion}," - + " #{catalogMeta.deletedAt}" - + " )") + @InsertProvider(type = CatalogMetaSQLProviderFactory.class, method = "insertCatalogMeta") void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(catalog_id, catalog_name, metalake_id," - + " type, provider, catalog_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{catalogMeta.catalogId}," - + " #{catalogMeta.catalogName}," - + " #{catalogMeta.metalakeId}," - + " #{catalogMeta.type}," - + " #{catalogMeta.provider}," - + " #{catalogMeta.catalogComment}," - + " #{catalogMeta.properties}," - + " #{catalogMeta.auditInfo}," - + " #{catalogMeta.currentVersion}," - + " #{catalogMeta.lastVersion}," - + " #{catalogMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " catalog_name = #{catalogMeta.catalogName}," - + " metalake_id = #{catalogMeta.metalakeId}," - + " type = #{catalogMeta.type}," - + " provider = #{catalogMeta.provider}," - + " catalog_comment = #{catalogMeta.catalogComment}," - + " properties = #{catalogMeta.properties}," - + " audit_info = #{catalogMeta.auditInfo}," - + " current_version = #{catalogMeta.currentVersion}," - + " last_version = #{catalogMeta.lastVersion}," - + " deleted_at = #{catalogMeta.deletedAt}") + @InsertProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "insertCatalogMetaOnDuplicateKeyUpdate") void insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET catalog_name = #{newCatalogMeta.catalogName}," - + " metalake_id = #{newCatalogMeta.metalakeId}," - + " type = #{newCatalogMeta.type}," - + " provider = #{newCatalogMeta.provider}," - + " catalog_comment = #{newCatalogMeta.catalogComment}," - + " properties = #{newCatalogMeta.properties}," - + " audit_info = #{newCatalogMeta.auditInfo}," - + " current_version = #{newCatalogMeta.currentVersion}," - + " last_version = #{newCatalogMeta.lastVersion}," - + " deleted_at = #{newCatalogMeta.deletedAt}" - + " WHERE catalog_id = #{oldCatalogMeta.catalogId}" - + " AND catalog_name = #{oldCatalogMeta.catalogName}" - + " AND metalake_id = #{oldCatalogMeta.metalakeId}" - + " AND type = #{oldCatalogMeta.type}" - + " AND provider = #{oldCatalogMeta.provider}" - + " AND (catalog_comment = #{oldCatalogMeta.catalogComment} " - + " OR (catalog_comment IS NULL and #{oldCatalogMeta.catalogComment} IS NULL))" - + " AND properties = #{oldCatalogMeta.properties}" - + " AND audit_info = #{oldCatalogMeta.auditInfo}" - + " AND current_version = #{oldCatalogMeta.currentVersion}" - + " AND last_version = #{oldCatalogMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = CatalogMetaSQLProviderFactory.class, method = "updateCatalogMeta") Integer updateCatalogMeta( @Param("newCatalogMeta") CatalogPO newCatalogPO, @Param("oldCatalogMeta") CatalogPO oldCatalogPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "softDeleteCatalogMetasByCatalogId") Integer softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "softDeleteCatalogMetasByMetalakeId") Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "deleteCatalogMetasByLegacyTimeline") Integer deleteCatalogMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java new file mode 100644 index 00000000000..5c0e63f531b --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.CatalogPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class CatalogMetaSQLProviderFactory { + + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new CatalogMetaMySQLProvider(), + JDBCBackendType.H2, new CatalogMetaH2Provider()); + + public static CatalogMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class CatalogMetaMySQLProvider extends CatalogMetaBaseSQLProvider {} + + static class CatalogMetaH2Provider extends CatalogMetaBaseSQLProvider {} + + public static String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().listCatalogPOsByMetalakeId(metalakeId); + } + + public static String selectCatalogIdByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return getProvider().selectCatalogIdByMetalakeIdAndName(metalakeId, name); + } + + public static String selectCatalogMetaByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return getProvider().selectCatalogMetaByMetalakeIdAndName(metalakeId, name); + } + + public static String selectCatalogMetaById(@Param("catalogId") Long catalogId) { + return getProvider().selectCatalogMetaById(catalogId); + } + + public static String insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO) { + return getProvider().insertCatalogMeta(catalogPO); + } + + public static String insertCatalogMetaOnDuplicateKeyUpdate( + @Param("catalogMeta") CatalogPO catalogPO) { + return getProvider().insertCatalogMetaOnDuplicateKeyUpdate(catalogPO); + } + + public static String updateCatalogMeta( + @Param("newCatalogMeta") CatalogPO newCatalogPO, + @Param("oldCatalogMeta") CatalogPO oldCatalogPO) { + return getProvider().updateCatalogMeta(newCatalogPO, oldCatalogPO); + } + + public static String softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteCatalogMetasByCatalogId(catalogId); + } + + public static String softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteCatalogMetasByMetalakeId(metalakeId); + } + + public static String deleteCatalogMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteCatalogMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java new file mode 100644 index 00000000000..fc79e6d8ead --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.META_TABLE_NAME; +import static org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.VERSION_TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.FilesetPO; +import org.apache.ibatis.annotations.Param; + +public class FilesetMetaBaseSQLProvider { + + public String listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId) { + return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," + + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," + + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," + + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," + + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," + + " vi.deleted_at as version_deleted_at" + + " FROM " + + META_TABLE_NAME + + " fm INNER JOIN " + + VERSION_TABLE_NAME + + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" + + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND vi.deleted_at = 0"; + } + + public String selectFilesetIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return "SELECT fileset_id as filesetId FROM " + + META_TABLE_NAME + + " WHERE schema_id = #{schemaId} AND fileset_name = #{filesetName}" + + " AND deleted_at = 0"; + } + + public String selectFilesetMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," + + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," + + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," + + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," + + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," + + " vi.deleted_at as version_deleted_at" + + " FROM " + + META_TABLE_NAME + + " fm INNER JOIN " + + VERSION_TABLE_NAME + + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" + + " WHERE fm.schema_id = #{schemaId} AND fm.fileset_name = #{filesetName}" + + " AND fm.deleted_at = 0 AND vi.deleted_at = 0"; + } + + public String selectFilesetMetaById(@Param("filesetId") Long filesetId) { + return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," + + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," + + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," + + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," + + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," + + " vi.deleted_at as version_deleted_at" + + " FROM " + + META_TABLE_NAME + + " fm INNER JOIN " + + VERSION_TABLE_NAME + + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" + + " WHERE fm.fileset_id = #{filesetId}" + + " AND fm.deleted_at = 0 AND vi.deleted_at = 0"; + } + + public String insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO) { + return "INSERT INTO " + + META_TABLE_NAME + + "(fileset_id, fileset_name, metalake_id," + + " catalog_id, schema_id, type, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{filesetMeta.filesetId}," + + " #{filesetMeta.filesetName}," + + " #{filesetMeta.metalakeId}," + + " #{filesetMeta.catalogId}," + + " #{filesetMeta.schemaId}," + + " #{filesetMeta.type}," + + " #{filesetMeta.auditInfo}," + + " #{filesetMeta.currentVersion}," + + " #{filesetMeta.lastVersion}," + + " #{filesetMeta.deletedAt}" + + " )"; + } + + public String insertFilesetMetaOnDuplicateKeyUpdate(@Param("filesetMeta") FilesetPO filesetPO) { + return "INSERT INTO " + + META_TABLE_NAME + + "(fileset_id, fileset_name, metalake_id," + + " catalog_id, schema_id, type, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{filesetMeta.filesetId}," + + " #{filesetMeta.filesetName}," + + " #{filesetMeta.metalakeId}," + + " #{filesetMeta.catalogId}," + + " #{filesetMeta.schemaId}," + + " #{filesetMeta.type}," + + " #{filesetMeta.auditInfo}," + + " #{filesetMeta.currentVersion}," + + " #{filesetMeta.lastVersion}," + + " #{filesetMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " fileset_name = #{filesetMeta.filesetName}," + + " metalake_id = #{filesetMeta.metalakeId}," + + " catalog_id = #{filesetMeta.catalogId}," + + " schema_id = #{filesetMeta.schemaId}," + + " type = #{filesetMeta.type}," + + " audit_info = #{filesetMeta.auditInfo}," + + " current_version = #{filesetMeta.currentVersion}," + + " last_version = #{filesetMeta.lastVersion}," + + " deleted_at = #{filesetMeta.deletedAt}"; + } + + public String updateFilesetMeta( + @Param("newFilesetMeta") FilesetPO newFilesetPO, + @Param("oldFilesetMeta") FilesetPO oldFilesetPO) { + return "UPDATE " + + META_TABLE_NAME + + " SET fileset_name = #{newFilesetMeta.filesetName}," + + " metalake_id = #{newFilesetMeta.metalakeId}," + + " catalog_id = #{newFilesetMeta.catalogId}," + + " schema_id = #{newFilesetMeta.schemaId}," + + " type = #{newFilesetMeta.type}," + + " audit_info = #{newFilesetMeta.auditInfo}," + + " current_version = #{newFilesetMeta.currentVersion}," + + " last_version = #{newFilesetMeta.lastVersion}," + + " deleted_at = #{newFilesetMeta.deletedAt}" + + " WHERE fileset_id = #{oldFilesetMeta.filesetId}" + + " AND fileset_name = #{oldFilesetMeta.filesetName}" + + " AND metalake_id = #{oldFilesetMeta.metalakeId}" + + " AND catalog_id = #{oldFilesetMeta.catalogId}" + + " AND schema_id = #{oldFilesetMeta.schemaId}" + + " AND type = #{oldFilesetMeta.type}" + + " AND audit_info = #{oldFilesetMeta.auditInfo}" + + " AND current_version = #{oldFilesetMeta.currentVersion}" + + " AND last_version = #{oldFilesetMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE fileset_id = #{filesetId} AND deleted_at = 0"; + } + + public String deleteFilesetMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + META_TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java index 61e4d2f95ff..8692f8f890d 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -21,13 +21,13 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.FilesetPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Result; import org.apache.ibatis.annotations.Results; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for fileset meta operation SQLs. @@ -42,19 +42,6 @@ public interface FilesetMetaMapper { String VERSION_TABLE_NAME = "fileset_version_info"; - @Select( - "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," - + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," - + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," - + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," - + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," - + " vi.deleted_at as version_deleted_at" - + " FROM " - + META_TABLE_NAME - + " fm INNER JOIN " - + VERSION_TABLE_NAME - + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" - + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND vi.deleted_at = 0") @Results({ @Result(property = "filesetId", column = "fileset_id"), @Result(property = "filesetName", column = "fileset_name"), @@ -77,30 +64,15 @@ public interface FilesetMetaMapper { @Result(property = "filesetVersionPO.storageLocation", column = "storage_location"), @Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at") }) + @SelectProvider(type = FilesetMetaSQLProviderFactory.class, method = "listFilesetPOsBySchemaId") List listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId); - @Select( - "SELECT fileset_id as filesetId FROM " - + META_TABLE_NAME - + " WHERE schema_id = #{schemaId} AND fileset_name = #{filesetName}" - + " AND deleted_at = 0") + @SelectProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "selectFilesetIdBySchemaIdAndName") Long selectFilesetIdBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("filesetName") String name); - @Select( - "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," - + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," - + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," - + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," - + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," - + " vi.deleted_at as version_deleted_at" - + " FROM " - + META_TABLE_NAME - + " fm INNER JOIN " - + VERSION_TABLE_NAME - + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" - + " WHERE fm.schema_id = #{schemaId} AND fm.fileset_name = #{filesetName}" - + " AND fm.deleted_at = 0 AND vi.deleted_at = 0") @Results({ @Result(property = "filesetId", column = "fileset_id"), @Result(property = "filesetName", column = "fileset_name"), @@ -123,23 +95,12 @@ Long selectFilesetIdBySchemaIdAndName( @Result(property = "filesetVersionPO.storageLocation", column = "storage_location"), @Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at") }) + @SelectProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "selectFilesetMetaBySchemaIdAndName") FilesetPO selectFilesetMetaBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("filesetName") String name); - @Select( - "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," - + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," - + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," - + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," - + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," - + " vi.deleted_at as version_deleted_at" - + " FROM " - + META_TABLE_NAME - + " fm INNER JOIN " - + VERSION_TABLE_NAME - + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" - + " WHERE fm.fileset_id = #{filesetId}" - + " AND fm.deleted_at = 0 AND vi.deleted_at = 0") @Results({ @Result(property = "filesetId", column = "fileset_id"), @Result(property = "filesetName", column = "fileset_name"), @@ -162,120 +123,45 @@ FilesetPO selectFilesetMetaBySchemaIdAndName( @Result(property = "filesetVersionPO.storageLocation", column = "storage_location"), @Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at") }) + @SelectProvider(type = FilesetMetaSQLProviderFactory.class, method = "selectFilesetMetaById") FilesetPO selectFilesetMetaById(@Param("filesetId") Long filesetId); - @Insert( - "INSERT INTO " - + META_TABLE_NAME - + "(fileset_id, fileset_name, metalake_id," - + " catalog_id, schema_id, type, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{filesetMeta.filesetId}," - + " #{filesetMeta.filesetName}," - + " #{filesetMeta.metalakeId}," - + " #{filesetMeta.catalogId}," - + " #{filesetMeta.schemaId}," - + " #{filesetMeta.type}," - + " #{filesetMeta.auditInfo}," - + " #{filesetMeta.currentVersion}," - + " #{filesetMeta.lastVersion}," - + " #{filesetMeta.deletedAt}" - + " )") + @InsertProvider(type = FilesetMetaSQLProviderFactory.class, method = "insertFilesetMeta") void insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO); - @Insert( - "INSERT INTO " - + META_TABLE_NAME - + "(fileset_id, fileset_name, metalake_id," - + " catalog_id, schema_id, type, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{filesetMeta.filesetId}," - + " #{filesetMeta.filesetName}," - + " #{filesetMeta.metalakeId}," - + " #{filesetMeta.catalogId}," - + " #{filesetMeta.schemaId}," - + " #{filesetMeta.type}," - + " #{filesetMeta.auditInfo}," - + " #{filesetMeta.currentVersion}," - + " #{filesetMeta.lastVersion}," - + " #{filesetMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " fileset_name = #{filesetMeta.filesetName}," - + " metalake_id = #{filesetMeta.metalakeId}," - + " catalog_id = #{filesetMeta.catalogId}," - + " schema_id = #{filesetMeta.schemaId}," - + " type = #{filesetMeta.type}," - + " audit_info = #{filesetMeta.auditInfo}," - + " current_version = #{filesetMeta.currentVersion}," - + " last_version = #{filesetMeta.lastVersion}," - + " deleted_at = #{filesetMeta.deletedAt}") + @InsertProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "insertFilesetMetaOnDuplicateKeyUpdate") void insertFilesetMetaOnDuplicateKeyUpdate(@Param("filesetMeta") FilesetPO filesetPO); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET fileset_name = #{newFilesetMeta.filesetName}," - + " metalake_id = #{newFilesetMeta.metalakeId}," - + " catalog_id = #{newFilesetMeta.catalogId}," - + " schema_id = #{newFilesetMeta.schemaId}," - + " type = #{newFilesetMeta.type}," - + " audit_info = #{newFilesetMeta.auditInfo}," - + " current_version = #{newFilesetMeta.currentVersion}," - + " last_version = #{newFilesetMeta.lastVersion}," - + " deleted_at = #{newFilesetMeta.deletedAt}" - + " WHERE fileset_id = #{oldFilesetMeta.filesetId}" - + " AND fileset_name = #{oldFilesetMeta.filesetName}" - + " AND metalake_id = #{oldFilesetMeta.metalakeId}" - + " AND catalog_id = #{oldFilesetMeta.catalogId}" - + " AND schema_id = #{oldFilesetMeta.schemaId}" - + " AND type = #{oldFilesetMeta.type}" - + " AND audit_info = #{oldFilesetMeta.auditInfo}" - + " AND current_version = #{oldFilesetMeta.currentVersion}" - + " AND last_version = #{oldFilesetMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = FilesetMetaSQLProviderFactory.class, method = "updateFilesetMeta") Integer updateFilesetMeta( @Param("newFilesetMeta") FilesetPO newFilesetPO, @Param("oldFilesetMeta") FilesetPO oldFilesetPO); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasByMetalakeId") Integer softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasByCatalogId") Integer softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasBySchemaId") Integer softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasByFilesetId") Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId); - @Delete( - "DELETE FROM " - + META_TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "deleteFilesetMetasByLegacyTimeline") Integer deleteFilesetMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java new file mode 100644 index 00000000000..36ea94d5862 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.FilesetPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class FilesetMetaSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new FilesetMetaMySQLProvider(), + JDBCBackendType.H2, new FilesetMetaH2Provider()); + + public static FilesetMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class FilesetMetaMySQLProvider extends FilesetMetaBaseSQLProvider {} + + static class FilesetMetaH2Provider extends FilesetMetaBaseSQLProvider {} + + public static String listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().listFilesetPOsBySchemaId(schemaId); + } + + public static String selectFilesetIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return getProvider().selectFilesetIdBySchemaIdAndName(schemaId, name); + } + + public static String selectFilesetMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return getProvider().selectFilesetMetaBySchemaIdAndName(schemaId, name); + } + + public static String selectFilesetMetaById(@Param("filesetId") Long filesetId) { + return getProvider().selectFilesetMetaById(filesetId); + } + + public static String insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO) { + return getProvider().insertFilesetMeta(filesetPO); + } + + public static String insertFilesetMetaOnDuplicateKeyUpdate( + @Param("filesetMeta") FilesetPO filesetPO) { + return getProvider().insertFilesetMetaOnDuplicateKeyUpdate(filesetPO); + } + + public static String updateFilesetMeta( + @Param("newFilesetMeta") FilesetPO newFilesetPO, + @Param("oldFilesetMeta") FilesetPO oldFilesetPO) { + return getProvider().updateFilesetMeta(newFilesetPO, oldFilesetPO); + } + + public static String softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteFilesetMetasByMetalakeId(metalakeId); + } + + public static String softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteFilesetMetasByCatalogId(catalogId); + } + + public static String softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteFilesetMetasBySchemaId(schemaId); + } + + public String softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId) { + return getProvider().softDeleteFilesetMetasByFilesetId(filesetId); + } + + public String deleteFilesetMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteFilesetMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java new file mode 100644 index 00000000000..f6ab85b38be --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper.VERSION_TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.FilesetVersionPO; +import org.apache.ibatis.annotations.Param; + +public class FilesetVersionBaseSQLProvider { + public String insertFilesetVersion(@Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return "INSERT INTO " + + VERSION_TABLE_NAME + + "(metalake_id, catalog_id, schema_id, fileset_id," + + " version, fileset_comment, properties, storage_location," + + " deleted_at)" + + " VALUES(" + + " #{filesetVersion.metalakeId}," + + " #{filesetVersion.catalogId}," + + " #{filesetVersion.schemaId}," + + " #{filesetVersion.filesetId}," + + " #{filesetVersion.version}," + + " #{filesetVersion.filesetComment}," + + " #{filesetVersion.properties}," + + " #{filesetVersion.storageLocation}," + + " #{filesetVersion.deletedAt}" + + " )"; + } + + public String insertFilesetVersionOnDuplicateKeyUpdate( + @Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return "INSERT INTO " + + VERSION_TABLE_NAME + + "(metalake_id, catalog_id, schema_id, fileset_id," + + " version, fileset_comment, properties, storage_location," + + " deleted_at)" + + " VALUES(" + + " #{filesetVersion.metalakeId}," + + " #{filesetVersion.catalogId}," + + " #{filesetVersion.schemaId}," + + " #{filesetVersion.filesetId}," + + " #{filesetVersion.version}," + + " #{filesetVersion.filesetComment}," + + " #{filesetVersion.properties}," + + " #{filesetVersion.storageLocation}," + + " #{filesetVersion.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " metalake_id = #{filesetVersion.metalakeId}," + + " catalog_id = #{filesetVersion.catalogId}," + + " schema_id = #{filesetVersion.schemaId}," + + " fileset_id = #{filesetVersion.filesetId}," + + " version = #{filesetVersion.version}," + + " fileset_comment = #{filesetVersion.filesetComment}," + + " properties = #{filesetVersion.properties}," + + " storage_location = #{filesetVersion.storageLocation}," + + " deleted_at = #{filesetVersion.deletedAt}"; + } + + public String softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE fileset_id = #{filesetId} AND deleted_at = 0"; + } + + public String deleteFilesetVersionsByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + VERSION_TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } + + public String selectFilesetVersionsByRetentionCount( + @Param("versionRetentionCount") Long versionRetentionCount) { + return "SELECT fileset_id as filesetId," + + " Max(version) as version" + + " FROM " + + VERSION_TABLE_NAME + + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" + + " GROUP BY fileset_id"; + } + + public String softDeleteFilesetVersionsByRetentionLine( + @Param("filesetId") Long filesetId, + @Param("versionRetentionLine") long versionRetentionLine, + @Param("limit") int limit) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java index cd4bfb9b613..09eca4b90e0 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -22,11 +22,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.FilesetMaxVersionPO; import org.apache.gravitino.storage.relational.po.FilesetVersionPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for fileset version info operation SQLs. @@ -39,110 +39,50 @@ public interface FilesetVersionMapper { String VERSION_TABLE_NAME = "fileset_version_info"; - @Insert( - "INSERT INTO " - + VERSION_TABLE_NAME - + "(metalake_id, catalog_id, schema_id, fileset_id," - + " version, fileset_comment, properties, storage_location," - + " deleted_at)" - + " VALUES(" - + " #{filesetVersion.metalakeId}," - + " #{filesetVersion.catalogId}," - + " #{filesetVersion.schemaId}," - + " #{filesetVersion.filesetId}," - + " #{filesetVersion.version}," - + " #{filesetVersion.filesetComment}," - + " #{filesetVersion.properties}," - + " #{filesetVersion.storageLocation}," - + " #{filesetVersion.deletedAt}" - + " )") + @InsertProvider(type = FilesetVersionSQLProviderFactory.class, method = "insertFilesetVersion") void insertFilesetVersion(@Param("filesetVersion") FilesetVersionPO filesetVersionPO); - @Insert( - "INSERT INTO " - + VERSION_TABLE_NAME - + "(metalake_id, catalog_id, schema_id, fileset_id," - + " version, fileset_comment, properties, storage_location," - + " deleted_at)" - + " VALUES(" - + " #{filesetVersion.metalakeId}," - + " #{filesetVersion.catalogId}," - + " #{filesetVersion.schemaId}," - + " #{filesetVersion.filesetId}," - + " #{filesetVersion.version}," - + " #{filesetVersion.filesetComment}," - + " #{filesetVersion.properties}," - + " #{filesetVersion.storageLocation}," - + " #{filesetVersion.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " metalake_id = #{filesetVersion.metalakeId}," - + " catalog_id = #{filesetVersion.catalogId}," - + " schema_id = #{filesetVersion.schemaId}," - + " fileset_id = #{filesetVersion.filesetId}," - + " version = #{filesetVersion.version}," - + " fileset_comment = #{filesetVersion.filesetComment}," - + " properties = #{filesetVersion.properties}," - + " storage_location = #{filesetVersion.storageLocation}," - + " deleted_at = #{filesetVersion.deletedAt}") + @InsertProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "insertFilesetVersionOnDuplicateKeyUpdate") void insertFilesetVersionOnDuplicateKeyUpdate( @Param("filesetVersion") FilesetVersionPO filesetVersionPO); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByMetalakeId") Integer softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByCatalogId") Integer softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsBySchemaId") Integer softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByFilesetId") Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId); - @Delete( - "DELETE FROM " - + VERSION_TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "deleteFilesetVersionsByLegacyTimeline") Integer deleteFilesetVersionsByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); - @Select( - "SELECT fileset_id as filesetId," - + " Max(version) as version" - + " FROM " - + VERSION_TABLE_NAME - + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" - + " GROUP BY fileset_id") + @SelectProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "selectFilesetVersionsByRetentionCount") List selectFilesetVersionsByRetentionCount( @Param("versionRetentionCount") Long versionRetentionCount); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByRetentionLine") Integer softDeleteFilesetVersionsByRetentionLine( @Param("filesetId") Long filesetId, @Param("versionRetentionLine") long versionRetentionLine, diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java new file mode 100644 index 00000000000..163f2c882fe --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java @@ -0,0 +1,93 @@ +package org.apache.gravitino.storage.relational.mapper; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.FilesetVersionPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class FilesetVersionSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new FilesetVersionMySQLProvider(), + JDBCBackendType.H2, new FilesetVersionH2Provider()); + + public static FilesetVersionBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class FilesetVersionMySQLProvider extends FilesetVersionBaseSQLProvider {} + + static class FilesetVersionH2Provider extends FilesetVersionBaseSQLProvider {} + + public static String insertFilesetVersion( + @Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return getProvider().insertFilesetVersion(filesetVersionPO); + } + + public static String insertFilesetVersionOnDuplicateKeyUpdate( + @Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return getProvider().insertFilesetVersionOnDuplicateKeyUpdate(filesetVersionPO); + } + + public static String softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteFilesetVersionsByMetalakeId(metalakeId); + } + + public static String softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteFilesetVersionsByCatalogId(catalogId); + } + + public static String softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteFilesetVersionsBySchemaId(schemaId); + } + + public static String softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId) { + return getProvider().softDeleteFilesetVersionsByFilesetId(filesetId); + } + + public static String deleteFilesetVersionsByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteFilesetVersionsByLegacyTimeline(legacyTimeline, limit); + } + + public static String selectFilesetVersionsByRetentionCount( + @Param("versionRetentionCount") Long versionRetentionCount) { + return getProvider().selectFilesetVersionsByRetentionCount(versionRetentionCount); + } + + public static String softDeleteFilesetVersionsByRetentionLine( + @Param("filesetId") Long filesetId, + @Param("versionRetentionLine") long versionRetentionLine, + @Param("limit") int limit) { + return getProvider() + .softDeleteFilesetVersionsByRetentionLine(filesetId, versionRetentionLine, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java new file mode 100644 index 00000000000..a782ddb8f0f --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.MetalakePO; +import org.apache.ibatis.annotations.Param; + +public class MetalakeMetaBaseSQLProvider { + + public String listMetalakePOs() { + return "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE deleted_at = 0"; + } + + public String selectMetalakeMetaByName(@Param("metalakeName") String metalakeName) { + return "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName} and deleted_at = 0"; + } + + public String selectMetalakeMetaById(@Param("metalakeId") Long metalakeId) { + return "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} and deleted_at = 0"; + } + + public String selectMetalakeIdMetaByName(@Param("metalakeName") String metalakeName) { + return "SELECT metalake_id as metalakeId" + + " FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName} and deleted_at = 0"; + } + + public String insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO) { + return "INSERT INTO " + + TABLE_NAME + + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," + + " schema_version, current_version, last_version, deleted_at)" + + " VALUES(" + + " #{metalakeMeta.metalakeId}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}," + + " #{metalakeMeta.currentVersion}," + + " #{metalakeMeta.lastVersion}," + + " #{metalakeMeta.deletedAt}" + + " )"; + } + + public String insertMetalakeMetaOnDuplicateKeyUpdate( + @Param("metalakeMeta") MetalakePO metalakePO) { + return "INSERT INTO " + + TABLE_NAME + + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," + + " schema_version, current_version, last_version, deleted_at)" + + " VALUES(" + + " #{metalakeMeta.metalakeId}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}," + + " #{metalakeMeta.currentVersion}," + + " #{metalakeMeta.lastVersion}," + + " #{metalakeMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " metalake_name = #{metalakeMeta.metalakeName}," + + " metalake_comment = #{metalakeMeta.metalakeComment}," + + " properties = #{metalakeMeta.properties}," + + " audit_info = #{metalakeMeta.auditInfo}," + + " schema_version = #{metalakeMeta.schemaVersion}," + + " current_version = #{metalakeMeta.currentVersion}," + + " last_version = #{metalakeMeta.lastVersion}," + + " deleted_at = #{metalakeMeta.deletedAt}"; + } + + public String updateMetalakeMeta( + @Param("newMetalakeMeta") MetalakePO newMetalakePO, + @Param("oldMetalakeMeta") MetalakePO oldMetalakePO) { + return "UPDATE " + + TABLE_NAME + + " SET metalake_name = #{newMetalakeMeta.metalakeName}," + + " metalake_comment = #{newMetalakeMeta.metalakeComment}," + + " properties = #{newMetalakeMeta.properties}," + + " audit_info = #{newMetalakeMeta.auditInfo}," + + " schema_version = #{newMetalakeMeta.schemaVersion}," + + " current_version = #{newMetalakeMeta.currentVersion}," + + " last_version = #{newMetalakeMeta.lastVersion}" + + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}" + + " AND metalake_name = #{oldMetalakeMeta.metalakeName}" + + " AND (metalake_comment = #{oldMetalakeMeta.metalakeComment} " + + " OR (metalake_comment IS NULL and #{oldMetalakeMeta.metalakeComment} IS NULL))" + + " AND properties = #{oldMetalakeMeta.properties}" + + " AND audit_info = #{oldMetalakeMeta.auditInfo}" + + " AND schema_version = #{oldMetalakeMeta.schemaVersion}" + + " AND current_version = #{oldMetalakeMeta.currentVersion}" + + " AND last_version = #{oldMetalakeMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String deleteMetalakeMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index 89f8d13ceb1..d5dc809bfe2 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.MetalakePO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for metalake meta operation SQLs. @@ -38,126 +38,41 @@ public interface MetalakeMetaMapper { String TABLE_NAME = "metalake_meta"; - @Select( - "SELECT metalake_id as metalakeId, metalake_name as metalakeName," - + " metalake_comment as metalakeComment, properties," - + " audit_info as auditInfo, schema_version as schemaVersion," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE deleted_at = 0") + @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = "listMetalakePOs") List listMetalakePOs(); - @Select( - "SELECT metalake_id as metalakeId, metalake_name as metalakeName," - + " metalake_comment as metalakeComment, properties," - + " audit_info as auditInfo, schema_version as schemaVersion," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_name = #{metalakeName} and deleted_at = 0") + @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = "selectMetalakeMetaByName") MetalakePO selectMetalakeMetaByName(@Param("metalakeName") String name); - @Select( - "SELECT metalake_id as metalakeId, metalake_name as metalakeName," - + " metalake_comment as metalakeComment, properties," - + " audit_info as auditInfo, schema_version as schemaVersion," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} and deleted_at = 0") + @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = "selectMetalakeMetaById") MetalakePO selectMetalakeMetaById(@Param("metalakeId") Long metalakeId); - @Select( - "SELECT metalake_id FROM " - + TABLE_NAME - + " WHERE metalake_name = #{metalakeName} and deleted_at = 0") + @SelectProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "selectMetalakeIdMetaByName") Long selectMetalakeIdMetaByName(@Param("metalakeName") String name); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," - + " schema_version, current_version, last_version, deleted_at)" - + " VALUES(" - + " #{metalakeMeta.metalakeId}," - + " #{metalakeMeta.metalakeName}," - + " #{metalakeMeta.metalakeComment}," - + " #{metalakeMeta.properties}," - + " #{metalakeMeta.auditInfo}," - + " #{metalakeMeta.schemaVersion}," - + " #{metalakeMeta.currentVersion}," - + " #{metalakeMeta.lastVersion}," - + " #{metalakeMeta.deletedAt}" - + " )") + @InsertProvider(type = MetalakeMetaSQLProviderFactory.class, method = "insertMetalakeMeta") void insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," - + " schema_version, current_version, last_version, deleted_at)" - + " VALUES(" - + " #{metalakeMeta.metalakeId}," - + " #{metalakeMeta.metalakeName}," - + " #{metalakeMeta.metalakeComment}," - + " #{metalakeMeta.properties}," - + " #{metalakeMeta.auditInfo}," - + " #{metalakeMeta.schemaVersion}," - + " #{metalakeMeta.currentVersion}," - + " #{metalakeMeta.lastVersion}," - + " #{metalakeMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " metalake_name = #{metalakeMeta.metalakeName}," - + " metalake_comment = #{metalakeMeta.metalakeComment}," - + " properties = #{metalakeMeta.properties}," - + " audit_info = #{metalakeMeta.auditInfo}," - + " schema_version = #{metalakeMeta.schemaVersion}," - + " current_version = #{metalakeMeta.currentVersion}," - + " last_version = #{metalakeMeta.lastVersion}," - + " deleted_at = #{metalakeMeta.deletedAt}") + @InsertProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "insertMetalakeMetaOnDuplicateKeyUpdate") void insertMetalakeMetaOnDuplicateKeyUpdate(@Param("metalakeMeta") MetalakePO metalakePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET metalake_name = #{newMetalakeMeta.metalakeName}," - + " metalake_comment = #{newMetalakeMeta.metalakeComment}," - + " properties = #{newMetalakeMeta.properties}," - + " audit_info = #{newMetalakeMeta.auditInfo}," - + " schema_version = #{newMetalakeMeta.schemaVersion}," - + " current_version = #{newMetalakeMeta.currentVersion}," - + " last_version = #{newMetalakeMeta.lastVersion}" - + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}" - + " AND metalake_name = #{oldMetalakeMeta.metalakeName}" - + " AND (metalake_comment = #{oldMetalakeMeta.metalakeComment} " - + " OR (metalake_comment IS NULL and #{oldMetalakeMeta.metalakeComment} IS NULL))" - + " AND properties = #{oldMetalakeMeta.properties}" - + " AND audit_info = #{oldMetalakeMeta.auditInfo}" - + " AND schema_version = #{oldMetalakeMeta.schemaVersion}" - + " AND current_version = #{oldMetalakeMeta.currentVersion}" - + " AND last_version = #{oldMetalakeMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = MetalakeMetaSQLProviderFactory.class, method = "updateMetalakeMeta") Integer updateMetalakeMeta( @Param("newMetalakeMeta") MetalakePO newMetalakePO, @Param("oldMetalakeMeta") MetalakePO oldMetalakePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "softDeleteMetalakeMetaByMetalakeId") Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "deleteMetalakeMetasByLegacyTimeline") Integer deleteMetalakeMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java new file mode 100644 index 00000000000..e28cbc9d774 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.MetalakePO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +/** SQL Provider for Metalake Meta operations. */ +public class MetalakeMetaSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new MetalakeMetaMySQLProvider(), + JDBCBackendType.H2, new MetalakeMetaH2Provider()); + + public static MetalakeMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class MetalakeMetaMySQLProvider extends MetalakeMetaBaseSQLProvider {} + + static class MetalakeMetaH2Provider extends MetalakeMetaBaseSQLProvider {} + + public String listMetalakePOs() { + return getProvider().listMetalakePOs(); + } + + public static String selectMetalakeMetaByName(@Param("metalakeName") String metalakeName) { + return getProvider().selectMetalakeMetaByName(metalakeName); + } + + public static String selectMetalakeMetaById(@Param("metalakeId") Long metalakeId) { + return getProvider().selectMetalakeMetaById(metalakeId); + } + + public static String selectMetalakeIdMetaByName(@Param("metalakeName") String metalakeName) { + return getProvider().selectMetalakeIdMetaByName(metalakeName); + } + + public static String insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO) { + return getProvider().insertMetalakeMeta(metalakePO); + } + + public static String insertMetalakeMetaOnDuplicateKeyUpdate( + @Param("metalakeMeta") MetalakePO metalakePO) { + return getProvider().insertMetalakeMetaOnDuplicateKeyUpdate(metalakePO); + } + + public static String updateMetalakeMeta( + @Param("newMetalakeMeta") MetalakePO newMetalakePO, + @Param("oldMetalakeMeta") MetalakePO oldMetalakePO) { + return getProvider().updateMetalakeMeta(newMetalakePO, oldMetalakePO); + } + + public static String softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteMetalakeMetaByMetalakeId(metalakeId); + } + + public static String deleteMetalakeMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteMetalakeMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java new file mode 100644 index 00000000000..056b8d2b581 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.SchemaPO; +import org.apache.ibatis.annotations.Param; + +public class SchemaMetaBaseSQLProvider { + public String listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId) { + return "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String selectSchemaIdByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return "SELECT schema_id as schemaId FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName}" + + " AND deleted_at = 0"; + } + + public String selectSchemaMetaByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} AND deleted_at = 0"; + } + + public String selectSchemaMetaById(@Param("schemaId") Long schemaId) { + return "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO) { + return "INSERT INTO " + + TABLE_NAME + + "(schema_id, schema_name, metalake_id," + + " catalog_id, schema_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{schemaMeta.schemaId}," + + " #{schemaMeta.schemaName}," + + " #{schemaMeta.metalakeId}," + + " #{schemaMeta.catalogId}," + + " #{schemaMeta.schemaComment}," + + " #{schemaMeta.properties}," + + " #{schemaMeta.auditInfo}," + + " #{schemaMeta.currentVersion}," + + " #{schemaMeta.lastVersion}," + + " #{schemaMeta.deletedAt}" + + " )"; + } + + public String insertSchemaMetaOnDuplicateKeyUpdate(@Param("schemaMeta") SchemaPO schemaPO) { + return "INSERT INTO " + + TABLE_NAME + + "(schema_id, schema_name, metalake_id," + + " catalog_id, schema_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{schemaMeta.schemaId}," + + " #{schemaMeta.schemaName}," + + " #{schemaMeta.metalakeId}," + + " #{schemaMeta.catalogId}," + + " #{schemaMeta.schemaComment}," + + " #{schemaMeta.properties}," + + " #{schemaMeta.auditInfo}," + + " #{schemaMeta.currentVersion}," + + " #{schemaMeta.lastVersion}," + + " #{schemaMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " schema_name = #{schemaMeta.schemaName}," + + " metalake_id = #{schemaMeta.metalakeId}," + + " catalog_id = #{schemaMeta.catalogId}," + + " schema_comment = #{schemaMeta.schemaComment}," + + " properties = #{schemaMeta.properties}," + + " audit_info = #{schemaMeta.auditInfo}," + + " current_version = #{schemaMeta.currentVersion}," + + " last_version = #{schemaMeta.lastVersion}," + + " deleted_at = #{schemaMeta.deletedAt}"; + } + + public String updateSchemaMeta( + @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO) { + return "UPDATE " + + TABLE_NAME + + " SET schema_name = #{newSchemaMeta.schemaName}," + + " metalake_id = #{newSchemaMeta.metalakeId}," + + " catalog_id = #{newSchemaMeta.catalogId}," + + " schema_comment = #{newSchemaMeta.schemaComment}," + + " properties = #{newSchemaMeta.properties}," + + " audit_info = #{newSchemaMeta.auditInfo}," + + " current_version = #{newSchemaMeta.currentVersion}," + + " last_version = #{newSchemaMeta.lastVersion}," + + " deleted_at = #{newSchemaMeta.deletedAt}" + + " WHERE schema_id = #{oldSchemaMeta.schemaId}" + + " AND schema_name = #{oldSchemaMeta.schemaName}" + + " AND metalake_id = #{oldSchemaMeta.metalakeId}" + + " AND catalog_id = #{oldSchemaMeta.catalogId}" + + " AND (schema_comment IS NULL OR schema_comment = #{oldSchemaMeta.schemaComment})" + + " AND properties = #{oldSchemaMeta.properties}" + + " AND audit_info = #{oldSchemaMeta.auditInfo}" + + " AND current_version = #{oldSchemaMeta.currentVersion}" + + " AND last_version = #{oldSchemaMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String deleteSchemaMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java index a71a4b7b6f3..054248dac0c 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.SchemaPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for schema meta operation SQLs. @@ -38,151 +38,54 @@ public interface SchemaMetaMapper { String TABLE_NAME = "schema_meta"; - @Select( - "SELECT schema_id as schemaId, schema_name as schemaName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_comment as schemaComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = "listSchemaPOsByCatalogId") List listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId); - @Select( - "SELECT schema_id as schemaId FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName}" - + " AND deleted_at = 0") + @SelectProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "selectSchemaIdByCatalogIdAndName") Long selectSchemaIdByCatalogIdAndName( @Param("catalogId") Long catalogId, @Param("schemaName") String name); - @Select( - "SELECT schema_id as schemaId, schema_name as schemaName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_comment as schemaComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} AND deleted_at = 0") + @SelectProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "selectSchemaMetaByCatalogIdAndName") SchemaPO selectSchemaMetaByCatalogIdAndName( @Param("catalogId") Long catalogId, @Param("schemaName") String name); - @Select( - "SELECT schema_id as schemaId, schema_name as schemaName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_comment as schemaComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = "selectSchemaMetaById") SchemaPO selectSchemaMetaById(@Param("schemaId") Long schemaId); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(schema_id, schema_name, metalake_id," - + " catalog_id, schema_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{schemaMeta.schemaId}," - + " #{schemaMeta.schemaName}," - + " #{schemaMeta.metalakeId}," - + " #{schemaMeta.catalogId}," - + " #{schemaMeta.schemaComment}," - + " #{schemaMeta.properties}," - + " #{schemaMeta.auditInfo}," - + " #{schemaMeta.currentVersion}," - + " #{schemaMeta.lastVersion}," - + " #{schemaMeta.deletedAt}" - + " )") + @InsertProvider(type = SchemaMetaSQLProviderFactory.class, method = "insertSchemaMeta") void insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(schema_id, schema_name, metalake_id," - + " catalog_id, schema_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{schemaMeta.schemaId}," - + " #{schemaMeta.schemaName}," - + " #{schemaMeta.metalakeId}," - + " #{schemaMeta.catalogId}," - + " #{schemaMeta.schemaComment}," - + " #{schemaMeta.properties}," - + " #{schemaMeta.auditInfo}," - + " #{schemaMeta.currentVersion}," - + " #{schemaMeta.lastVersion}," - + " #{schemaMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " schema_name = #{schemaMeta.schemaName}," - + " metalake_id = #{schemaMeta.metalakeId}," - + " catalog_id = #{schemaMeta.catalogId}," - + " schema_comment = #{schemaMeta.schemaComment}," - + " properties = #{schemaMeta.properties}," - + " audit_info = #{schemaMeta.auditInfo}," - + " current_version = #{schemaMeta.currentVersion}," - + " last_version = #{schemaMeta.lastVersion}," - + " deleted_at = #{schemaMeta.deletedAt}") + @InsertProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "insertSchemaMetaOnDuplicateKeyUpdate") void insertSchemaMetaOnDuplicateKeyUpdate(@Param("schemaMeta") SchemaPO schemaPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET schema_name = #{newSchemaMeta.schemaName}," - + " metalake_id = #{newSchemaMeta.metalakeId}," - + " catalog_id = #{newSchemaMeta.catalogId}," - + " schema_comment = #{newSchemaMeta.schemaComment}," - + " properties = #{newSchemaMeta.properties}," - + " audit_info = #{newSchemaMeta.auditInfo}," - + " current_version = #{newSchemaMeta.currentVersion}," - + " last_version = #{newSchemaMeta.lastVersion}," - + " deleted_at = #{newSchemaMeta.deletedAt}" - + " WHERE schema_id = #{oldSchemaMeta.schemaId}" - + " AND schema_name = #{oldSchemaMeta.schemaName}" - + " AND metalake_id = #{oldSchemaMeta.metalakeId}" - + " AND catalog_id = #{oldSchemaMeta.catalogId}" - + " AND (schema_comment IS NULL OR schema_comment = #{oldSchemaMeta.schemaComment})" - + " AND properties = #{oldSchemaMeta.properties}" - + " AND audit_info = #{oldSchemaMeta.auditInfo}" - + " AND current_version = #{oldSchemaMeta.currentVersion}" - + " AND last_version = #{oldSchemaMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = SchemaMetaSQLProviderFactory.class, method = "updateSchemaMeta") Integer updateSchemaMeta( @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "softDeleteSchemaMetasBySchemaId") Integer softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "softDeleteSchemaMetasByMetalakeId") Integer softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "softDeleteSchemaMetasByCatalogId") Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "deleteSchemaMetasByLegacyTimeline") Integer deleteSchemaMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java new file mode 100644 index 00000000000..5fa6252d5b6 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.SchemaPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class SchemaMetaSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new SchemaMetaMySQLProvider(), + JDBCBackendType.H2, new SchemaMetaH2Provider()); + + public static SchemaMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class SchemaMetaMySQLProvider extends SchemaMetaBaseSQLProvider {} + + static class SchemaMetaH2Provider extends SchemaMetaBaseSQLProvider {} + + public static String listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().listSchemaPOsByCatalogId(catalogId); + } + + public static String selectSchemaIdByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return getProvider().selectSchemaIdByCatalogIdAndName(catalogId, name); + } + + public static String selectSchemaMetaByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return getProvider().selectSchemaMetaByCatalogIdAndName(catalogId, name); + } + + public static String selectSchemaMetaById(@Param("schemaId") Long schemaId) { + return getProvider().selectSchemaMetaById(schemaId); + } + + public static String insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO) { + return getProvider().insertSchemaMeta(schemaPO); + } + + public static String insertSchemaMetaOnDuplicateKeyUpdate( + @Param("schemaMeta") SchemaPO schemaPO) { + return getProvider().insertSchemaMetaOnDuplicateKeyUpdate(schemaPO); + } + + public static String updateSchemaMeta( + @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO) { + return getProvider().updateSchemaMeta(newSchemaPO, oldSchemaPO); + } + + public static String softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteSchemaMetasBySchemaId(schemaId); + } + + public static String softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteSchemaMetasByMetalakeId(metalakeId); + } + + public static String softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteSchemaMetasByCatalogId(catalogId); + } + + public static String deleteSchemaMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteSchemaMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java new file mode 100644 index 00000000000..03cf2582ff6 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.TablePO; +import org.apache.ibatis.annotations.Param; + +public class TableMetaBaseSQLProvider { + + public String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) { + return "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String selectTableIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return "SELECT table_id as tableId FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND table_name = #{tableName}" + + " AND deleted_at = 0"; + } + + public String selectTableMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0"; + } + + public String selectTableMetaById(@Param("tableId") Long tableId) { + return "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE table_id = #{tableId} AND deleted_at = 0"; + } + + public String insertTableMeta(@Param("tableMeta") TablePO tablePO) { + return "INSERT INTO " + + TABLE_NAME + + "(table_id, table_name, metalake_id," + + " catalog_id, schema_id, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{tableMeta.tableId}," + + " #{tableMeta.tableName}," + + " #{tableMeta.metalakeId}," + + " #{tableMeta.catalogId}," + + " #{tableMeta.schemaId}," + + " #{tableMeta.auditInfo}," + + " #{tableMeta.currentVersion}," + + " #{tableMeta.lastVersion}," + + " #{tableMeta.deletedAt}" + + " )"; + } + + public String insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO) { + return "INSERT INTO " + + TABLE_NAME + + "(table_id, table_name, metalake_id," + + " catalog_id, schema_id, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{tableMeta.tableId}," + + " #{tableMeta.tableName}," + + " #{tableMeta.metalakeId}," + + " #{tableMeta.catalogId}," + + " #{tableMeta.schemaId}," + + " #{tableMeta.auditInfo}," + + " #{tableMeta.currentVersion}," + + " #{tableMeta.lastVersion}," + + " #{tableMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " table_name = #{tableMeta.tableName}," + + " metalake_id = #{tableMeta.metalakeId}," + + " catalog_id = #{tableMeta.catalogId}," + + " schema_id = #{tableMeta.schemaId}," + + " audit_info = #{tableMeta.auditInfo}," + + " current_version = #{tableMeta.currentVersion}," + + " last_version = #{tableMeta.lastVersion}," + + " deleted_at = #{tableMeta.deletedAt}"; + } + + public String updateTableMeta( + @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO) { + return "UPDATE " + + TABLE_NAME + + " SET table_name = #{newTableMeta.tableName}," + + " metalake_id = #{newTableMeta.metalakeId}," + + " catalog_id = #{newTableMeta.catalogId}," + + " schema_id = #{newTableMeta.schemaId}," + + " audit_info = #{newTableMeta.auditInfo}," + + " current_version = #{newTableMeta.currentVersion}," + + " last_version = #{newTableMeta.lastVersion}," + + " deleted_at = #{newTableMeta.deletedAt}" + + " WHERE table_id = #{oldTableMeta.tableId}" + + " AND table_name = #{oldTableMeta.tableName}" + + " AND metalake_id = #{oldTableMeta.metalakeId}" + + " AND catalog_id = #{oldTableMeta.catalogId}" + + " AND schema_id = #{oldTableMeta.schemaId}" + + " AND audit_info = #{oldTableMeta.auditInfo}" + + " AND current_version = #{oldTableMeta.currentVersion}" + + " AND last_version = #{oldTableMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteTableMetasByTableId(@Param("tableId") Long tableId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE table_id = #{tableId} AND deleted_at = 0"; + } + + public String softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String deleteTableMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java index 98edc595fd7..a5224593765 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.TablePO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for table meta operation SQLs. @@ -38,154 +38,59 @@ public interface TableMetaMapper { String TABLE_NAME = "table_meta"; - @Select( - "SELECT table_id as tableId, table_name as tableName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_id as schemaId, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @SelectProvider(type = TableMetaSQLProviderFactory.class, method = "listTablePOsBySchemaId") List listTablePOsBySchemaId(@Param("schemaId") Long schemaId); - @Select( - "SELECT table_id as tableId FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND table_name = #{tableName}" - + " AND deleted_at = 0") + @SelectProvider( + type = TableMetaSQLProviderFactory.class, + method = "selectTableIdBySchemaIdAndName") Long selectTableIdBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("tableName") String name); - @Select( - "SELECT table_id as tableId, table_name as tableName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_id as schemaId, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0") + @SelectProvider( + type = TableMetaSQLProviderFactory.class, + method = "selectTableMetaBySchemaIdAndName") TablePO selectTableMetaBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("tableName") String name); - @Select( - "SELECT table_id as tableId, table_name as tableName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_id as schemaId, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE table_id = #{tableId} AND deleted_at = 0") + @SelectProvider(type = TableMetaSQLProviderFactory.class, method = "selectTableMetaById") TablePO selectTableMetaById(@Param("tableId") Long tableId); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(table_id, table_name, metalake_id," - + " catalog_id, schema_id, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{tableMeta.tableId}," - + " #{tableMeta.tableName}," - + " #{tableMeta.metalakeId}," - + " #{tableMeta.catalogId}," - + " #{tableMeta.schemaId}," - + " #{tableMeta.auditInfo}," - + " #{tableMeta.currentVersion}," - + " #{tableMeta.lastVersion}," - + " #{tableMeta.deletedAt}" - + " )") + @InsertProvider(type = TableMetaSQLProviderFactory.class, method = "insertTableMeta") void insertTableMeta(@Param("tableMeta") TablePO tablePO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(table_id, table_name, metalake_id," - + " catalog_id, schema_id, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{tableMeta.tableId}," - + " #{tableMeta.tableName}," - + " #{tableMeta.metalakeId}," - + " #{tableMeta.catalogId}," - + " #{tableMeta.schemaId}," - + " #{tableMeta.auditInfo}," - + " #{tableMeta.currentVersion}," - + " #{tableMeta.lastVersion}," - + " #{tableMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " table_name = #{tableMeta.tableName}," - + " metalake_id = #{tableMeta.metalakeId}," - + " catalog_id = #{tableMeta.catalogId}," - + " schema_id = #{tableMeta.schemaId}," - + " audit_info = #{tableMeta.auditInfo}," - + " current_version = #{tableMeta.currentVersion}," - + " last_version = #{tableMeta.lastVersion}," - + " deleted_at = #{tableMeta.deletedAt}") + @InsertProvider( + type = TableMetaSQLProviderFactory.class, + method = "insertTableMetaOnDuplicateKeyUpdate") void insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET table_name = #{newTableMeta.tableName}," - + " metalake_id = #{newTableMeta.metalakeId}," - + " catalog_id = #{newTableMeta.catalogId}," - + " schema_id = #{newTableMeta.schemaId}," - + " audit_info = #{newTableMeta.auditInfo}," - + " current_version = #{newTableMeta.currentVersion}," - + " last_version = #{newTableMeta.lastVersion}," - + " deleted_at = #{newTableMeta.deletedAt}" - + " WHERE table_id = #{oldTableMeta.tableId}" - + " AND table_name = #{oldTableMeta.tableName}" - + " AND metalake_id = #{oldTableMeta.metalakeId}" - + " AND catalog_id = #{oldTableMeta.catalogId}" - + " AND schema_id = #{oldTableMeta.schemaId}" - + " AND audit_info = #{oldTableMeta.auditInfo}" - + " AND current_version = #{oldTableMeta.currentVersion}" - + " AND last_version = #{oldTableMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = TableMetaSQLProviderFactory.class, method = "updateTableMeta") Integer updateTableMeta( @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE table_id = #{tableId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasByTableId") Integer softDeleteTableMetasByTableId(@Param("tableId") Long tableId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasByMetalakeId") Integer softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasByCatalogId") Integer softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasBySchemaId") Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = TableMetaSQLProviderFactory.class, + method = "deleteTableMetasByLegacyTimeline") Integer deleteTableMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java new file mode 100644 index 00000000000..833ba9a059d --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.TablePO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class TableMetaSQLProviderFactory { + + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new TableMetaMySQLProvider(), + JDBCBackendType.H2, new TableMetaH2Provider()); + + public static TableMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class TableMetaMySQLProvider extends TableMetaBaseSQLProvider {} + + static class TableMetaH2Provider extends TableMetaBaseSQLProvider {} + + public static String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().listTablePOsBySchemaId(schemaId); + } + + public static String selectTableIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return getProvider().selectTableIdBySchemaIdAndName(schemaId, name); + } + + public static String selectTableMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return getProvider().selectTableMetaBySchemaIdAndName(schemaId, name); + } + + public static String selectTableMetaById(@Param("tableId") Long tableId) { + return getProvider().selectTableMetaById(tableId); + } + + public static String insertTableMeta(@Param("tableMeta") TablePO tablePO) { + return getProvider().insertTableMeta(tablePO); + } + + public static String insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO) { + return getProvider().insertTableMetaOnDuplicateKeyUpdate(tablePO); + } + + public static String updateTableMeta( + @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO) { + return getProvider().updateTableMeta(newTablePO, oldTablePO); + } + + public static String softDeleteTableMetasByTableId(@Param("tableId") Long tableId) { + return getProvider().softDeleteTableMetasByTableId(tableId); + } + + public static String softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteTableMetasByMetalakeId(metalakeId); + } + + public static String softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteTableMetasByCatalogId(catalogId); + } + + public static String softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteTableMetasBySchemaId(schemaId); + } + + public static String deleteTableMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteTableMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java new file mode 100644 index 00000000000..aa46bbd7f9c --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.TopicMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.TopicPO; +import org.apache.ibatis.annotations.Param; + +public class TopicMetaBaseSQLProvider { + + public String insertTopicMeta(@Param("topicMeta") TopicPO topicPO) { + return "INSERT INTO " + + TABLE_NAME + + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," + + " comment, properties, audit_info, current_version, last_version," + + " deleted_at)" + + " VALUES(" + + " #{topicMeta.topicId}," + + " #{topicMeta.topicName}," + + " #{topicMeta.metalakeId}," + + " #{topicMeta.catalogId}," + + " #{topicMeta.schemaId}," + + " #{topicMeta.comment}," + + " #{topicMeta.properties}," + + " #{topicMeta.auditInfo}," + + " #{topicMeta.currentVersion}," + + " #{topicMeta.lastVersion}," + + " #{topicMeta.deletedAt}" + + " )"; + } + + public String insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO) { + return "INSERT INTO " + + TABLE_NAME + + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," + + " comment, properties, audit_info, current_version, last_version," + + " deleted_at)" + + " VALUES(" + + " #{topicMeta.topicId}," + + " #{topicMeta.topicName}," + + " #{topicMeta.metalakeId}," + + " #{topicMeta.catalogId}," + + " #{topicMeta.schemaId}," + + " #{topicMeta.comment}," + + " #{topicMeta.properties}," + + " #{topicMeta.auditInfo}," + + " #{topicMeta.currentVersion}," + + " #{topicMeta.lastVersion}," + + " #{topicMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " topic_name = #{topicMeta.topicName}," + + " metalake_id = #{topicMeta.metalakeId}," + + " catalog_id = #{topicMeta.catalogId}," + + " schema_id = #{topicMeta.schemaId}," + + " comment = #{topicMeta.comment}," + + " properties = #{topicMeta.properties}," + + " audit_info = #{topicMeta.auditInfo}," + + " current_version = #{topicMeta.currentVersion}," + + " last_version = #{topicMeta.lastVersion}," + + " deleted_at = #{topicMeta.deletedAt}"; + } + + public String listTopicPOsBySchemaId(@Param("schemaId") Long schemaId) { + return "SELECT topic_id as topicId, topic_name as topicName, metalake_id as metalakeId," + + " catalog_id as catalogId, schema_id as schemaId," + + " comment as comment, properties as properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String selectTopicMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String topicName) { + return "SELECT topic_id as topicId, topic_name as topicName," + + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," + + " comment as comment, properties as properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName} AND deleted_at = 0"; + } + + public String selectTopicMetaById(@Param("topicId") Long topicId) { + return "SELECT topic_id as topicId, topic_name as topicName," + + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," + + " comment as comment, properties as properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE topic_id = #{topicId} AND deleted_at = 0"; + } + + public String updateTopicMeta( + @Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO) { + return "UPDATE " + + TABLE_NAME + + " SET topic_name = #{newTopicMeta.topicName}," + + " metalake_id = #{newTopicMeta.metalakeId}," + + " catalog_id = #{newTopicMeta.catalogId}," + + " schema_id = #{newTopicMeta.schemaId}," + + " comment = #{newTopicMeta.comment}," + + " properties = #{newTopicMeta.properties}," + + " audit_info = #{newTopicMeta.auditInfo}," + + " current_version = #{newTopicMeta.currentVersion}," + + " last_version = #{newTopicMeta.lastVersion}," + + " deleted_at = #{newTopicMeta.deletedAt}" + + " WHERE topic_id = #{oldTopicMeta.topicId}" + + " AND topic_name = #{oldTopicMeta.topicName}" + + " AND metalake_id = #{oldTopicMeta.metalakeId}" + + " AND catalog_id = #{oldTopicMeta.catalogId}" + + " AND schema_id = #{oldTopicMeta.schemaId}" + + " AND comment = #{oldTopicMeta.comment}" + + " AND properties = #{oldTopicMeta.properties}" + + " AND audit_info = #{oldTopicMeta.auditInfo}" + + " AND current_version = #{oldTopicMeta.currentVersion}" + + " AND last_version = #{oldTopicMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String selectTopicIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String name) { + return "SELECT topic_id as topicId FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName}" + + " AND deleted_at = 0"; + } + + public String softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE topic_id = #{topicId} AND deleted_at = 0"; + } + + public String softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String deleteTopicMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java index d8d08286e38..8c194caff4f 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java @@ -20,173 +20,68 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.TopicPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; public interface TopicMetaMapper { String TABLE_NAME = "topic_meta"; - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," - + " comment, properties, audit_info, current_version, last_version," - + " deleted_at)" - + " VALUES(" - + " #{topicMeta.topicId}," - + " #{topicMeta.topicName}," - + " #{topicMeta.metalakeId}," - + " #{topicMeta.catalogId}," - + " #{topicMeta.schemaId}," - + " #{topicMeta.comment}," - + " #{topicMeta.properties}," - + " #{topicMeta.auditInfo}," - + " #{topicMeta.currentVersion}," - + " #{topicMeta.lastVersion}," - + " #{topicMeta.deletedAt}" - + " )") + @InsertProvider(type = TopicMetaSQLProviderFactory.class, method = "insertTopicMeta") void insertTopicMeta(@Param("topicMeta") TopicPO topicPO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," - + " comment, properties, audit_info, current_version, last_version," - + " deleted_at)" - + " VALUES(" - + " #{topicMeta.topicId}," - + " #{topicMeta.topicName}," - + " #{topicMeta.metalakeId}," - + " #{topicMeta.catalogId}," - + " #{topicMeta.schemaId}," - + " #{topicMeta.comment}," - + " #{topicMeta.properties}," - + " #{topicMeta.auditInfo}," - + " #{topicMeta.currentVersion}," - + " #{topicMeta.lastVersion}," - + " #{topicMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " topic_name = #{topicMeta.topicName}," - + " metalake_id = #{topicMeta.metalakeId}," - + " catalog_id = #{topicMeta.catalogId}," - + " schema_id = #{topicMeta.schemaId}," - + " comment = #{topicMeta.comment}," - + " properties = #{topicMeta.properties}," - + " audit_info = #{topicMeta.auditInfo}," - + " current_version = #{topicMeta.currentVersion}," - + " last_version = #{topicMeta.lastVersion}," - + " deleted_at = #{topicMeta.deletedAt}") + @InsertProvider( + type = TopicMetaSQLProviderFactory.class, + method = "insertTopicMetaOnDuplicateKeyUpdate") void insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO); - @Select( - "SELECT topic_id as topicId, topic_name as topicName, metalake_id as metalakeId," - + " catalog_id as catalogId, schema_id as schemaId," - + " comment as comment, properties as properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = "listTopicPOsBySchemaId") List listTopicPOsBySchemaId(@Param("schemaId") Long schemaId); - @Select( - "SELECT topic_id as topicId, topic_name as topicName," - + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," - + " comment as comment, properties as properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName} AND deleted_at = 0") + @SelectProvider( + type = TopicMetaSQLProviderFactory.class, + method = "selectTopicMetaBySchemaIdAndName") TopicPO selectTopicMetaBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("topicName") String topicName); - @Select( - "SELECT topic_id as topicId, topic_name as topicName," - + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," - + " comment as comment, properties as properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE topic_id = #{topicId} AND deleted_at = 0") + @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = "selectTopicMetaById") TopicPO selectTopicMetaById(@Param("topicId") Long topicId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET topic_name = #{newTopicMeta.topicName}," - + " metalake_id = #{newTopicMeta.metalakeId}," - + " catalog_id = #{newTopicMeta.catalogId}," - + " schema_id = #{newTopicMeta.schemaId}," - + " comment = #{newTopicMeta.comment}," - + " properties = #{newTopicMeta.properties}," - + " audit_info = #{newTopicMeta.auditInfo}," - + " current_version = #{newTopicMeta.currentVersion}," - + " last_version = #{newTopicMeta.lastVersion}," - + " deleted_at = #{newTopicMeta.deletedAt}" - + " WHERE topic_id = #{oldTopicMeta.topicId}" - + " AND topic_name = #{oldTopicMeta.topicName}" - + " AND metalake_id = #{oldTopicMeta.metalakeId}" - + " AND catalog_id = #{oldTopicMeta.catalogId}" - + " AND schema_id = #{oldTopicMeta.schemaId}" - + " AND comment = #{oldTopicMeta.comment}" - + " AND properties = #{oldTopicMeta.properties}" - + " AND audit_info = #{oldTopicMeta.auditInfo}" - + " AND current_version = #{oldTopicMeta.currentVersion}" - + " AND last_version = #{oldTopicMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = TopicMetaSQLProviderFactory.class, method = "updateTopicMeta") Integer updateTopicMeta( @Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO); - @Select( - "SELECT topic_id as topicId FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName}" - + " AND deleted_at = 0") + @SelectProvider( + type = TopicMetaSQLProviderFactory.class, + method = "selectTopicIdBySchemaIdAndName") Long selectTopicIdBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("topicName") String name); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE topic_id = #{topicId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasByTopicId") Integer softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasByCatalogId") Integer softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasByMetalakeId") Integer softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasBySchemaId") Integer softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = TopicMetaSQLProviderFactory.class, + method = "deleteTopicMetasByLegacyTimeline") Integer deleteTopicMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java new file mode 100644 index 00000000000..9a417e011a7 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.TopicPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class TopicMetaSQLProviderFactory { + + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new TopicMetaMySQLProvider(), + JDBCBackendType.H2, new TopicMetaH2Provider()); + + public static TopicMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class TopicMetaMySQLProvider extends TopicMetaBaseSQLProvider {} + + static class TopicMetaH2Provider extends TopicMetaBaseSQLProvider {} + + public static String insertTopicMeta(@Param("topicMeta") TopicPO topicPO) { + return getProvider().insertTopicMeta(topicPO); + } + + public static String insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO) { + return getProvider().insertTopicMetaOnDuplicateKeyUpdate(topicPO); + } + + public static String listTopicPOsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().listTopicPOsBySchemaId(schemaId); + } + + public static String selectTopicMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String topicName) { + return getProvider().selectTopicMetaBySchemaIdAndName(schemaId, topicName); + } + + public static String selectTopicMetaById(@Param("topicId") Long topicId) { + return getProvider().selectTopicMetaById(topicId); + } + + public static String updateTopicMeta( + @Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO) { + return getProvider().updateTopicMeta(newTopicPO, oldTopicPO); + } + + public static String selectTopicIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String name) { + return getProvider().selectTopicIdBySchemaIdAndName(schemaId, name); + } + + public static String softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId) { + return getProvider().softDeleteTopicMetasByTopicId(topicId); + } + + public static String softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteTopicMetasByCatalogId(catalogId); + } + + public static String softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteTopicMetasByMetalakeId(metalakeId); + } + + public static String softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteTopicMetasBySchemaId(schemaId); + } + + public static String deleteTopicMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteTopicMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java index 684252f17ee..9d928271623 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java @@ -26,6 +26,7 @@ import org.apache.commons.pool2.impl.BaseObjectPoolConfig; import org.apache.gravitino.Config; import org.apache.gravitino.Configs; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper; import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper; import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper; @@ -73,7 +74,9 @@ private SqlSessionFactoryHelper() {} public void init(Config config) { // Initialize the data source BasicDataSource dataSource = new BasicDataSource(); - dataSource.setUrl(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL)); + String jdbcUrl = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL); + JDBCBackendType jdbcType = JDBCBackendType.fromURI(jdbcUrl); + dataSource.setUrl(jdbcUrl); dataSource.setDriverClassName(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)); dataSource.setUsername(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER)); dataSource.setPassword(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)); @@ -102,6 +105,7 @@ public void init(Config config) { // Initialize the configuration Configuration configuration = new Configuration(environment); + configuration.setDatabaseId(jdbcType.name().toLowerCase()); configuration.addMapper(MetalakeMetaMapper.class); configuration.addMapper(CatalogMetaMapper.class); configuration.addMapper(SchemaMetaMapper.class);