Skip to content

Commit

Permalink
[opt](rowcount) refresh external table's rowcount async
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Mar 29, 2024
1 parent 50815b1 commit b5cded1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public ExternalMetaCacheMgr() {
"ExternalMetaCacheMgr", 120, true);
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
rowCountCache = new ExternalRowCountCache(executor);
rowCountCache = new ExternalRowCountCache(executor,
Config.external_cache_expire_time_minutes_after_access * 60, null);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -37,14 +38,19 @@ public class ExternalRowCountCache {
private static final Logger LOG = LogManager.getLogger(ExternalRowCountCache.class);
private final AsyncLoadingCache<RowCountKey, Optional<Long>> rowCountCache;

public ExternalRowCountCache(ExecutorService executor) {
public ExternalRowCountCache(ExecutorService executor, long refreshAfterWriteSeconds,
BasicAsyncCacheLoader<RowCountKey, Optional<Long>> loader) {
// 1. set expireAfterAccess to 1 day, avoid too many entries
// 2. set refreshAfterWrite to 10min(default), so that the cache will be refreshed after 10min
rowCountCache = Caffeine.newBuilder()
.maximumSize(Config.max_external_table_row_count_cache_num)
.expireAfterWrite(Duration.ofMinutes(Config.external_cache_expire_time_minutes_after_access))
.expireAfterAccess(Duration.ofDays(1))
.refreshAfterWrite(Duration.ofSeconds(refreshAfterWriteSeconds))
.executor(executor)
.buildAsync(new RowCountCacheLoader());
.buildAsync(loader == null ? new RowCountCacheLoader() : loader);
}

@Getter
public static class RowCountKey {
private final long catalogId;
private final long dbId;
Expand Down Expand Up @@ -74,7 +80,6 @@ public int hashCode() {
}

public static class RowCountCacheLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {

@Override
protected Optional<Long> doLoad(RowCountKey rowCountKey) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.doris.datasource;

import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey;
import org.apache.doris.statistics.BasicAsyncCacheLoader;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Exercises the asynchronous load and refresh behaviour of {@link ExternalRowCountCache}
 * using a deterministic {@link TestLoader} and a 2-second refresh-after-write interval.
 *
 * <p>The test is timing based: it sleeps between lookups so that the background
 * loads submitted to the executor have time to finish.
 */
public class ExternalRowCountCacheTest {
    private ExternalRowCountCache cache;
    private ExecutorService executorService;

    /**
     * Loader with fixed answers for table ids 1 and 2, and a slow (1 second),
     * monotonically increasing answer for table id 3. Any other table id yields
     * {@link Optional#empty()}.
     */
    public static class TestLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {

        // Starts at 333 so the first load of table 3 returns 334, the next one 335, and so on.
        private final AtomicLong incr = new AtomicLong(333);

        @Override
        protected Optional<Long> doLoad(RowCountKey rowCountKey) {
            if (rowCountKey.getTableId() == 1) {
                return Optional.of(111L);
            } else if (rowCountKey.getTableId() == 2) {
                return Optional.of(222L);
            } else if (rowCountKey.getTableId() == 3) {
                try {
                    // Simulate a slow row count computation.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                System.out.println("load: " + incr.get());
                return Optional.of(incr.incrementAndGet());
            }
            return Optional.empty();
        }
    }

    /** Creates a fresh executor and a cache whose refresh-after-write interval is 2 seconds. */
    @BeforeEach
    public void setUp() {
        executorService = Executors.newFixedThreadPool(2);
        cache = new ExternalRowCountCache(executorService, 2, new TestLoader());
    }

    /** Stops the executor so its worker threads do not outlive the test. */
    @AfterEach
    public void tearDown() {
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    @Test
    public void test() throws Exception {
        // table 1: the first lookup is expected to return 0 because the load runs asynchronously
        long rowCount = cache.getCachedRowCount(1, 1, 1);
        Assertions.assertEquals(0, rowCount);
        Thread.sleep(1000);
        rowCount = cache.getCachedRowCount(1, 1, 1);
        Assertions.assertEquals(111, rowCount);

        // table 2: same pattern with a different fixed value
        rowCount = cache.getCachedRowCount(1, 1, 2);
        Assertions.assertEquals(0, rowCount);
        Thread.sleep(1000);
        rowCount = cache.getCachedRowCount(1, 1, 2);
        Assertions.assertEquals(222, rowCount);

        // table 3: slow loader with an incrementing value
        rowCount = cache.getCachedRowCount(1, 1, 3);
        // first get, it should be 0 because the loader is async
        Assertions.assertEquals(0, rowCount);
        // after sleeping 2 sec the first load (which takes about 1 sec) is done: 333 + 1 = 334
        Thread.sleep(2000);
        rowCount = cache.getCachedRowCount(1, 1, 3);
        Assertions.assertEquals(334, rowCount);
        // sleep 3 sec so the entry is older than the 2 sec refresh interval
        Thread.sleep(3000);
        rowCount = cache.getCachedRowCount(1, 1, 3);
        // the refresh is only triggered by this lookup, so the stale value 334 is still returned
        Assertions.assertEquals(334, rowCount);
        // sleep 2 sec to wait for the background doLoad to finish
        Thread.sleep(2000);
        rowCount = cache.getCachedRowCount(1, 1, 3);
        // refresh done, value should now be 335
        Assertions.assertEquals(335, rowCount);
    }
}

0 comments on commit b5cded1

Please sign in to comment.