diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java index 9c682ffd..8444cd09 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java @@ -405,8 +405,8 @@ public T insert(T tt) { T t = tt.preSave(); transaction.getWatcher().markRowRead(type, t.getId()); transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t)); + transaction.getTransactionLocal().projectionCache().save(transaction, t); transaction.getTransactionLocal().firstLevelCache().put(t); - transaction.getTransactionLocal().projectionCache().save(t); return t; } @@ -414,16 +414,16 @@ public T insert(T tt) { public T save(T tt) { T t = tt.preSave(); transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t)); + transaction.getTransactionLocal().projectionCache().save(transaction, t); transaction.getTransactionLocal().firstLevelCache().put(t); - transaction.getTransactionLocal().projectionCache().save(t); return t; } @Override public void delete(Entity.Id id) { transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id)); + transaction.getTransactionLocal().projectionCache().delete(transaction, id); transaction.getTransactionLocal().firstLevelCache().putEmpty(id); - transaction.getTransactionLocal().projectionCache().delete(id); } @Override diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java index ae58f271..da9ea7de 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java @@ -13,13 +13,13 @@ import tech.ydb.yoj.repository.db.EntitySchema; import tech.ydb.yoj.repository.db.Range; import tech.ydb.yoj.repository.db.Table; -import tech.ydb.yoj.repository.db.Tx; import tech.ydb.yoj.repository.db.ViewSchema; import tech.ydb.yoj.repository.db.bulk.BulkParams; import tech.ydb.yoj.repository.db.cache.FirstLevelCache; import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.db.statement.Changeset; +import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction; import tech.ydb.yoj.repository.ydb.bulk.BulkMapper; import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl; import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper; @@ -54,17 +54,17 @@ import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder; public class YdbTable> implements Table { - private final QueryExecutor executor; + private final YdbRepositoryTransaction executor; @Getter private final Class type; - public YdbTable(Class type, QueryExecutor executor) { + public YdbTable(Class type, YdbRepositoryTransaction executor) { this.type = type; - this.executor = new CheckingQueryExecutor(executor); + this.executor = executor; } protected YdbTable(QueryExecutor executor) { - this.executor = new CheckingQueryExecutor(executor); + this.executor = (YdbRepositoryTransaction) executor; this.type = resolveEntityType(); } @@ -421,7 +421,7 @@ public T insert(T t) { T entityToSave = t.preSave(); executor.pendingExecute(YqlStatement.insert(type), entityToSave); executor.getTransactionLocal().firstLevelCache().put(entityToSave); - executor.getTransactionLocal().projectionCache().save(entityToSave); + executor.getTransactionLocal().projectionCache().save(executor, entityToSave); return t; } @@ -430,7 +430,7 @@ public T save(T t) { T entityToSave = t.preSave(); executor.pendingExecute(YqlStatement.save(type), entityToSave); executor.getTransactionLocal().firstLevelCache().put(entityToSave); - executor.getTransactionLocal().projectionCache().save(entityToSave); + executor.getTransactionLocal().projectionCache().save(executor, entityToSave); return t; } @@ -438,7 +438,7 @@ public T save(T t) { public void delete(Entity.Id id) { executor.pendingExecute(YqlStatement.delete(type), id); executor.getTransactionLocal().firstLevelCache().putEmpty(id); - executor.getTransactionLocal().projectionCache().delete(id); + executor.getTransactionLocal().projectionCache().delete(executor, id); } /** @@ -458,7 +458,7 @@ public > void migrate(ID id) { T rawEntity = foundRaw.get(0); T entityToSave = rawEntity.postLoad().preSave(); executor.pendingExecute(YqlStatement.save(type), entityToSave); - executor.getTransactionLocal().projectionCache().save(entityToSave); + executor.getTransactionLocal().projectionCache().save(executor, entityToSave); } @Override @@ -494,55 +494,6 @@ default void bulkUpsert(BulkMapper mapper, List input, BulkParams p TransactionLocal getTransactionLocal(); } - public static class CheckingQueryExecutor implements QueryExecutor { - private final QueryExecutor delegate; - private final Tx originTx; - - public CheckingQueryExecutor(QueryExecutor delegate) { - this.delegate = delegate; - this.originTx = Tx.Current.exists() ? Tx.Current.get() : null; - } - - private void check() { - Tx.checkSameTx(originTx); - } - - @Override - public List execute(Statement statement, PARAMS params) { - check(); - return delegate.execute(statement, params); - } - - @Override - public Stream executeScanQuery(Statement statement, PARAMS params) { - return delegate.executeScanQuery(statement, params); - } - - @Override - public void pendingExecute(Statement statement, PARAMS value) { - check(); - delegate.pendingExecute(statement, value); - } - - @Override - public void bulkUpsert(BulkMapper mapper, List input, BulkParams params) { - check(); - delegate.bulkUpsert(mapper, input, params); - } - - @Override - public Stream readTable(ReadTableMapper mapper, ReadTableParams params) { - check(); - return delegate.readTable(mapper, params); - } - - @Override - public TransactionLocal getTransactionLocal() { - check(); - return delegate.getTransactionLocal(); - } - } - public > void updateIn(Collection ids, Changeset changeset) { var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap()); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java index 49bb2407..1a92a780 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java @@ -133,6 +133,11 @@ public TxManager immediateWrites() { return withOptions(this.options.withImmediateWrites(true)); } + @Override + public TxManager separateProjections() { + return withOptions(this.options.withSeparateProjections(true)); + } + @Override public TxManager noFirstLevelCache() { return withOptions(this.options.withFirstLevelCache(false)); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java index 37a00333..e524b8f1 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java @@ -41,6 +41,8 @@ public interface TxManager { */ TxManager immediateWrites(); + TxManager separateProjections(); + /** * Turn off first level cache */ diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java index cd3c5576..9d2efafc 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java @@ -33,6 +33,8 @@ public class TxOptions { boolean immediateWrites; + boolean separateProjections; + public static TxOptions create(@NonNull IsolationLevel isolationLevel) { return builder() .isolationLevel(isolationLevel) diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java index 7be6daa1..59dac58c 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java @@ -3,46 +3,48 @@ import lombok.NonNull; import tech.ydb.yoj.repository.BaseDb; import tech.ydb.yoj.repository.db.TxOptions; +import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache; import tech.ydb.yoj.repository.db.projection.ProjectionCache; import tech.ydb.yoj.repository.db.projection.RoProjectionCache; import tech.ydb.yoj.repository.db.projection.RwProjectionCache; -import java.util.IdentityHashMap; -import java.util.Map; -import java.util.function.Supplier; - public class TransactionLocal { - private final Map, Object> singletons = new IdentityHashMap<>(); - - private final Supplier firstLevelCacheSupplier; - private final Supplier projectionCacheSupplier; - private final Supplier logSupplier; + private final FirstLevelCache firstLevelCache; + private final ProjectionCache projectionCache; + private final TransactionLog log; public TransactionLocal(@NonNull TxOptions options) { - this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty; - this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new; - this.logSupplier = () -> new TransactionLog(options.getLogLevel()); + this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty(); + this.projectionCache = createProjectionCache(firstLevelCache, options); + this.log = new TransactionLog(options.getLogLevel()); } - public static TransactionLocal get() { - return BaseDb.current(Holder.class).getTransactionLocal(); + private static ProjectionCache createProjectionCache(FirstLevelCache firstLevelCache, TxOptions options) { + if (options.isMutable()) { + if (options.isSeparateProjections()) { + return new MigrationProjectionCache(firstLevelCache); + } + + return new RwProjectionCache(); + } + + return new RoProjectionCache(); } - @SuppressWarnings("unchecked") - public X instance(@NonNull Supplier supplier) { - return (X) singletons.computeIfAbsent(supplier, Supplier::get); + public static TransactionLocal get() { + return BaseDb.current(Holder.class).getTransactionLocal(); } public ProjectionCache projectionCache() { - return instance(projectionCacheSupplier); + return projectionCache; } public FirstLevelCache firstLevelCache() { - return instance(firstLevelCacheSupplier); + return firstLevelCache; } public TransactionLog log() { - return instance(logSupplier); + return log; } public interface Holder { diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/MigrationProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/MigrationProjectionCache.java new file mode 100644 index 00000000..229fe5ba --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/MigrationProjectionCache.java @@ -0,0 +1,61 @@ +package tech.ydb.yoj.repository.db.projection; + +import lombok.RequiredArgsConstructor; +import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.RepositoryTransaction; +import tech.ydb.yoj.repository.db.cache.FirstLevelCache; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +@RequiredArgsConstructor +public class MigrationProjectionCache implements ProjectionCache { + private final FirstLevelCache cache; + + @Override + public void load(Entity entity) { + } + + @Override + public void save(RepositoryTransaction transaction, Entity entity) { + delete(transaction, entity.getId()); + + List> newProjections = entity.createProjections(); + for (Entity projection : newProjections) { + saveEntity(transaction, projection); + } + } + + @Override + public void delete(RepositoryTransaction transaction, Entity.Id id) { + Optional> oldEntity; + try { + oldEntity = cache.peek(id); + } catch (NoSuchElementException e) { + return; + } + + if (oldEntity.isPresent()) { + List> oldProjections = oldEntity.get().createProjections(); + for (Entity projection : oldProjections) { + deleteEntity(transaction, projection.getId()); + } + } + } + + @Override + public void applyProjectionChanges(RepositoryTransaction transaction) { + } + + private > void deleteEntity(RepositoryTransaction transaction, Entity.Id entityId) { + transaction.table(entityId.getType()).delete(entityId); + } + + private > void saveEntity(RepositoryTransaction transaction, Entity entity) { + @SuppressWarnings("unchecked") + T castedEntity = (T) entity; + + transaction.table(entity.getId().getType()).save(castedEntity); + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java index b72f1eec..3c3f9ebf 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java @@ -7,9 +7,9 @@ public interface ProjectionCache { void load(Entity entity); - void save(Entity entity); + void save(RepositoryTransaction transaction, Entity entity); - void delete(Entity.Id id); + void delete(RepositoryTransaction transaction, Entity.Id id); void applyProjectionChanges(RepositoryTransaction transaction); } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java index 7684a077..c9954a73 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java @@ -9,12 +9,12 @@ public void load(Entity entity) { } @Override - public void save(Entity entity) { + public void save(RepositoryTransaction transaction, Entity entity) { throw new UnsupportedOperationException("Should not be invoked in RO"); } @Override - public void delete(Entity.Id id) { + public void delete(RepositoryTransaction transaction, Entity.Id id) { throw new UnsupportedOperationException("Should not be invoked in RO"); } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java index d1bd28b9..7ae2fa1a 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java @@ -22,12 +22,12 @@ public void load(Entity entity) { } @Override - public void save(Entity entity) { + public void save(RepositoryTransaction transaction, Entity entity) { row(entity.getId()).save(entity); } @Override - public void delete(Entity.Id id) { + public void delete(RepositoryTransaction transaction, Entity.Id id) { row(id).delete(); }