Skip to content

Commit

Permalink
table-cache: common table methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Apr 8, 2024
1 parent 6fbd377 commit b034465
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tech.ydb.yoj.databind.expression.OrderExpression;
import tech.ydb.yoj.databind.schema.ObjectSchema;
import tech.ydb.yoj.databind.schema.Schema;
import tech.ydb.yoj.repository.db.CommonTable;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.EntityExpressions;
import tech.ydb.yoj.repository.db.EntityIdSchema;
Expand Down Expand Up @@ -184,6 +185,11 @@ public <V extends View> V find(Class<V> viewType, Entity.Id<T> id) {
return transaction.doInTransaction("find(" + id + ")", type, shard -> shard.find(id, viewType));
}

@Override
public <ID extends Entity.Id<T>> List<T> find(Set<ID> ids) {
return CommonTable.find(transaction.getTransactionLocal(), this, ids);
}

@Override
@SuppressWarnings("unchecked")
public <ID extends Entity.Id<T>> List<T> find(Range<ID> range) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.NonNull;
import tech.ydb.yoj.databind.expression.FilterExpression;
import tech.ydb.yoj.databind.expression.OrderExpression;
import tech.ydb.yoj.repository.db.CommonTable;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.Entity.Id;
import tech.ydb.yoj.repository.db.EntityIdSchema;
Expand All @@ -16,7 +17,6 @@
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
Expand Down Expand Up @@ -235,6 +235,11 @@ public <V extends View> V find(Class<V> viewType, Entity.Id<T> id) {
return res.isEmpty() ? null : res.get(0);
}

@Override
public <ID extends Id<T>> List<T> find(Set<ID> ids) {
return CommonTable.find(executor.getTransactionLocal(), this, ids);
}

