Skip to content

Commit

Permalink
[Optimize ]Optimize Paimon log printing that can cause ambiguity (Dat…
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyan1998 authored Jan 11, 2024
1 parent 1ca1dfe commit 2b71dfe
Showing 1 changed file with 31 additions and 15 deletions.
46 changes: 31 additions & 15 deletions dinky-admin/src/main/java/org/dinky/utils/PaimonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,42 @@
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.json.JSONUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
public class PaimonUtil {
private static final Cache<Class<?>, Schema> SCHEMA_CACHE = CacheUtil.newLRUCache(100);
private static final CatalogContext CONTEXT =
CatalogContext.create(new Path(URLUtil.toURI(URLUtil.url(PathConstant.TMP_PATH + "paimon"))));
private static final Catalog CATALOG = CatalogFactory.createCatalog(CONTEXT);

static {
private static PaimonUtil instance;

private final Cache<Class<?>, Schema> schemaCache;
private final CatalogContext context;
private final Catalog catalog;

public PaimonUtil() {
schemaCache = CacheUtil.newLRUCache(100);
context = CatalogContext.create(new Path(URLUtil.toURI(URLUtil.url(PathConstant.TMP_PATH + "paimon"))));
catalog = CatalogFactory.createCatalog(context);
try {
CATALOG.createDatabase(DINKY_DB, true);
catalog.createDatabase(DINKY_DB, true);
} catch (Catalog.DatabaseAlreadyExistException e) {
throw new RuntimeException(e);
}
}

public static synchronized PaimonUtil getInstance() {
if (instance == null) {
instance = new PaimonUtil();
}
return instance;
}

public static void dropTable(String table) {
Identifier identifier = Identifier.create(DINKY_DB, table);
if (CATALOG.tableExists(identifier)) {
if (getInstance().getCatalog().tableExists(identifier)) {
try {
CATALOG.dropTable(identifier, true);
getInstance().getCatalog().dropTable(identifier, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -122,6 +136,8 @@ public static <T> void write(String table, List<T> dataList, Class<?> clazz) {
String fieldName = StrUtil.toCamelCase(dataField.name());
Object fieldValue = ReflectUtil.getFieldValue(t, fieldName);
try {
// TODO BinaryWriter.write已被废弃,后续可以考虑改成这种方式
// BinaryWriter.createValueSetter(type).setValue(writer, i, fieldValue);
if (type.getTypeRoot() == DataTypeRoot.VARCHAR) {
BinaryWriter.write(
writer, i, BinaryString.fromString(JSONUtil.toJsonStr(fieldValue)), type, null);
Expand Down Expand Up @@ -169,10 +185,10 @@ public static <T> List<T> batchReadTable(

ReadBuilder readBuilder;
try {
if (!CATALOG.tableExists(identifier)) {
if (!getInstance().getCatalog().tableExists(identifier)) {
return dataList;
}
readBuilder = CATALOG.getTable(identifier).newReadBuilder();
readBuilder = getInstance().getCatalog().getTable(identifier).newReadBuilder();
if (filter != null) {
List<Predicate> predicates = filter.apply(builder);
readBuilder.withFilter(predicates);
Expand Down Expand Up @@ -218,18 +234,18 @@ public static <T> List<T> batchReadTable(
public static Table createOrGetTable(String tableName, Class<?> clazz) {
try {
Identifier identifier = Identifier.create(DINKY_DB, tableName);
if (CATALOG.tableExists(identifier)) {
return CATALOG.getTable(identifier);
if (getInstance().getCatalog().tableExists(identifier)) {
return getInstance().getCatalog().getTable(identifier);
}
CATALOG.createTable(identifier, getSchemaByClass(clazz), false);
return CATALOG.getTable(identifier);
getInstance().getCatalog().createTable(identifier, getSchemaByClass(clazz), false);
return getInstance().getCatalog().getTable(identifier);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static Schema getSchemaByClass(Class<?> clazz) {
return SCHEMA_CACHE.get(clazz, () -> {
return getInstance().getSchemaCache().get(clazz, () -> {
List<String> primaryKeys = new ArrayList<>();
List<String> partitionKeys = new ArrayList<>();
Schema.Builder builder = Schema.newBuilder();
Expand Down

0 comments on commit 2b71dfe

Please sign in to comment.