Skip to content

Commit

Permalink
follower_read_timestamp for list db calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Apr 29, 2024
1 parent ea1c8c1 commit 289adee
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@

package com.netflix.metacat.connector.polaris;

import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.dto.SortOrder;
import com.netflix.metacat.common.server.connectors.model.DatabaseInfo;
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;


/**
* Test PolarisConnectorTableService.
*/
@Slf4j
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PolarisPersistenceConfig.class})
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorDatabaseServiceFunctionalTest extends PolarisConnectorDatabaseServiceTest {
private void simulateDelay() {
try {
Thread.sleep(10000); // 10 seconds delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
log.debug("Sleep was interrupted", e);
}
}

/**
* Test SimpleDBList.
*/
@Test
public void testSimpleListDb() {
// Simulate a delay so that the dbs schema is visible
simulateDelay();
final DatabaseInfo db1 = DatabaseInfo.builder().name(DB1_QUALIFIED_NAME).uri("uri1").build();
final DatabaseInfo db2 = DatabaseInfo.builder().name(DB2_QUALIFIED_NAME).uri("uri2").build();
getPolarisDBService().create(getRequestContext(), db1);
getPolarisDBService().create(getRequestContext(), db2);
Assert.assertTrue(getPolarisDBService().exists(getRequestContext(), DB1_QUALIFIED_NAME));
Assert.assertTrue(getPolarisDBService().exists(getRequestContext(), DB2_QUALIFIED_NAME));

// Since now list dbs use follower_read_timestamp, we will not immediately get the newly created dbs
List<QualifiedName> dbNames =
getPolarisDBService().listNames(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
List<DatabaseInfo> dbs =
getPolarisDBService().list(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
Assert.assertTrue("Expected dbNames to be empty", dbNames.isEmpty());
Assert.assertTrue("Expected dbs to be empty", dbs.isEmpty());


// After sufficient time, the dbs should return using follower_read_timestamp
simulateDelay();
dbNames = getPolarisDBService().listNames(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null);
Assert.assertEquals(dbs, Arrays.asList(db1, db2));

// Test Prefix
dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), QualifiedName.ofDatabase(CATALOG_NAME, "db"),
null,
null);
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
QualifiedName.ofDatabase(CATALOG_NAME, "db"),
null,
null);
Assert.assertEquals(dbs, Arrays.asList(db1, db2));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
QualifiedName.ofDatabase(CATALOG_NAME, "db1_"),
null,
null);
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
QualifiedName.ofDatabase(CATALOG_NAME, "db1_"),
null,
null);
Assert.assertEquals(dbs, Arrays.asList(db1));

// Test Order desc
dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
new Sort("name", SortOrder.DESC),
null);
Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME, DB1_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
new Sort("name", SortOrder.DESC),
null);
Assert.assertEquals(dbs, Arrays.asList(db2, db1));

// Test pageable
dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
null,
new Pageable(5, 0));
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME),
null,
null,
new Pageable(5, 0));
Assert.assertEquals(dbs, Arrays.asList(db1, db2));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 0));
Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 0));
Assert.assertEquals(dbs, Arrays.asList(db1));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 1));
Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 1));
Assert.assertEquals(dbs, Arrays.asList(db2));

dbNames = getPolarisDBService().listNames(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(5, 1));
Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME));
dbs = getPolarisDBService().list(
getRequestContext(),
QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(5, 1));
Assert.assertEquals(dbs, Arrays.asList(db2));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import com.netflix.metacat.connector.polaris.store.PolarisStoreConnectorTest;
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
Expand All @@ -13,7 +14,9 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Test persistence operations on Database objects.
Expand All @@ -25,6 +28,15 @@
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

private void simulateDelay() {
try {
Thread.sleep(10000); // 10 seconds delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
log.debug("Sleep was interrupted", e);
}
}

/**
* Test to verify that table names fetch works.
*/
Expand All @@ -42,12 +54,7 @@ public void testPaginatedFetch() {
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
simulateDelay();

tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(3, tblNames.size());
Expand All @@ -65,12 +72,7 @@ public void testGetTableEntities() {
final String dbName = generateDatabaseName();
createDB(dbName);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}
simulateDelay();

// Test when db is empty
List<PolarisTableEntity> entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Expand Down Expand Up @@ -117,4 +119,40 @@ public void testGetTableEntities() {
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());
}