@Override
public <ID extends Entity.Id<T>> List<T> find(Range<ID> range) {
return postLoad(executor.execute(YqlStatement.findRange(type, range), range));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.NonNull;
import tech.ydb.yoj.databind.expression.FilterExpression;
import tech.ydb.yoj.databind.expression.OrderExpression;
import tech.ydb.yoj.repository.db.CommonTable;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.Entity.Id;
import tech.ydb.yoj.repository.db.EntityIdSchema;
Expand Down Expand Up @@ -234,6 +235,11 @@ public <V extends View> V find(Class<V> viewType, Entity.Id<T> id) {
return res.isEmpty() ? null : res.get(0);
}

@Override
public <ID extends Id<T>> List<T> find(Set<ID> ids) {
return CommonTable.find(executor.getTransactionLocal(), this, ids);
}

@Override
public <ID extends Entity.Id<T>> List<T> find(Range<ID> range) {
return postLoad(executor.execute(YqlStatement.findRange(type, range), range));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tech.ydb.yoj.repository.db;

import com.google.common.collect.Sets;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toSet;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class CommonTable {
public static <E extends Entity<E>, ID extends Entity.Id<E>> List<E> find(
TransactionLocal transactionLocal, Table<E> table, Set<ID> ids) {
if (ids.isEmpty()) {
return List.of();
}

var orderBy = EntityExpressions.defaultOrder(table.getType());
var cache = Tx.Current.get().getRepositoryTransaction().getTransactionLocal().firstLevelCache();
var isPartialIdMode = ids.iterator().next().isPartial();

var foundInCache = ids.stream()
.filter(cache::containsKey)
.map(cache::peek)
.flatMap(Optional::stream)
.collect(Collectors.toMap(Entity::getId, Function.identity()));
var remainingIds = Sets.difference(ids, foundInCache.keySet());
var foundInDb = table.findUncached(remainingIds, null, orderBy, null);

var merged = new HashMap<Entity.Id<E>, E>();

// some entries found in db with partial id query may already be in cache (after update/delete),
// so we must return actual entries from cache
for (var entry : foundInDb) {
var id = entry.getId();
if (cache.containsKey(id)) {
var cached = cache.peek(id);
cached.ifPresent(t -> merged.put(id, t));
// not present means marked as deleted in cache
} else {
merged.put(id, table.postLoad(entry));
}
}

// add entries found in cache and not fetched from db
for (var pair : foundInCache.entrySet()) {
var id = pair.getKey();
var entry = pair.getValue();
merged.put(id, entry);
}

if (!isPartialIdMode) {
Set<Entity.Id<E>> foundInDbIds = foundInDb.stream().map(Entity::getId).collect(toSet());
Set<Entity.Id<E>> foundInCacheIds = new HashSet<>(foundInCache.keySet());
Sets.difference(Sets.difference(ids, foundInDbIds), foundInCacheIds).forEach(cache::putEmpty);
}

return merged.values().stream().sorted(EntityIdSchema.SORT_ENTITY_BY_ID).collect(Collectors.toList());
}
}
54 changes: 2 additions & 52 deletions repository/src/main/java/tech/ydb/yoj/repository/db/Table.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package tech.ydb.yoj.repository.db;

import com.google.common.collect.Sets;
import lombok.NonNull;
import tech.ydb.yoj.databind.expression.FilterExpression;
import tech.ydb.yoj.databind.expression.OrderExpression;
Expand All @@ -14,8 +13,6 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -25,7 +22,6 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.concat;

public interface Table<T extends Entity<T>> {
Expand All @@ -42,6 +38,8 @@ public interface Table<T extends Entity<T>> {

<V extends View> V find(Class<V> viewType, Entity.Id<T> id);

<ID extends Entity.Id<T>> List<T> find(Set<ID> ids);

<ID extends Entity.Id<T>> List<T> find(Range<ID> range);

<ID extends Entity.Id<T>> List<ID> findIds(Range<ID> range);
Expand Down Expand Up @@ -265,54 +263,6 @@ default <V extends Table.View> ViewListResult<T, V> list(Class<V> viewType, List
return ViewListResult.forPage(request, viewType, nextPage);
}

default <ID extends Entity.Id<T>> List<T> find(Set<ID> ids) {
if (ids.isEmpty()) {
return List.of();
}

var orderBy = EntityExpressions.defaultOrder(getType());
var cache = Tx.Current.get().getRepositoryTransaction().getTransactionLocal().firstLevelCache();
var isPartialIdMode = ids.iterator().next().isPartial();

var foundInCache = ids.stream()
.filter(cache::containsKey)
.map(cache::peek)
.flatMap(Optional::stream)
.collect(Collectors.toMap(Entity::getId, Function.identity()));
var remainingIds = Sets.difference(ids, foundInCache.keySet());
var foundInDb = findUncached(remainingIds, null, orderBy, null);

var merged = new HashMap<Entity.Id<T>, T>();

// some entries found in db with partial id query may already be in cache (after update/delete),
// so we must return actual entries from cache
for (var entry : foundInDb) {
var id = entry.getId();
if (cache.containsKey(id)) {
var cached = cache.peek(id);
cached.ifPresent(t -> merged.put(id, t));
// not present means marked as deleted in cache
} else {
merged.put(id, this.postLoad(entry));
}
}

// add entries found in cache and not fetched from db
for (var pair : foundInCache.entrySet()) {
var id = pair.getKey();
var entry = pair.getValue();
merged.put(id, entry);
}

if (!isPartialIdMode) {
Set<Entity.Id<T>> foundInDbIds = foundInDb.stream().map(Entity::getId).collect(toSet());
Set<Entity.Id<T>> foundInCacheIds = new HashSet<>(foundInCache.keySet());
Sets.difference(Sets.difference(ids, foundInDbIds), foundInCacheIds).forEach(cache::putEmpty);
}

return merged.values().stream().sorted(EntityIdSchema.SORT_ENTITY_BY_ID).collect(Collectors.toList());
}

default void bulkUpsert(List<T> input, BulkParams params) {
throw new UnsupportedOperationException();
}
Expand Down

0 comments on commit b034465

Please sign in to comment.