Skip to content

Commit

Permalink
Yingjianw/list db with follower reader timestamp (#589)
Browse files Browse the repository at this point in the history
* follower_read_timestamp for list db calls

* address comments

---------

Co-authored-by: Yingjian Wu <[email protected]>
  • Loading branch information
stevie9868 and Yingjian Wu authored May 1, 2024
1 parent 9a9f271 commit 5d0a3a0
Show file tree
Hide file tree
Showing 16 changed files with 436 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,17 +555,31 @@ public interface Config {
/**
* Get the page size when listing table entities.
*
* @return True if it is.
* @return size of the page
*/
int getListTableEntitiesPageSize();

/**
* Get the page size when listing table names.
*
* @return True if it is.
* @return size of the page
*/
int getListTableNamesPageSize();

/**
* Get the page size when listing db entities.
*
* @return size of the page
*/
int getListDatabaseEntitiesPageSize();

/**
* Get the page size when listing db names.
*
* @return size of the page
*/
int getListDatabaseNamesPageSize();

/**
* Metadata query timeout in seconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,16 @@ public int getListTableNamesPageSize() {
return this.metacatProperties.getService().getListTableNamesPageSize();
}

@Override
public int getListDatabaseEntitiesPageSize() {
return this.metacatProperties.getService().getListDatabaseEntitiesPageSize();
}

@Override
public int getListDatabaseNamesPageSize() {
return this.metacatProperties.getService().getListDatabaseNamesPageSize();
}

@Override
public int getMetadataQueryTimeout() {
return this.metacatProperties.getUsermetadata().getQueryTimeoutInSeconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class ServiceProperties {
private boolean listDatabaseNameByDefaultOnGetCatalog = true;
private int listTableEntitiesPageSize = 1000;
private int listTableNamesPageSize = 10000;
private int listDatabaseEntitiesPageSize = 1000;
private int listDatabaseNamesPageSize = 10000;

/**
* Max related properties.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@

package com.netflix.metacat.connector.polaris;

import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.dto.SortOrder;
import com.netflix.metacat.common.server.connectors.model.DatabaseInfo;
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;


/**
* Test PolarisConnectorTableService.
*/
@Slf4j
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PolarisPersistenceConfig.class})
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorDatabaseServiceFunctionalTest extends PolarisConnectorDatabaseServiceTest {
/**
* Test SimpleDBList.
*/
@Test
public void testSimpleListDb() {
// Simulate a delay so that the dbs schema is visible
TestUtil.simulateDelay();
final DatabaseInfo db1 = DatabaseInfo.builder().name(DB1_QUALIFIED_NAME).uri("uri1").build();
final DatabaseInfo db2 = DatabaseInfo.builder().name(DB2_QUALIFIED_NAME).uri("uri2").build();
getPolarisDBService().create(getRequestContext(), db1);
getPolarisDBService().create(getRequestContext(), db2);
Assert.assertTrue(getPolarisDBService().exists(getRequestContext(), DB1_QUALIFIED_NAME));
Assert.assertTrue(getPolarisDBService().exists(getRequestContext(), DB2_QUALIFIED_NAME));

// Since now list dbs use follower_read_timestamp, we will not immediately get the newly created dbs
List<QualifiedName> dbNames =
getPolarisDBService().listNames(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
List<DatabaseInfo> dbs =
getPolarisDBService().list(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
Assert.assertTrue("Expected dbNames to be empty", dbNames.isEmpty());
Assert.assertTrue("Expected dbs to be empty", dbs.isEmpty());


// After sufficient time, the dbs should return using follower_read_timestamp
TestUtil.simulateDelay();
dbNames = getPolarisDBService().listNames(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
Assert.assertEquals(dbs, Arrays.asList(db1, db2));

// Test Prefix
dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), QualifiedName.ofDatabase(CATALOG_NAME, "db"),
null,
null);
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
QualifiedName.ofDatabase(CATALOG_NAME, "db"),
null,
null);
Assert.assertEquals(dbs, Arrays.asList(db1, db2));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
QualifiedName.ofDatabase(CATALOG_NAME, "db1_"),
null,
null);
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
QualifiedName.ofDatabase(CATALOG_NAME, "db1_"),
null,
null);
Assert.assertEquals(dbs, Arrays.asList(db1));

// Test Order desc
dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
new Sort("name", SortOrder.DESC),
null);
Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME, DB1_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
new Sort("name", SortOrder.DESC),
null);
Assert.assertEquals(dbs, Arrays.asList(db2, db1));

// Test pageable
dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
null,
new Pageable(5, 0));
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
null,
new Pageable(5, 0));
Assert.assertEquals(dbs, Arrays.asList(db1, db2));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 0));
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 0));
Assert.assertEquals(dbs, Arrays.asList(db1));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 1));
Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 1));
Assert.assertEquals(dbs, Arrays.asList(db2));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(5, 1));
Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(5, 1));
Assert.assertEquals(dbs, Arrays.asList(db2));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ public void testGetTableNames() {
.build();
getPolarisTableService().create(getRequestContext(), tableInfo3);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

final List<QualifiedName> tables = getPolarisTableService()
.getTableNames(getRequestContext(), DB_QUALIFIED_NAME, "", -1);
Expand All @@ -77,12 +72,7 @@ public void testGetTableNames() {
public void testListTablesEmpty() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
Expand All @@ -102,12 +92,7 @@ public void testTableCreationAndList() {
.build();
getPolarisTableService().create(getRequestContext(), tableInfo);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
Expand Down Expand Up @@ -136,12 +121,7 @@ public void testList() {

final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

List<TableInfo> tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import com.netflix.metacat.connector.polaris.store.PolarisStoreConnectorTest;
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
Expand All @@ -13,7 +14,9 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Test persistence operations on Database objects.
Expand Down Expand Up @@ -42,12 +45,7 @@ public void testPaginatedFetch() {
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(3, tblNames.size());
Expand All @@ -65,12 +63,7 @@ public void testGetTableEntities() {
final String dbName = generateDatabaseName();
createDB(dbName);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

// Test when db is empty
List<PolarisTableEntity> entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Expand All @@ -85,12 +78,7 @@ public void testGetTableEntities() {
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
TestUtil.simulateDelay();

// Test pagination and sort
entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Expand All @@ -117,4 +105,40 @@ public void testGetTableEntities() {
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());
}

/**
* test list database with different db page size config.
*/
@Test
public void testListDbPage() {
createDB("db1");
createDB("db2");
createDB("db3");

TestUtil.simulateDelay();

List<String> dbNames = getPolarisConnector().getDatabaseNames("db", null, 1);
List<PolarisDatabaseEntity> dbs = getPolarisConnector().getDatabases("db", null, 1);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));

dbNames = getPolarisConnector().getDatabaseNames("db", null, 2);
dbs = getPolarisConnector().getDatabases("db", null, 2);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));

dbNames = getPolarisConnector().getDatabaseNames("db", null, 3);
dbs = getPolarisConnector().getDatabases("db", null, 3);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));

dbNames = getPolarisConnector().getDatabaseNames("db", null, 4);
dbs = getPolarisConnector().getDatabases("db", null, 4);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));
}
}
Loading

0 comments on commit 5d0a3a0

Please sign in to comment.