/**
* test list database with different db page size config.
*/
@Test
public void testListDbPage() {
createDB("db1");
createDB("db2");
createDB("db3");

simulateDelay();

List<String> dbNames = getPolarisConnector().getDatabaseNames("db", null, 1);
List<PolarisDatabaseEntity> dbs = getPolarisConnector().getDatabases("db", null, 1);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));

dbNames = getPolarisConnector().getDatabaseNames("db", null, 2);
dbs = getPolarisConnector().getDatabases("db", null, 2);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));

dbNames = getPolarisConnector().getDatabaseNames("db", null, 3);
dbs = getPolarisConnector().getDatabases("db", null, 3);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));

dbNames = getPolarisConnector().getDatabaseNames("db", null, 4);
dbs = getPolarisConnector().getDatabases("db", null, 4);
Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames);
Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"),
dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -77,6 +77,7 @@ public void create(final ConnectorRequestContext context, final DatabaseInfo dat
* {@inheritDoc}.
*/
@Override
@Transactional
public void delete(final ConnectorRequestContext context, final QualifiedName name) {
// check exists then delete in non-transactional optimistic manner
if (!exists(context, name)) {
Expand Down Expand Up @@ -159,17 +160,12 @@ public List<QualifiedName> listNames(
@Nullable final Pageable pageable
) {
try {
List<QualifiedName> qualifiedNames = polarisStoreService.getAllDatabases().stream()
.map(d -> QualifiedName.ofDatabase(name.getCatalogName(), d.getDbName()))
final String dbPrefix = prefix == null ? "" : prefix.getDatabaseName();
final List<QualifiedName> qualifiedNames = polarisStoreService.getDatabaseNames(
dbPrefix, sort, 1000)
.stream()
.map(dbName -> QualifiedName.ofDatabase(name.getCatalogName(), dbName))
.collect(Collectors.toCollection(ArrayList::new));
if (prefix != null) {
qualifiedNames = qualifiedNames.stream()
.filter(n -> n.startsWith(prefix))
.collect(Collectors.toCollection(ArrayList::new));
}
if (sort != null) {
ConnectorUtils.sort(qualifiedNames, sort, Comparator.comparing(QualifiedName::toString));
}
return ConnectorUtils.paginate(qualifiedNames, pageable);
} catch (Exception exception) {
throw new ConnectorException(
Expand All @@ -190,15 +186,12 @@ public List<DatabaseInfo> list(
) {
try {
final PolarisDatabaseMapper mapper = new PolarisDatabaseMapper(name.getCatalogName());
List<PolarisDatabaseEntity> dbs = polarisStoreService.getAllDatabases();
if (prefix != null) {
dbs = dbs.stream()
.filter(n -> QualifiedName.ofDatabase(name.getCatalogName(), n.getDbName()).startsWith(prefix))
.collect(Collectors.toCollection(ArrayList::new));
}
if (sort != null) {
ConnectorUtils.sort(dbs, sort, Comparator.comparing(p -> p.getDbName()));
}
final String dbPrefix = prefix == null ? "" : prefix.getDatabaseName();

final List<PolarisDatabaseEntity> dbs = polarisStoreService.getDatabases(
dbPrefix, sort, 1000
);

return ConnectorUtils.paginate(dbs, pageable).stream()
.map(d -> mapper.toInfo(d)).collect(Collectors.toList());
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public List<QualifiedName> getTableNames(
log.warn(String.format("Calling Polaris getTableNames with nonempty filter %s", filter));
}
final List<String> databaseNames = name.isDatabaseDefinition() ? ImmutableList.of(name.getDatabaseName())
: polarisStoreService.getAllDatabases().stream().map(d -> d.getDbName()).collect(Collectors.toList());
: polarisStoreService.getDatabaseNames(null, null, 1000);
int limitSize = limit == null || limit < 0 ? Integer.MAX_VALUE : limit;
final List<QualifiedName> result = Lists.newArrayList();
for (int i = 0; i < databaseNames.size() && limitSize > 0; i++) {
Expand Down
Loading

0 comments on commit 289adee

Please sign in to comment.