Skip to content

Commit

Permalink
use as of system time when listing tables to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Jan 4, 2024
1 parent 2c0f438 commit 1864797
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.dto.SortOrder;
import com.netflix.metacat.common.server.connectors.model.TableInfo;
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;
import java.util.stream.Collectors;

/**
* Test PolarisConnectorTableService in functional test.
* Some of the tests cannot be run in unit test as it uses h2 database, which does not support all
* functionalities in crdb so include those tests here.
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PolarisPersistenceConfig.class})
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
/**
* Test table list.
*/
@Test
public void testList() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo2);


final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

List<TableInfo> tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));

// Create a 3rd table, but this time does not sleep so this table should not be included
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo3);

tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(3, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@

import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import com.netflix.metacat.connector.polaris.store.PolarisStoreConnectorTest;
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;

/**
* Test persistence operations on Database objects.
*/
Expand All @@ -18,4 +23,65 @@
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

/**
* Test getTableEntities.
*/
@Test
public void testGetTableEntities() {
// Create the db
final String dbName = generateDatabaseName();
createDB(dbName);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

// Test when db is empty
List<PolarisTableEntity> entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Assert.assertEquals(0, entities.size());


// Add some tables
final String tblNameA = "A_" + generateTableName();
final String tblNameB = "B_" + generateTableName();
final String tblNameC = "C_" + generateTableName();
createTable(dbName, tblNameA);
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

// Test pagination and sort
entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());

entities = getPolarisConnector().getTableEntities(dbName, "", 2);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());

entities = getPolarisConnector().getTableEntities(dbName, "", 3);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());

entities = getPolarisConnector().getTableEntities(dbName, "", 4);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spring:
init:
schema-locations: classpath:schema.sql
mode: always
platform: h2db
platform: postgresql

logging:
level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public List<TableInfo> list(
try {
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
final List<PolarisTableEntity> tbls =
polarisStoreService.getTableEntities(name.getDatabaseName(), tableFilter);
polarisStoreService.getTableEntities(name.getDatabaseName(), tableFilter, 1000);
if (sort != null) {
ConnectorUtils.sort(tbls, sort, Comparator.comparing(t -> t.getTblName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public DataSourceProperties dataSourceProperties() {
*/
@Bean
public PolarisStoreService polarisStoreService(
final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) {
final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) {
return new PolarisStoreConnector(repo, tblRepo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,10 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<PolarisTableEntity> tbls;
boolean hasNext;
do {
tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, page);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
}
} while (hasNext);
return retval;
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix,
final int pageFetchSize) {
return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ public interface PolarisStoreService {
* Fetch table entities for given database.
* @param databaseName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param pageFetchSize target size for each page
* @return table entities in the database.
*/
List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix);
List<PolarisTableEntity> getTableEntities(String databaseName, String tableNamePrefix, int pageFetchSize);

/**
* Updates existing or creates new table entry.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netflix.metacat.connector.polaris.store.repos;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import java.util.List;

/**
* Custom JPA repository implementation for storing PolarisTableEntity.
*/
public interface PolarisTableCustomRepository {
/**
* Fetch table entities for given database using AS OF SYSTEM TIME follower_read_timestamp().
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param pageSize target size for each page
* @return table entities in the database.
*/
List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.netflix.metacat.connector.polaris.store.repos;

import javax.persistence.PersistenceContext;
import javax.persistence.EntityManager;
import javax.persistence.Query;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Implementation for Custom JPA repository implementation for storing PolarisTableEntity.
*/
@Repository
public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepository {
@PersistenceContext
private EntityManager entityManager;

private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page) {

// Generate ORDER BY clause
String orderBy = "";
if (page.getSort().isSorted()) {
orderBy = page.getSort().stream()
.map(order -> order.getProperty() + " " + order.getDirection())
.collect(Collectors.joining(", "));
orderBy = " ORDER BY " + orderBy;
}

final String sql = "SELECT t.* FROM TBLS t "
+ "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix" + orderBy;
final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);
query.setParameter("dbName", dbName);
query.setParameter("tableNamePrefix", tableNamePrefix + "%");
query.setFirstResult(page.getPageNumber() * page.getPageSize());
query.setMaxResults(page.getPageSize() + 1); // Fetch one extra result to determine if there is a next page
final List<PolarisTableEntity> resultList = query.getResultList();
// Check if there is a next page
final boolean hasNext = resultList.size() > page.getPageSize();
// If there is a next page, remove the last item from the list
if (hasNext) {
resultList.remove(resultList.size() - 1);
}
return new SliceImpl<>(resultList, page, hasNext);
}

@Override
@Transactional
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize) {
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending());
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
.executeUpdate();
final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Slice<PolarisTableEntity> tbls;
boolean hasNext;
do {
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
}
} while (hasNext);
return retval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
@Repository
public interface PolarisTableRepository extends JpaRepository<PolarisTableEntity, String>,
JpaSpecificationExecutor {
JpaSpecificationExecutor, PolarisTableCustomRepository {

/**
* Delete table entry by name.
Expand Down Expand Up @@ -67,19 +67,6 @@ boolean existsByDbNameAndTblName(
@Param("dbName") final String dbName,
@Param("tblName") final String tblName);

/**
* Fetch table entities in database.
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param page pageable.
* @return table entities that belong to the database.
*/
@Query("SELECT e FROM PolarisTableEntity e WHERE e.dbName = :dbName AND e.tblName LIKE :tableNamePrefix%")
Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
@Param("dbName") final String dbName,
@Param("tableNamePrefix") final String tableNamePrefix,
Pageable page);

/**
* Do an atomic compare-and-swap on the metadata location of the table.
* @param dbName database name of the table
Expand Down
Loading

0 comments on commit 1864797

Please sign in to comment.