Skip to content

Commit

Permalink
list table names with snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Jan 8, 2024
1 parent b4f73aa commit c39b912
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 142 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
Expand All @@ -17,6 +18,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -31,6 +33,88 @@
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
/**
* Test get table names.
*/
@Test
public void testGetTableNames() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo2);
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc3"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo3);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> tables = getPolarisTableService()
.getTableNames(getRequestContext(), DB_QUALIFIED_NAME, "", -1);
Assert.assertEquals(tables.size(), 3);
Assert.assertEquals(tables, ImmutableList.of(name1, name2, name3));
}

/**
* Test empty list tables.
*/
@Test
public void testListTablesEmpty() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(names, Arrays.asList());
}

/**
* Test table creation then list tables.
*/
@Test
public void testTableCreationAndList() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo = TableInfo.builder()
.name(qualifiedName)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(names, Arrays.asList(qualifiedName));
}

/**
* Test table list.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,37 @@
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

/**
* Test to verify that table names fetch works.
*/
@Test
public void testPaginatedFetch() {
final String dbName = generateDatabaseName();
createDB(dbName);
List<String> tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(0, tblNames.size());

final String tblNameA = "A_" + generateTableName();
final String tblNameB = "B_" + generateTableName();
final String tblNameC = "C_" + generateTableName();
createTable(dbName, tblNameA);
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(3, tblNames.size());
Assert.assertEquals(tblNameA, tblNames.get(0));
Assert.assertEquals(tblNameB, tblNames.get(1));
Assert.assertEquals(tblNameC, tblNames.get(2));
}

/**
* Test getTableEntities.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ public List<QualifiedName> listNames(
try {
final List<QualifiedName> qualifiedNames = Lists.newArrayList();
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
for (String tableName : polarisStoreService.getTables(name.getDatabaseName(), tableFilter)) {
for (String tableName : polarisStoreService.getTables(name.getDatabaseName(),
tableFilter,
1000)
) {
final QualifiedName qualifiedName =
QualifiedName.ofTable(name.getCatalogName(), name.getDatabaseName(), tableName);
if (prefix != null && !qualifiedName.toString().startsWith(prefix.toString())) {
Expand Down Expand Up @@ -388,7 +391,10 @@ public List<QualifiedName> getTableNames(
final List<QualifiedName> result = Lists.newArrayList();
for (int i = 0; i < databaseNames.size() && limitSize > 0; i++) {
final String databaseName = databaseNames.get(i);
final List<String> tableNames = polarisStoreService.getTables(name.getDatabaseName(), "");
final List<String> tableNames = polarisStoreService.getTables(
name.getDatabaseName(),
"",
1000);
result.addAll(tableNames.stream()
.map(n -> QualifiedName.ofTable(name.getCatalogName(), databaseName, n))
.limit(limitSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -158,7 +157,8 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix,
final int pageFetchSize) {
return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize);
return (List<PolarisTableEntity>)
tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize, true);
}

/**
Expand Down Expand Up @@ -205,22 +205,9 @@ boolean tableExistsById(final String tblId) {
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<String> getTables(final String databaseName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<String> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<String> tblNames = null;
boolean hasNext = true;
do {
tblNames = tblRepo.findAllByDbNameAndTablePrefix(databaseName, tblPrefix, page);
retval.addAll(tblNames.toList());
hasNext = tblNames.hasNext();
if (hasNext) {
page = tblNames.nextPageable();
}
} while (hasNext);
return retval;
public List<String> getTables(final String databaseName, final String tableNamePrefix, final int pageFetchSize) {
return (List<String>)
tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ public interface PolarisStoreService {
* Gets tables in the database and tableName prefix.
* @param databaseName database name
* @param tableNamePrefix table name prefix
* @param pageFetchSize size of each page
* @return list of table names in the database with the table name prefix.
*/
List<String> getTables(String databaseName, String tableNamePrefix);
List<String> getTables(String databaseName, String tableNamePrefix, int pageFetchSize);

/**
* Do an atomic compare-and-swap to update the table's metadata location.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.netflix.metacat.connector.polaris.store.repos;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import java.util.List;

/**
Expand All @@ -12,8 +11,9 @@ public interface PolarisTableCustomRepository {
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param pageSize target size for each page
* @param selectAll if true return the PolarisEntity else return name of the entity
* @return table entities in the database.
*/
List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize);
List<?> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize, boolean selectAll);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepos
@PersistenceContext
private EntityManager entityManager;

private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page) {
private <T> Slice<T> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page, final boolean selectAll) {

// Generate ORDER BY clause
String orderBy = "";
Expand All @@ -37,14 +37,21 @@ private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentP
orderBy = " ORDER BY " + orderBy;
}

final String sql = "SELECT t.* FROM TBLS t "
final String selectClause = selectAll ? "t.*" : "t.tbl_name";
final String sql = "SELECT " + selectClause + " FROM TBLS t "
+ "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix" + orderBy;
final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);

Query query;
if (selectAll) {
query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);
} else {
query = entityManager.createNativeQuery(sql);
}
query.setParameter("dbName", dbName);
query.setParameter("tableNamePrefix", tableNamePrefix + "%");
query.setFirstResult(page.getPageNumber() * page.getPageSize());
query.setMaxResults(page.getPageSize() + 1); // Fetch one extra result to determine if there is a next page
final List<PolarisTableEntity> resultList = query.getResultList();
final List<T> resultList = query.getResultList();
// Check if there is a next page
final boolean hasNext = resultList.size() > page.getPageSize();
// If there is a next page, remove the last item from the list
Expand All @@ -56,18 +63,18 @@ private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentP

@Override
@Transactional
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize) {
public List<?> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize, final boolean selectAll) {
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending());
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
.executeUpdate();
final List<PolarisTableEntity> retval = new ArrayList<>();
final List<Object> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Slice<PolarisTableEntity> tbls;
Slice<?> tbls;
boolean hasNext;
do {
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page);
retval.addAll(tbls.toList());
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page, selectAll);
retval.addAll(tbls.getContent());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
Expand Down
Loading

0 comments on commit c39b912

Please sign in to comment.