Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

separate-projections-poc: MigrationProjectionCache #76

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,25 +405,25 @@ public T insert(T tt) {
T t = tt.preSave();
transaction.getWatcher().markRowRead(type, t.getId());
transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public T save(T tt) {
T t = tt.preSave();
transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id));
transaction.getTransactionLocal().projectionCache().delete(transaction, id);
transaction.getTransactionLocal().firstLevelCache().putEmpty(id);
transaction.getTransactionLocal().projectionCache().delete(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
Expand Down Expand Up @@ -54,17 +54,17 @@
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;

public class YdbTable<T extends Entity<T>> implements Table<T> {
private final QueryExecutor executor;
private final YdbRepositoryTransaction<?> executor;
@Getter
private final Class<T> type;

public YdbTable(Class<T> type, QueryExecutor executor) {
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
this.type = type;
this.executor = new CheckingQueryExecutor(executor);
this.executor = executor;
}

protected YdbTable(QueryExecutor executor) {
this.executor = new CheckingQueryExecutor(executor);
this.executor = (YdbRepositoryTransaction<?>) executor;
this.type = resolveEntityType();
}

Expand Down Expand Up @@ -421,7 +421,7 @@ public T insert(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

Expand All @@ -430,15 +430,15 @@ public T save(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
executor.pendingExecute(YqlStatement.delete(type), id);
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
executor.getTransactionLocal().projectionCache().delete(id);
executor.getTransactionLocal().projectionCache().delete(executor, id);
}

/**
Expand All @@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
T rawEntity = foundRaw.get(0);
T entityToSave = rawEntity.postLoad().preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
}

@Override
Expand Down Expand Up @@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
TransactionLocal getTransactionLocal();
}

public static class CheckingQueryExecutor implements QueryExecutor {
Copy link
Contributor Author

@lavrukov lavrukov Jun 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't work. I gave birth to this, I will kill it

private final QueryExecutor delegate;
private final Tx originTx;

public CheckingQueryExecutor(QueryExecutor delegate) {
this.delegate = delegate;
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
}

private void check() {
Tx.checkSameTx(originTx);
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
check();
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
delegate.pendingExecute(statement, value);
}

@Override
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
check();
delegate.bulkUpsert(mapper, input, params);
}

@Override
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
check();
return delegate.readTable(mapper, params);
}

@Override
public TransactionLocal getTransactionLocal() {
check();
return delegate.getTransactionLocal();
}
}

public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public TxManager immediateWrites() {
return withOptions(this.options.withImmediateWrites(true));
}

@Override
public TxManager separateProjections() {
return withOptions(this.options.withSeparateProjections(true));
}

@Override
public TxManager noFirstLevelCache() {
return withOptions(this.options.withFirstLevelCache(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public interface TxManager {
*/
TxManager immediateWrites();

TxManager separateProjections();

/**
* Turn off first level cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class TxOptions {

boolean immediateWrites;

boolean separateProjections;

public static TxOptions create(@NonNull IsolationLevel isolationLevel) {
return builder()
.isolationLevel(isolationLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,48 @@
import lombok.NonNull;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache;
import tech.ydb.yoj.repository.db.projection.ProjectionCache;
import tech.ydb.yoj.repository.db.projection.RoProjectionCache;
import tech.ydb.yoj.repository.db.projection.RwProjectionCache;

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TransactionLocal {
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this singletons is made for case when in tx wasn't any query. But this objects is lightwave, maybe we can do it without singletons? Else I have to do some painful refactoring for this feature ;(


private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
private final Supplier<ProjectionCache> projectionCacheSupplier;
private final Supplier<TransactionLog> logSupplier;
private final FirstLevelCache firstLevelCache;
private final ProjectionCache projectionCache;
private final TransactionLog log;

public TransactionLocal(@NonNull TxOptions options) {
this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty;
this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new;
this.logSupplier = () -> new TransactionLog(options.getLogLevel());
this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty();
this.projectionCache = createProjectionCache(firstLevelCache, options);
this.log = new TransactionLog(options.getLogLevel());
}

public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
private static ProjectionCache createProjectionCache(FirstLevelCache firstLevelCache, TxOptions options) {
if (options.isMutable()) {
if (options.isSeparateProjections()) {
return new MigrationProjectionCache(firstLevelCache);
}

return new RwProjectionCache();
}

return new RoProjectionCache();
}

@SuppressWarnings("unchecked")
public <X> X instance(@NonNull Supplier<X> supplier) {
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
}

public ProjectionCache projectionCache() {
return instance(projectionCacheSupplier);
return projectionCache;
}

public FirstLevelCache firstLevelCache() {
return instance(firstLevelCacheSupplier);
return firstLevelCache;
}

public TransactionLog log() {
return instance(logSupplier);
return log;
}

public interface Holder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package tech.ydb.yoj.repository.db.projection;

import lombok.RequiredArgsConstructor;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

@RequiredArgsConstructor
public class MigrationProjectionCache implements ProjectionCache {
private final FirstLevelCache cache;

@Override
public void load(Entity<?> entity) {
}

@Override
public void save(RepositoryTransaction transaction, Entity<?> entity) {
delete(transaction, entity.getId());

List<Entity<?>> newProjections = entity.createProjections();
for (Entity<?> projection : newProjections) {
saveEntity(transaction, projection);
}
}

@Override
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
Optional<? extends Entity<?>> oldEntity;
try {
oldEntity = cache.peek(id);
} catch (NoSuchElementException e) {
return;
}

if (oldEntity.isPresent()) {
List<Entity<?>> oldProjections = oldEntity.get().createProjections();
for (Entity<?> projection : oldProjections) {
deleteEntity(transaction, projection.getId());
}
}
}

@Override
public void applyProjectionChanges(RepositoryTransaction transaction) {
}

private <T extends Entity<T>> void deleteEntity(RepositoryTransaction transaction, Entity.Id<T> entityId) {
transaction.table(entityId.getType()).delete(entityId);
}

private <T extends Entity<T>> void saveEntity(RepositoryTransaction transaction, Entity<T> entity) {
@SuppressWarnings("unchecked")
T castedEntity = (T) entity;

transaction.table(entity.getId().getType()).save(castedEntity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
public interface ProjectionCache {
void load(Entity<?> entity);

void save(Entity<?> entity);
void save(RepositoryTransaction transaction, Entity<?> entity);

void delete(Entity.Id<?> id);
void delete(RepositoryTransaction transaction, Entity.Id<?> id);

void applyProjectionChanges(RepositoryTransaction transaction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ public void load(Entity<?> entity) {
}

@Override
public void save(Entity<?> entity) {
public void save(RepositoryTransaction transaction, Entity<?> entity) {
throw new UnsupportedOperationException("Should not be invoked in RO");
}

@Override
public void delete(Entity.Id<?> id) {
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
throw new UnsupportedOperationException("Should not be invoked in RO");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ public void load(Entity<?> entity) {
}

@Override
public void save(Entity<?> entity) {
public void save(RepositoryTransaction transaction, Entity<?> entity) {
row(entity.getId()).save(entity);
}

@Override
public void delete(Entity.Id<?> id) {
public void delete(RepositoryTransaction transaction, Entity.Id<?> id) {
row(id).delete();
}

Expand Down