Skip to content

Commit

Permalink
[AMORO-3078]: A lock-free TableBlocker implementation and does not re…
Browse files Browse the repository at this point in the history
…ly on TableRuntime anymore. (apache#3079)

* Fix unit tests

* stash

* ut is passed

* spotless

* fix reviewers

* fix conflict

* fix spotless

* fix spotless
  • Loading branch information
baiyangtx authored Aug 13, 2024
1 parent da62faf commit 78708b8
Show file tree
Hide file tree
Showing 16 changed files with 280 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ public ServerTableIdentifier dropTable(String databaseName, String tableName) {
doAs(
TableMetaMapper.class,
mapper -> mapper.deleteTableMetaById(tableIdentifier.getId())),
() -> doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlockers(tableIdentifier)),
() ->
doAs(
TableBlockerMapper.class,
mapper -> mapper.deleteTableBlockers(this.name(), databaseName, tableName)),
() -> dropTableInternal(databaseName, tableName),
() ->
doAsExisted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ protected NestedSqlSession beginSession() {
.openSession(TransactionIsolationLevel.READ_COMMITTED));
}

public final <T> Long updateAs(Class<T> mapperClz, Function<T, Number> updateFunction) {
try (NestedSqlSession session = beginSession()) {
try {
T mapper = getMapper(session, mapperClz);
Number number = updateFunction.apply(mapper);
session.commit();
return number.longValue();
} catch (Throwable t) {
session.rollback();
throw AmoroRuntimeException.wrap(t, PersistenceException::new);
}
}
}

