Skip to content

Commit

Permalink
separate-projections-poc: MigrationProjectionCache
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Jun 2, 2024
1 parent 443869c commit 145fb89
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,25 +405,25 @@ public T insert(T tt) {
T t = tt.preSave();
transaction.getWatcher().markRowRead(type, t.getId());
transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public T save(T tt) {
T t = tt.preSave();
transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id));
transaction.getTransactionLocal().projectionCache().delete(transaction, id);
transaction.getTransactionLocal().firstLevelCache().putEmpty(id);
transaction.getTransactionLocal().projectionCache().delete(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
Expand Down Expand Up @@ -54,17 +54,17 @@
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;

public class YdbTable<T extends Entity<T>> implements Table<T> {
private final QueryExecutor executor;
private final YdbRepositoryTransaction<?> executor;
@Getter
private final Class<T> type;

public YdbTable(Class<T> type, QueryExecutor executor) {
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
this.type = type;
this.executor = new CheckingQueryExecutor(executor);
this.executor = executor;
}

protected YdbTable(QueryExecutor executor) {
this.executor = new CheckingQueryExecutor(executor);
this.executor = (YdbRepositoryTransaction<?>) executor;
this.type = resolveEntityType();
}

Expand Down Expand Up @@ -420,25 +420,25 @@ public void update(Entity.Id<T> id, Changeset changeset) {
public T insert(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
return t;
}

@Override
public T save(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
executor.pendingExecute(YqlStatement.delete(type), id);
executor.getTransactionLocal().projectionCache().delete(executor, id);
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
executor.getTransactionLocal().projectionCache().delete(id);
}

/**
Expand All @@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
T rawEntity = foundRaw.get(0);
T entityToSave = rawEntity.postLoad().preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
}

@Override
Expand Down Expand Up @@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
TransactionLocal getTransactionLocal();
}

public static class CheckingQueryExecutor implements QueryExecutor {
private final QueryExecutor delegate;
private final Tx originTx;

public CheckingQueryExecutor(QueryExecutor delegate) {
this.delegate = delegate;
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
}

private void check() {
Tx.checkSameTx(originTx);
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
check();
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
delegate.pendingExecute(statement, value);
}

@Override
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
check();
delegate.bulkUpsert(mapper, input, params);
}

@Override
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
check();
return delegate.readTable(mapper, params);
}

@Override
public TransactionLocal getTransactionLocal() {
check();
return delegate.getTransactionLocal();
}
}

public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
Expand Down Expand Up @@ -54,20 +54,21 @@
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;

public class YdbTable<T extends Entity<T>> implements Table<T> {
private final QueryExecutor executor;
private final YdbRepositoryTransaction<?> executor;
@Getter
private final Class<T> type;

public YdbTable(Class<T> type, QueryExecutor executor) {
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
this.type = type;
this.executor = new CheckingQueryExecutor(executor);
this.executor = executor;
}

protected YdbTable(QueryExecutor executor) {
this.executor = new CheckingQueryExecutor(executor);
this.executor = (YdbRepositoryTransaction<?>) executor;
this.type = resolveEntityType();
}


@SuppressWarnings("unchecked")
private Class<T> resolveEntityType() {
return (Class<T>) (new TypeToken<T>(getClass()) {
Expand Down Expand Up @@ -421,7 +422,7 @@ public T insert(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

Expand All @@ -430,15 +431,15 @@ public T save(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
executor.pendingExecute(YqlStatement.delete(type), id);
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
executor.getTransactionLocal().projectionCache().delete(id);
executor.getTransactionLocal().projectionCache().delete(executor, id);
}

/**
Expand All @@ -458,7 +459,7 @@ public <ID extends Id<T>> void migrate(ID id) {
T rawEntity = foundRaw.get(0);
T entityToSave = rawEntity.postLoad().preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
}

@Override
Expand Down Expand Up @@ -494,55 +495,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
TransactionLocal getTransactionLocal();
}

public static class CheckingQueryExecutor implements QueryExecutor {
private final QueryExecutor delegate;
private final Tx originTx;

public CheckingQueryExecutor(QueryExecutor delegate) {
this.delegate = delegate;
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
}

private void check() {
Tx.checkSameTx(originTx);
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
check();
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
delegate.pendingExecute(statement, value);
}

@Override
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
check();
delegate.bulkUpsert(mapper, input, params);
}

@Override
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
check();
return delegate.readTable(mapper, params);
}

@Override
public TransactionLocal getTransactionLocal() {
check();
return delegate.getTransactionLocal();
}
}

public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,39 @@
import lombok.NonNull;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache;
import tech.ydb.yoj.repository.db.projection.ProjectionCache;
import tech.ydb.yoj.repository.db.projection.RoProjectionCache;
import tech.ydb.yoj.repository.db.projection.RwProjectionCache;

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TransactionLocal {
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();

private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
private final Supplier<ProjectionCache> projectionCacheSupplier;
private final Supplier<TransactionLog> logSupplier;
private final FirstLevelCache firstLevelCache;
private final ProjectionCache projectionCache;
private final TransactionLog log;

public TransactionLocal(@NonNull TxOptions options) {
this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty;
this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new;
this.logSupplier = () -> new TransactionLog(options.getLogLevel());
this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty();
if (options.isMutable()) {
this.projectionCache = new MigrationProjectionCache(firstLevelCache);
} else {
this.projectionCache = new RoProjectionCache();
}
this.log = new TransactionLog(options.getLogLevel());
}

public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
}

@SuppressWarnings("unchecked")
public <X> X instance(@NonNull Supplier<X> supplier) {
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
}

public ProjectionCache projectionCache() {
return instance(projectionCacheSupplier);
return projectionCache;
}

public FirstLevelCache firstLevelCache() {
return instance(firstLevelCacheSupplier);
return firstLevelCache;
}

public TransactionLog log() {
return instance(logSupplier);
return log;
}

public interface Holder {
Expand Down
Loading

0 comments on commit 145fb89

Please sign in to comment.