From 78708b8f80210da7dbd6b5e1dc130333e00c286c Mon Sep 17 00:00:00 2001 From: baiyangtx Date: Tue, 13 Aug 2024 17:59:04 +0800 Subject: [PATCH] [AMORO-3078]: A lock-free TableBlocker implementation and does not rely on TableRuntime anymore. (#3079) * Fix unit tests * stash * ut is passed * spotless * fix reviewers * fix conflict * fix spotless * fix spotless --- .../amoro/server/catalog/InternalCatalog.java | 5 +- .../server/persistence/PersistentBase.java | 14 ++ .../mapper/TableBlockerMapper.java | 80 ++++++---- .../server/table/DefaultTableService.java | 91 +++++++++-- .../amoro/server/table/TableRuntime.java | 147 ++---------------- .../server/table/blocker/TableBlocker.java | 83 +++++++++- .../executor/BlockerExpiringExecutor.java | 6 +- .../main/resources/derby/ams-derby-init.sql | 4 +- .../main/resources/mysql/ams-mysql-init.sql | 3 +- .../mysql/upgrade-0.6.1-to-0.7.0.sql | 2 +- .../src/main/resources/mysql/upgrade.sql | 6 + .../resources/postgres/ams-postgres-init.sql | 8 +- .../postgres/upgrade-0.6.1-to-0.7.0.sql | 4 +- .../src/main/resources/postgres/upgrade.sql | 7 + .../persistence/mapper/MySQLTestBase.java | 21 --- .../executor/TestBlockerExpiringExecutor.java | 27 +++- 16 files changed, 280 insertions(+), 228 deletions(-) delete mode 100644 amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java index 952e7b207b..4bdfe23733 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java @@ -165,7 +165,10 @@ public ServerTableIdentifier dropTable(String databaseName, String tableName) { doAs( TableMetaMapper.class, mapper -> mapper.deleteTableMetaById(tableIdentifier.getId())), - () -> doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlockers(tableIdentifier)), + () -> + doAs( + TableBlockerMapper.class, + mapper -> mapper.deleteTableBlockers(this.name(), databaseName, tableName)), () -> dropTableInternal(databaseName, tableName), () -> doAsExisted( diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java index 07e6887932..8c9d00389b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java @@ -42,6 +42,20 @@ protected NestedSqlSession beginSession() { .openSession(TransactionIsolationLevel.READ_COMMITTED)); } + public final Long updateAs(Class mapperClz, Function updateFunction) { + try (NestedSqlSession session = beginSession()) { + try { + T mapper = getMapper(session, mapperClz); + Number number = updateFunction.apply(mapper); + session.commit(); + return number.longValue(); + } catch (Throwable t) { + session.rollback(); + throw AmoroRuntimeException.wrap(t, PersistenceException::new); + } + } + } + protected final void doAs(Class mapperClz, Consumer consumer) { try (NestedSqlSession session = beginSession()) { try { diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java index 2b772cfa55..57e2ebd949 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/persistence/mapper/TableBlockerMapper.java @@ -18,7 +18,6 @@ package org.apache.amoro.server.persistence.mapper; -import org.apache.amoro.api.ServerTableIdentifier; import org.apache.amoro.server.persistence.converter.List2StringConverter; import org.apache.amoro.server.persistence.converter.Long2TsConverter; import org.apache.amoro.server.persistence.converter.Map2StringConverter; @@ -42,14 +41,15 @@ public interface TableBlockerMapper { + "expiration_time,properties FROM " + TABLE_NAME + " " - + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = #{tableIdentifier.database} " - + "AND table_name = #{tableIdentifier.tableName} " + + "WHERE catalog_name = #{catalog} " + + "AND db_name = #{database} " + + "AND table_name = #{tableName} " + "AND expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}") @Results({ @Result(property = "blockerId", column = "blocker_id"), - @Result(property = "tableIdentifier.catalog", column = "catalog_name"), - @Result(property = "tableIdentifier.database", column = "db_name"), - @Result(property = "tableIdentifier.tableName", column = "table_name"), + @Result(property = "catalog", column = "catalog_name"), + @Result(property = "database", column = "db_name"), + @Result(property = "tableName", column = "table_name"), @Result( property = "operations", column = "operations", @@ -62,7 +62,10 @@ public interface TableBlockerMapper { @Result(property = "properties", column = "properties", typeHandler = Map2StringConverter.class) }) List selectBlockers( - @Param("tableIdentifier") ServerTableIdentifier tableIdentifier, @Param("now") long now); + @Param("catalog") String catalog, + @Param("database") String database, + @Param("tableName") String tableName, + @Param("now") long now); @Select( "SELECT blocker_id,catalog_name,db_name,table_name,operations,create_time," @@ -73,9 +76,9 @@ List selectBlockers( + "AND expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}") @Results({ @Result(property = "blockerId", column = "blocker_id"), - @Result(property = "tableIdentifier.catalog", column = "catalog_name"), - @Result(property = "tableIdentifier.database", column = "db_name"), - @Result(property = "tableIdentifier.tableName", column = "table_name"), + @Result(property = "catalog", column = "catalog_name"), + @Result(property = "database", column = "db_name"), + @Result(property = "tableName", column = "table_name"), @Result( property = "operations", column = "operations", @@ -89,31 +92,34 @@ List selectBlockers( }) TableBlocker selectBlocker(@Param("blockerId") long blockerId, @Param("now") long now); + @Update( + "UPDATE " + + TABLE_NAME + + " " + + "SET " + + "expiration_time = #{expiration, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} " + + "WHERE blocker_id = #{blockerId} " + + "AND expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}") + int renewBlocker( + @Param("blockerId") long blockerId, + @Param("now") long now, + @Param("expiration") long expiration); + @Insert( "INSERT INTO " + TABLE_NAME - + " (catalog_name,db_name,table_name,operations,create_time," - + "expiration_time,properties) VALUES (" - + "#{blocker.tableIdentifier.catalog}," - + "#{blocker.tableIdentifier.database}," - + "#{blocker.tableIdentifier.tableName}," + + "(catalog_name,db_name,table_name,operations,create_time,expiration_time,prev_blocker_id,properties) " + + "VALUES ( " + + "#{blocker.catalog}," + + "#{blocker.database}," + + "#{blocker.tableName}," + "#{blocker.operations,typeHandler=org.apache.amoro.server.persistence.converter.List2StringConverter}," + "#{blocker.createTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," + "#{blocker.expirationTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}," - + "#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}" - + ")") + + "#{blocker.prevBlockerId}," + + "#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})") @Options(useGeneratedKeys = true, keyProperty = "blocker.blockerId") - void insertBlocker(@Param("blocker") TableBlocker blocker); - - @Update( - "UPDATE " - + TABLE_NAME - + " SET " - + "expiration_time = #{expirationTime, " - + "typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} " - + "WHERE blocker_id = #{blockerId}") - void updateBlockerExpirationTime( - @Param("blockerId") long blockerId, @Param("expirationTime") long expirationTime); + int insert(@Param("blocker") TableBlocker blocker); @Delete("DELETE FROM " + TABLE_NAME + " " + "WHERE blocker_id = #{blockerId}") void deleteBlocker(@Param("blockerId") long blockerId); @@ -122,17 +128,23 @@ void updateBlockerExpirationTime( "DELETE FROM " + TABLE_NAME + " " - + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = #{tableIdentifier.database} " - + "AND table_name = #{tableIdentifier.tableName} " + + "WHERE catalog_name = #{catalog} AND db_name = #{database} " + + "AND table_name = #{tableName} " + "AND expiration_time <= #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}") int deleteExpiredBlockers( - @Param("tableIdentifier") ServerTableIdentifier tableIdentifier, @Param("now") long now); + @Param("catalog") String catalog, + @Param("database") String database, + @Param("tableName") String tableName, + @Param("now") long now); @Delete( "DELETE FROM " + TABLE_NAME + " " - + "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = #{tableIdentifier.database} " - + "AND table_name = #{tableIdentifier.tableName}") - int deleteBlockers(@Param("tableIdentifier") ServerTableIdentifier tableIdentifier); + + "WHERE catalog_name = #{catalog} AND db_name = #{database} " + + "AND table_name = #{tableName}") + int deleteTableBlockers( + @Param("catalog") String catalog, + @Param("database") String database, + @Param("tableName") String tableName); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index b3266b9d7c..9b54d18c95 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -35,12 +35,15 @@ import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.server.catalog.ServerCatalog; import org.apache.amoro.server.exception.AlreadyExistsException; +import org.apache.amoro.server.exception.BlockerConflictException; import org.apache.amoro.server.exception.IllegalMetadataException; import org.apache.amoro.server.exception.ObjectNotExistsException; +import org.apache.amoro.server.exception.PersistenceException; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; +import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.table.blocker.TableBlocker; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; @@ -55,6 +58,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -75,6 +79,7 @@ public class DefaultTableService extends StatedPersistentBase implements TableService { public static final Logger LOG = LoggerFactory.getLogger(DefaultTableService.class); + private static final int TABLE_BLOCKER_RETRY = 3; private final long externalCatalogRefreshingInterval; private final long blockerTimeout; private final Map internalCatalogMap = new ConcurrentHashMap<>(); @@ -251,32 +256,90 @@ public Blocker block( TableIdentifier tableIdentifier, List operations, Map properties) { - checkStarted(); - return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier)) - .block(operations, properties, blockerTimeout) - .buildBlocker(); + Preconditions.checkNotNull(operations, "operations should not be null"); + Preconditions.checkArgument(!operations.isEmpty(), "operations should not be empty"); + Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 0"); + String catalog = tableIdentifier.getCatalog(); + String database = tableIdentifier.getDatabase(); + String table = tableIdentifier.getTableName(); + int tryCount = 0; + while (tryCount++ < TABLE_BLOCKER_RETRY) { + long now = System.currentTimeMillis(); + doAs( + TableBlockerMapper.class, + mapper -> mapper.deleteExpiredBlockers(catalog, database, table, now)); + List tableBlockers = + getAs( + TableBlockerMapper.class, + mapper -> + mapper.selectBlockers( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName(), + now)); + if (TableBlocker.conflict(operations, tableBlockers)) { + throw new BlockerConflictException(operations + " is conflict with " + tableBlockers); + } + Optional maxBlockerOpt = + tableBlockers.stream() + .map(TableBlocker::getBlockerId) + .max(Comparator.comparingLong(l -> l)); + long prevBlockerId = maxBlockerOpt.orElse(-1L); + + TableBlocker tableBlocker = + TableBlocker.buildTableBlocker( + tableIdentifier, operations, properties, now, blockerTimeout, prevBlockerId); + try { + doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker)); + if (tableBlocker.getBlockerId() > 0) { + return tableBlocker.buildBlocker(); + } + } catch (PersistenceException e) { + LOG.warn("An exception occurs when creating a blocker:{}", tableBlocker, e); + } + } + throw new BlockerConflictException("Failed to create a blocker: conflict meet max retry"); } @Override public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) { - checkStarted(); - TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier)); - if (tableRuntime != null) { - tableRuntime.release(blockerId); - } + doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlocker(Long.parseLong(blockerId))); } @Override public long renewBlocker(TableIdentifier tableIdentifier, String blockerId) { - checkStarted(); - TableRuntime tableRuntime = getAndCheckExist(getServerTableIdentifier(tableIdentifier)); - return tableRuntime.renew(blockerId, blockerTimeout); + int retry = 0; + while (retry++ < TABLE_BLOCKER_RETRY) { + long now = System.currentTimeMillis(); + long id = Long.parseLong(blockerId); + TableBlocker tableBlocker = + getAs(TableBlockerMapper.class, mapper -> mapper.selectBlocker(id, now)); + if (tableBlocker == null) { + throw new ObjectNotExistsException("Blocker " + blockerId + " of " + tableIdentifier); + } + long current = System.currentTimeMillis(); + long expirationTime = now + blockerTimeout; + long effectRow = + updateAs( + TableBlockerMapper.class, mapper -> mapper.renewBlocker(id, current, expirationTime)); + if (effectRow > 0) { + return expirationTime; + } + } + throw new BlockerConflictException("Failed to renew a blocker: conflict meet max retry"); } @Override public List getBlockers(TableIdentifier tableIdentifier) { - checkStarted(); - return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier)).getBlockers().stream() + return getAs( + TableBlockerMapper.class, + mapper -> + mapper.selectBlockers( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName(), + System.currentTimeMillis())) + .stream() .map(TableBlocker::buildBlocker) .collect(Collectors.toList()); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java index e97143133c..95c68e08d8 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java @@ -26,8 +26,6 @@ import org.apache.amoro.api.config.OptimizingConfig; import org.apache.amoro.api.config.TableConfiguration; import org.apache.amoro.server.AmoroServiceConstants; -import org.apache.amoro.server.exception.BlockerConflictException; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.apache.amoro.server.metrics.MetricRegistry; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; @@ -43,24 +41,18 @@ import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.table.MixedTable; -import org.apache.amoro.table.blocker.RenewableBlocker; import org.apache.iceberg.Snapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; public class TableRuntime extends StatedPersistentBase { @@ -95,7 +87,6 @@ public class TableRuntime extends StatedPersistentBase { @StateField private volatile OptimizingEvaluator.PendingInput pendingInput; private volatile long lastPlanTime; private final TableOptimizingMetrics optimizingMetrics; - private final ReentrantLock blockerLock = new ReentrantLock(); protected TableRuntime( ServerTableIdentifier tableIdentifier, @@ -513,96 +504,6 @@ public double calculateQuotaOccupy() { .doubleValue(); } - /** - * Get all valid blockers. - * - * @return all valid blockers - */ - public List getBlockers() { - blockerLock.lock(); - try { - return getAs( - TableBlockerMapper.class, - mapper -> mapper.selectBlockers(tableIdentifier, System.currentTimeMillis())); - } finally { - blockerLock.unlock(); - } - } - - /** - * Block some operations for table. - * - * @param operations - operations to be blocked - * @param properties - - * @param blockerTimeout - - * @return TableBlocker if success - */ - public TableBlocker block( - List operations, - @Nonnull Map properties, - long blockerTimeout) { - Preconditions.checkNotNull(operations, "operations should not be null"); - Preconditions.checkArgument(!operations.isEmpty(), "operations should not be empty"); - Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 0"); - blockerLock.lock(); - try { - long now = System.currentTimeMillis(); - List tableBlockers = - getAs(TableBlockerMapper.class, mapper -> mapper.selectBlockers(tableIdentifier, now)); - if (conflict(operations, tableBlockers)) { - throw new BlockerConflictException(operations + " is conflict with " + tableBlockers); - } - TableBlocker tableBlocker = - buildTableBlocker(tableIdentifier, operations, properties, now, blockerTimeout); - doAs(TableBlockerMapper.class, mapper -> mapper.insertBlocker(tableBlocker)); - return tableBlocker; - } finally { - blockerLock.unlock(); - } - } - - /** - * Renew blocker. - * - * @param blockerId - blockerId - * @param blockerTimeout - timeout - * @throws IllegalStateException if blocker not exist - */ - public long renew(String blockerId, long blockerTimeout) { - blockerLock.lock(); - try { - long now = System.currentTimeMillis(); - TableBlocker tableBlocker = - getAs( - TableBlockerMapper.class, - mapper -> mapper.selectBlocker(Long.parseLong(blockerId), now)); - if (tableBlocker == null) { - throw new ObjectNotExistsException("Blocker " + blockerId + " of " + tableIdentifier); - } - long expirationTime = now + blockerTimeout; - doAs( - TableBlockerMapper.class, - mapper -> mapper.updateBlockerExpirationTime(Long.parseLong(blockerId), expirationTime)); - return expirationTime; - } finally { - blockerLock.unlock(); - } - } - - /** - * Release blocker, succeed when blocker not exist. - * - * @param blockerId - blockerId - */ - public void release(String blockerId) { - blockerLock.lock(); - try { - doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlocker(Long.parseLong(blockerId))); - } finally { - blockerLock.unlock(); - } - } - /** * Check if operation are blocked now. * @@ -610,43 +511,15 @@ public void release(String blockerId) { * @return true if blocked */ public boolean isBlocked(BlockableOperation operation) { - blockerLock.lock(); - try { - List tableBlockers = - getAs( - TableBlockerMapper.class, - mapper -> mapper.selectBlockers(tableIdentifier, System.currentTimeMillis())); - return conflict(operation, tableBlockers); - } finally { - blockerLock.unlock(); - } - } - - private boolean conflict( - List blockableOperations, List blockers) { - return blockableOperations.stream().anyMatch(operation -> conflict(operation, blockers)); - } - - private boolean conflict(BlockableOperation blockableOperation, List blockers) { - return blockers.stream() - .anyMatch(blocker -> blocker.getOperations().contains(blockableOperation.name())); - } - - private TableBlocker buildTableBlocker( - ServerTableIdentifier tableIdentifier, - List operations, - Map properties, - long now, - long blockerTimeout) { - TableBlocker tableBlocker = new TableBlocker(); - tableBlocker.setTableIdentifier(tableIdentifier); - tableBlocker.setCreateTime(now); - tableBlocker.setExpirationTime(now + blockerTimeout); - tableBlocker.setOperations( - operations.stream().map(BlockableOperation::name).collect(Collectors.toList())); - HashMap propertiesOfTableBlocker = new HashMap<>(properties); - propertiesOfTableBlocker.put(RenewableBlocker.BLOCKER_TIMEOUT, blockerTimeout + ""); - tableBlocker.setProperties(propertiesOfTableBlocker); - return tableBlocker; + List tableBlockers = + getAs( + TableBlockerMapper.class, + mapper -> + mapper.selectBlockers( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName(), + System.currentTimeMillis())); + return TableBlocker.conflict(operation, tableBlockers); } } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java index 8cb68d0a10..000b5054af 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/blocker/TableBlocker.java @@ -20,31 +20,89 @@ import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.api.Blocker; -import org.apache.amoro.api.ServerTableIdentifier; +import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.blocker.RenewableBlocker; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class TableBlocker { - private ServerTableIdentifier tableIdentifier; + private String catalog; + private String database; + private String tableName; private long blockerId; private List operations; private long createTime; private long expirationTime; + private long prevBlockerId; private Map properties; + public static boolean conflict( + List blockableOperations, List blockers) { + return blockableOperations.stream().anyMatch(operation -> conflict(operation, blockers)); + } + + public static boolean conflict( + BlockableOperation blockableOperation, List blockers) { + return blockers.stream() + .anyMatch(blocker -> blocker.getOperations().contains(blockableOperation.name())); + } + + public static TableBlocker buildTableBlocker( + TableIdentifier tableIdentifier, + List operations, + Map properties, + long now, + long blockerTimeout, + long prevBlockerId) { + TableBlocker tableBlocker = new TableBlocker(); + tableBlocker.setCatalog(tableIdentifier.getCatalog()); + tableBlocker.setDatabase(tableIdentifier.getDatabase()); + tableBlocker.setTableName(tableIdentifier.getTableName()); + tableBlocker.setCreateTime(now); + tableBlocker.setExpirationTime(now + blockerTimeout); + tableBlocker.setOperations( + operations.stream().map(BlockableOperation::name).collect(Collectors.toList())); + HashMap propertiesOfTableBlocker = new HashMap<>(properties); + propertiesOfTableBlocker.put(RenewableBlocker.BLOCKER_TIMEOUT, blockerTimeout + ""); + tableBlocker.setProperties(propertiesOfTableBlocker); + return tableBlocker; + } + public TableBlocker() {} - public ServerTableIdentifier getTableIdentifier() { - return tableIdentifier; + public String getCatalog() { + return catalog; + } + + public void setCatalog(String catalog) { + this.catalog = catalog; + } + + public String getDatabase() { + return database; } - public void setTableIdentifier(ServerTableIdentifier tableIdentifier) { - this.tableIdentifier = tableIdentifier; + public void setDatabase(String database) { + this.database = database; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setTableIdentifier(TableIdentifier tableIdentifier) { + this.catalog = tableIdentifier.getCatalog(); + this.database = tableIdentifier.getDatabase(); + this.tableName = tableIdentifier.getTableName(); } public long getBlockerId() { @@ -79,6 +137,14 @@ public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } + public void setPrevBlockerId(long prevBlockerId) { + this.prevBlockerId = prevBlockerId; + } + + public long getPrevBlockerId() { + return this.prevBlockerId; + } + public Map getProperties() { return properties; } @@ -99,11 +165,14 @@ public Blocker buildBlocker() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("tableIdentifier", tableIdentifier) + .add("catalog", catalog) + .add("database", database) + .add("tableName", tableName) .add("blockerId", blockerId) .add("operations", operations) .add("createTime", createTime) .add("expirationTime", expirationTime) + .add("prevBlockerId", prevBlockerId) .add("properties", properties) .toString(); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java index 1698198156..909d7c6a0b 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/executor/BlockerExpiringExecutor.java @@ -55,11 +55,13 @@ protected void execute(TableRuntime tableRuntime) { private static class Persistency extends PersistentBase { public void doExpiring(TableRuntime tableRuntime) { + String catalog = tableRuntime.getTableIdentifier().getCatalog(); + String database = tableRuntime.getTableIdentifier().getDatabase(); + String table = tableRuntime.getTableIdentifier().getTableName(); doAs( TableBlockerMapper.class, mapper -> - mapper.deleteExpiredBlockers( - tableRuntime.getTableIdentifier(), System.currentTimeMillis())); + mapper.deleteExpiredBlockers(catalog, database, table, System.currentTimeMillis())); } } } diff --git a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql index e7856a57df..a7dc2ecf11 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/derby/ams-derby-init.sql @@ -200,5 +200,7 @@ CREATE TABLE table_blocker ( create_time timestamp DEFAULT NULL, expiration_time timestamp DEFAULT NULL, properties clob(64m), - PRIMARY KEY (blocker_id) + prev_blocker_id bigint NOT NULL DEFAULT -1, + PRIMARY KEY (blocker_id), + CONSTRAINT prev_uq UNIQUE (catalog_name, db_name, table_name, prev_blocker_id) ); diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql index 82fe7072d3..45bc860927 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/ams-mysql-init.sql @@ -219,6 +219,7 @@ CREATE TABLE `table_blocker` ( `create_time` timestamp NULL DEFAULT NULL COMMENT 'Blocker create time', `expiration_time` timestamp NULL DEFAULT NULL COMMENT 'Blocker expiration time', `properties` mediumtext COMMENT 'Blocker properties', + `prev_blocker_id` bigint(20) NOT NULL DEFAULT -1 COMMENT 'prev blocker id when created', PRIMARY KEY (`blocker_id`), - KEY `table_index` (`catalog_name`,`db_name`,`table_name`) + UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Table blockers' ROW_FORMAT=DYNAMIC; diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql index 0489199904..3fff473886 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade-0.6.1-to-0.7.0.sql @@ -17,4 +17,4 @@ ALTER TABLE table_identifier CHANGE COLUMN table_name table_name varchar(256) NO ALTER TABLE table_optimizing_process CHANGE COLUMN table_name table_name varchar(256) NOT NULL; ALTER TABLE table_metadata CHANGE COLUMN table_name table_name varchar(256) NOT NULL; ALTER TABLE table_runtime CHANGE COLUMN table_name table_name varchar(256) NOT NULL; -ALTER TABLE table_blocker CHANGE COLUMN table_name table_name varchar(256) NOT NULL; \ No newline at end of file +ALTER TABLE table_blocker CHANGE COLUMN table_name table_name varchar(256) NOT NULL; diff --git a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql index 69f4095550..a457d458d4 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/mysql/upgrade.sql @@ -15,3 +15,9 @@ -- If you have any changes to the AMS database, please record them in this file. -- We will confirm the corresponding version of these upgrade scripts when releasing. + +-- NEW SCHEMA CHANGE FOR CAS BASE BLOCKER +TRUNCATE TABLE `table_blocker`; +ALTER TABLE `table_blocker` DROP INDEX `table_index`; +ALTER TABLE `table_blocker` ADD COLUMN `prev_blocker_id` bigint(20) NOT NULL DEFAULT -1 COMMENT 'prev blocker id when created'; +ALTER TABLE `table_blocker` ADD UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`); diff --git a/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql b/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql index dd6f0e0dd3..40688a5ed6 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql @@ -352,9 +352,10 @@ CREATE TABLE table_blocker operations VARCHAR(128) NOT NULL, create_time TIMESTAMP, expiration_time TIMESTAMP, - properties TEXT + properties TEXT, + prev_blocker_id BIGSERIAL NOT NULL ); -CREATE INDEX blocker_index ON table_optimizing_process (catalog_name, db_name, table_name); +CREATE UNIQUE INDEX uq_prev ON table_blocker (catalog_name, db_name, table_name, prev_blocker_id); COMMENT ON TABLE table_blocker IS 'Table blockers'; COMMENT ON COLUMN table_blocker.blocker_id IS 'Blocker unique ID'; @@ -364,4 +365,5 @@ COMMENT ON COLUMN table_blocker.table_name IS 'Table name'; COMMENT ON COLUMN table_blocker.operations IS 'Blocked operations'; COMMENT ON COLUMN table_blocker.create_time IS 'Blocker create time'; COMMENT ON COLUMN table_blocker.expiration_time IS 'Blocker expiration time'; -COMMENT ON COLUMN table_blocker.properties IS 'Blocker properties'; \ No newline at end of file +COMMENT ON COLUMN table_blocker.properties IS 'Blocker properties'; +COMMENT ON COLUMN table_blocker.prev_blocker_id is 'prev blocker id when created'; diff --git a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql index c1bb519958..84cc86db23 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade-0.6.1-to-0.7.0.sql @@ -20,4 +20,6 @@ ALTER TABLE table_identifier ALTER COLUMN table_name TYPE varchar(256) NOT NULL; ALTER TABLE table_optimizing_process ALTER COLUMN table_name TYPE varchar(256) NOT NULL; ALTER TABLE table_metadata ALTER COLUMN table_name TYPE varchar(256) NOT NULL; ALTER TABLE table_runtime ALTER COLUMN table_name TYPE varchar(256) NOT NULL; -ALTER TABLE table_blocker ALTER COLUMN table_name TYPE varchar(256) NOT NULL; \ No newline at end of file +ALTER TABLE table_blocker ALTER COLUMN table_name TYPE varchar(256) NOT NULL; + + diff --git a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql index 69f4095550..0b07e88050 100644 --- a/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql +++ b/amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql @@ -15,3 +15,10 @@ -- If you have any changes to the AMS database, please record them in this file. -- We will confirm the corresponding version of these upgrade scripts when releasing. + +-- NEW SCHEMA CHANGE FOR CAS BASE BLOCKER +TRUNCATE TABLE `table_blocker`; +ALTER TABLE `table_blocker` DROP INDEX `table_index`; +ALTER TABLE `table_blocker` ADD COLUMN `prev_blocker_id` bigint(20) NOT NULL DEFAULT -1; +COMMENT ON COLUMN table_blocker.prev_blocker_id IS 'prev blocker id when created'; +ALTER TABLE `table_blocker` ADD UNIQUE KEY `uq_prev` (`catalog_name`,`db_name`,`table_name`, `prev_blocker_id`); diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java deleted file mode 100644 index 0ca6d6ced6..0000000000 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/persistence/mapper/MySQLTestBase.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.persistence.mapper; - -public class MySQLTestBase {} diff --git a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java index 5a38cea302..ef87ea60f9 100644 --- a/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java +++ b/amoro-ams/amoro-ams-server/src/test/java/org/apache/amoro/server/table/executor/TestBlockerExpiringExecutor.java @@ -55,19 +55,23 @@ public void mock() { public void testExpireBlocker() { BlockerExpiringExecutor blockerExpiringExecutor = new BlockerExpiringExecutor(tableManager); TableBlocker tableBlocker = new TableBlocker(); - tableBlocker.setTableIdentifier(tableIdentifier); + tableBlocker.setTableIdentifier(tableIdentifier.getIdentifier()); tableBlocker.setExpirationTime(System.currentTimeMillis() - 10); tableBlocker.setCreateTime(System.currentTimeMillis() - 20); tableBlocker.setOperations(Collections.singletonList(BlockableOperation.OPTIMIZE.name())); + tableBlocker.setPrevBlockerId(-1L); persistency.insertTableBlocker(tableBlocker); TableBlocker tableBlocker2 = new TableBlocker(); - tableBlocker2.setTableIdentifier(tableIdentifier); + tableBlocker2.setTableIdentifier(tableIdentifier.getIdentifier()); tableBlocker2.setExpirationTime(System.currentTimeMillis() + 100000); tableBlocker2.setCreateTime(System.currentTimeMillis() - 20); tableBlocker2.setOperations(Collections.singletonList(BlockableOperation.BATCH_WRITE.name())); + tableBlocker2.setPrevBlockerId(tableBlocker.getBlockerId()); persistency.insertTableBlocker(tableBlocker2); + Assert.assertThrows(Exception.class, () -> persistency.insertTableBlocker(tableBlocker2)); + Assert.assertEquals(2, persistency.selectTableBlockers(tableIdentifier).size()); Assert.assertNotNull(persistency.selectTableBlocker(tableBlocker.getBlockerId())); Assert.assertNotNull(persistency.selectTableBlocker(tableBlocker2.getBlockerId())); @@ -83,15 +87,28 @@ public void testExpireBlocker() { private static class Persistency extends PersistentBase { public void insertTableBlocker(TableBlocker tableBlocker) { - doAs(TableBlockerMapper.class, mapper -> mapper.insertBlocker(tableBlocker)); + doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker)); } public List selectTableBlockers(ServerTableIdentifier tableIdentifier) { - return getAs(TableBlockerMapper.class, mapper -> mapper.selectBlockers(tableIdentifier, 1)); + return getAs( + TableBlockerMapper.class, + mapper -> + mapper.selectBlockers( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName(), + 1)); } public void deleteBlockers(ServerTableIdentifier tableIdentifier) { - doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlockers(tableIdentifier)); + doAs( + TableBlockerMapper.class, + mapper -> + mapper.deleteTableBlockers( + tableIdentifier.getCatalog(), + tableIdentifier.getDatabase(), + tableIdentifier.getTableName())); } public TableBlocker selectTableBlocker(long blockerId) {