protected final <T> void doAs(Class<T> mapperClz, Consumer<T> consumer) {
try (NestedSqlSession session = beginSession()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.api.ServerTableIdentifier;
import org.apache.amoro.server.persistence.converter.List2StringConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
import org.apache.amoro.server.persistence.converter.Map2StringConverter;
Expand All @@ -42,14 +41,15 @@ public interface TableBlockerMapper {
+ "expiration_time,properties FROM "
+ TABLE_NAME
+ " "
+ "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = #{tableIdentifier.database} "
+ "AND table_name = #{tableIdentifier.tableName} "
+ "WHERE catalog_name = #{catalog} "
+ "AND db_name = #{database} "
+ "AND table_name = #{tableName} "
+ "AND expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
@Results({
@Result(property = "blockerId", column = "blocker_id"),
@Result(property = "tableIdentifier.catalog", column = "catalog_name"),
@Result(property = "tableIdentifier.database", column = "db_name"),
@Result(property = "tableIdentifier.tableName", column = "table_name"),
@Result(property = "catalog", column = "catalog_name"),
@Result(property = "database", column = "db_name"),
@Result(property = "tableName", column = "table_name"),
@Result(
property = "operations",
column = "operations",
Expand All @@ -62,7 +62,10 @@ public interface TableBlockerMapper {
@Result(property = "properties", column = "properties", typeHandler = Map2StringConverter.class)
})
List<TableBlocker> selectBlockers(
@Param("tableIdentifier") ServerTableIdentifier tableIdentifier, @Param("now") long now);
@Param("catalog") String catalog,
@Param("database") String database,
@Param("tableName") String tableName,
@Param("now") long now);

@Select(
"SELECT blocker_id,catalog_name,db_name,table_name,operations,create_time,"
Expand All @@ -73,9 +76,9 @@ List<TableBlocker> selectBlockers(
+ "AND expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
@Results({
@Result(property = "blockerId", column = "blocker_id"),
@Result(property = "tableIdentifier.catalog", column = "catalog_name"),
@Result(property = "tableIdentifier.database", column = "db_name"),
@Result(property = "tableIdentifier.tableName", column = "table_name"),
@Result(property = "catalog", column = "catalog_name"),
@Result(property = "database", column = "db_name"),
@Result(property = "tableName", column = "table_name"),
@Result(
property = "operations",
column = "operations",
Expand All @@ -89,31 +92,34 @@ List<TableBlocker> selectBlockers(
})
TableBlocker selectBlocker(@Param("blockerId") long blockerId, @Param("now") long now);

@Update(
"UPDATE "
+ TABLE_NAME
+ " "
+ "SET "
+ "expiration_time = #{expiration, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
+ "WHERE blocker_id = #{blockerId} "
+ "AND expiration_time > #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
int renewBlocker(
@Param("blockerId") long blockerId,
@Param("now") long now,
@Param("expiration") long expiration);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ " (catalog_name,db_name,table_name,operations,create_time,"
+ "expiration_time,properties) VALUES ("
+ "#{blocker.tableIdentifier.catalog},"
+ "#{blocker.tableIdentifier.database},"
+ "#{blocker.tableIdentifier.tableName},"
+ "(catalog_name,db_name,table_name,operations,create_time,expiration_time,prev_blocker_id,properties) "
+ "VALUES ( "
+ "#{blocker.catalog},"
+ "#{blocker.database},"
+ "#{blocker.tableName},"
+ "#{blocker.operations,typeHandler=org.apache.amoro.server.persistence.converter.List2StringConverter},"
+ "#{blocker.createTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
+ "#{blocker.expirationTime,typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter},"
+ "#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter}"
+ ")")
+ "#{blocker.prevBlockerId},"
+ "#{blocker.properties,typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})")
@Options(useGeneratedKeys = true, keyProperty = "blocker.blockerId")
void insertBlocker(@Param("blocker") TableBlocker blocker);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET "
+ "expiration_time = #{expirationTime, "
+ "typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
+ "WHERE blocker_id = #{blockerId}")
void updateBlockerExpirationTime(
@Param("blockerId") long blockerId, @Param("expirationTime") long expirationTime);
int insert(@Param("blocker") TableBlocker blocker);

@Delete("DELETE FROM " + TABLE_NAME + " " + "WHERE blocker_id = #{blockerId}")
void deleteBlocker(@Param("blockerId") long blockerId);
Expand All @@ -122,17 +128,23 @@ void updateBlockerExpirationTime(
"DELETE FROM "
+ TABLE_NAME
+ " "
+ "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = #{tableIdentifier.database} "
+ "AND table_name = #{tableIdentifier.tableName} "
+ "WHERE catalog_name = #{catalog} AND db_name = #{database} "
+ "AND table_name = #{tableName} "
+ "AND expiration_time <= #{now, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter}")
int deleteExpiredBlockers(
@Param("tableIdentifier") ServerTableIdentifier tableIdentifier, @Param("now") long now);
@Param("catalog") String catalog,
@Param("database") String database,
@Param("tableName") String tableName,
@Param("now") long now);

@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " "
+ "WHERE catalog_name = #{tableIdentifier.catalog} AND db_name = #{tableIdentifier.database} "
+ "AND table_name = #{tableIdentifier.tableName}")
int deleteBlockers(@Param("tableIdentifier") ServerTableIdentifier tableIdentifier);
+ "WHERE catalog_name = #{catalog} AND db_name = #{database} "
+ "AND table_name = #{tableName}")
int deleteTableBlockers(
@Param("catalog") String catalog,
@Param("database") String database,
@Param("tableName") String tableName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.exception.AlreadyExistsException;
import org.apache.amoro.server.exception.BlockerConflictException;
import org.apache.amoro.server.exception.IllegalMetadataException;
import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.server.exception.PersistenceException;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
Expand All @@ -55,6 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -75,6 +79,7 @@
public class DefaultTableService extends StatedPersistentBase implements TableService {

public static final Logger LOG = LoggerFactory.getLogger(DefaultTableService.class);
private static final int TABLE_BLOCKER_RETRY = 3;
private final long externalCatalogRefreshingInterval;
private final long blockerTimeout;
private final Map<String, InternalCatalog> internalCatalogMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -251,32 +256,90 @@ public Blocker block(
TableIdentifier tableIdentifier,
List<BlockableOperation> operations,
Map<String, String> properties) {
checkStarted();
return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier))
.block(operations, properties, blockerTimeout)
.buildBlocker();
Preconditions.checkNotNull(operations, "operations should not be null");
Preconditions.checkArgument(!operations.isEmpty(), "operations should not be empty");
Preconditions.checkArgument(blockerTimeout > 0, "blocker timeout must > 0");
String catalog = tableIdentifier.getCatalog();
String database = tableIdentifier.getDatabase();
String table = tableIdentifier.getTableName();
int tryCount = 0;
while (tryCount++ < TABLE_BLOCKER_RETRY) {
long now = System.currentTimeMillis();
doAs(
TableBlockerMapper.class,
mapper -> mapper.deleteExpiredBlockers(catalog, database, table, now));
List<TableBlocker> tableBlockers =
getAs(
TableBlockerMapper.class,
mapper ->
mapper.selectBlockers(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
now));
if (TableBlocker.conflict(operations, tableBlockers)) {
throw new BlockerConflictException(operations + " is conflict with " + tableBlockers);
}
Optional<Long> maxBlockerOpt =
tableBlockers.stream()
.map(TableBlocker::getBlockerId)
.max(Comparator.comparingLong(l -> l));
long prevBlockerId = maxBlockerOpt.orElse(-1L);

TableBlocker tableBlocker =
TableBlocker.buildTableBlocker(
tableIdentifier, operations, properties, now, blockerTimeout, prevBlockerId);
try {
doAs(TableBlockerMapper.class, mapper -> mapper.insert(tableBlocker));
if (tableBlocker.getBlockerId() > 0) {
return tableBlocker.buildBlocker();
}
} catch (PersistenceException e) {
LOG.warn("An exception occurs when creating a blocker:{}", tableBlocker, e);
}
}
throw new BlockerConflictException("Failed to create a blocker: conflict meet max retry");
}

@Override
public void releaseBlocker(TableIdentifier tableIdentifier, String blockerId) {
checkStarted();
TableRuntime tableRuntime = getRuntime(getServerTableIdentifier(tableIdentifier));
if (tableRuntime != null) {
tableRuntime.release(blockerId);
}
doAs(TableBlockerMapper.class, mapper -> mapper.deleteBlocker(Long.parseLong(blockerId)));
}

@Override
public long renewBlocker(TableIdentifier tableIdentifier, String blockerId) {
checkStarted();
TableRuntime tableRuntime = getAndCheckExist(getServerTableIdentifier(tableIdentifier));
return tableRuntime.renew(blockerId, blockerTimeout);
int retry = 0;
while (retry++ < TABLE_BLOCKER_RETRY) {
long now = System.currentTimeMillis();
long id = Long.parseLong(blockerId);
TableBlocker tableBlocker =
getAs(TableBlockerMapper.class, mapper -> mapper.selectBlocker(id, now));
if (tableBlocker == null) {
throw new ObjectNotExistsException("Blocker " + blockerId + " of " + tableIdentifier);
}
long current = System.currentTimeMillis();
long expirationTime = now + blockerTimeout;
long effectRow =
updateAs(
TableBlockerMapper.class, mapper -> mapper.renewBlocker(id, current, expirationTime));
if (effectRow > 0) {
return expirationTime;
}
}
throw new BlockerConflictException("Failed to renew a blocker: conflict meet max retry");
}

@Override
public List<Blocker> getBlockers(TableIdentifier tableIdentifier) {
checkStarted();
return getAndCheckExist(getOrSyncServerTableIdentifier(tableIdentifier)).getBlockers().stream()
return getAs(
TableBlockerMapper.class,
mapper ->
mapper.selectBlockers(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
System.currentTimeMillis()))
.stream()
.map(TableBlocker::buildBlocker)
.collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit 78708b8

Please sign in to comment.