Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use as of system time when listing tables to improve performance #565

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
import com.netflix.metacat.common.dto.Pageable;
import com.netflix.metacat.common.dto.Sort;
import com.netflix.metacat.common.dto.SortOrder;
import com.netflix.metacat.common.server.connectors.model.TableInfo;
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;
import java.util.stream.Collectors;

/**
* Functional tests for PolarisConnectorTableService.
* Some tests cannot run as unit tests because the unit tests use an h2 database, which does not
* support all of the crdb functionality, so those tests are included here instead.
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PolarisPersistenceConfig.class})
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for changing from h2db to postgresql and adding these functional tests for both the table service and store service

/**
* Test table list.
*
* <p>Creates two tables, waits, and checks that listing the database returns exactly those two.
* It then creates a third table and lists again without waiting, and expects the third table to
* be absent from the result.
*/
@Test
public void testList() {
// Two ICEBERG tables that should be visible to the first list call.
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo2);


// Empty table name: used as the prefix argument of list(), i.e. no table-name filtering.
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

// NOTE(review): the InterruptedException below is swallowed without restoring the interrupt
// flag (Thread.currentThread().interrupt()) and is reported via System.out rather than a logger.
try {
// pause execution for 10000 milliseconds (10 seconds)
// presumably so that the rows created above become visible to the stale (follower) read
// used by the listing query -- TODO confirm the required lag
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: use logger.

}

// NOTE(review): Assert.assertEquals takes (expected, actual); the arguments in this method are
// passed as (actual, expected), which only affects the failure message.
List<TableInfo> tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(2, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));

// Create a 3rd table, but do not sleep this time, so the immediate list call below
// is expected NOT to include it.
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
this.getPolarisTableService().create(this.getRequestContext(), tableInfo3);

// Page size is 3 here, yet only the two older tables are expected back.
tables = this.getPolarisTableService().list(
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC),
new Pageable(3, 0));
Assert.assertEquals(tables.size(), 2);
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()),
ImmutableSet.of(name1, name2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@

import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig;
import com.netflix.metacat.connector.polaris.store.PolarisStoreConnectorTest;
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.List;

/**
* Test persistence operations on Database objects.
*/
Expand All @@ -18,4 +23,65 @@
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

/**
* Test getTableEntities.
*
* <p>Checks that an empty database yields no entities, then creates three tables and checks that
* all three are returned in ascending name order for page fetch sizes 1 through 4.
*/
@Test
public void testGetTableEntities() {
// Create the db
final String dbName = generateDatabaseName();
createDB(dbName);

// NOTE(review): both sleeps in this method swallow InterruptedException without restoring the
// interrupt flag, and report it via System.out rather than a logger.
try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

// Test when db is empty
List<PolarisTableEntity> entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Assert.assertEquals(0, entities.size());


// Add some tables
// The A_/B_/C_ prefixes presumably fix the expected sort order of the generated names.
final String tblNameA = "A_" + generateTableName();
final String tblNameB = "B_" + generateTableName();
final String tblNameC = "C_" + generateTableName();
createTable(dbName, tblNameA);
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the purpose of sleep to wait for the create to complete, the create call should be synchronous and return the created object after it is done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are now only reading the snapshot that is follower_reader_timestamp (4.8s) ago, we need to sleep for sometime so the following list operation can read the 3 tables created.

} catch (InterruptedException e) {
System.out.println("Sleep was interrupted");
}

// Test pagination and sort: the last argument is the page fetch size; whatever its value
// (smaller than, equal to, or larger than the table count), all 3 tables are expected back
// in ascending name order.
entities = getPolarisConnector().getTableEntities(dbName, "", 1);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());

entities = getPolarisConnector().getTableEntities(dbName, "", 2);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());

entities = getPolarisConnector().getTableEntities(dbName, "", 3);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());

entities = getPolarisConnector().getTableEntities(dbName, "", 4);
Assert.assertEquals(3, entities.size());
Assert.assertEquals(tblNameA, entities.get(0).getTblName());
Assert.assertEquals(tblNameB, entities.get(1).getTblName());
Assert.assertEquals(tblNameC, entities.get(2).getTblName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ spring:
init:
schema-locations: classpath:schema.sql
mode: always
platform: h2db
platform: postgresql

logging:
level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public List<TableInfo> list(
try {
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
final List<PolarisTableEntity> tbls =
polarisStoreService.getTableEntities(name.getDatabaseName(), tableFilter);
polarisStoreService.getTableEntities(name.getDatabaseName(), tableFilter, 1000);
if (sort != null) {
ConnectorUtils.sort(tbls, sort, Comparator.comparing(t -> t.getTblName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public DataSourceProperties dataSourceProperties() {
*/
@Bean
public PolarisStoreService polarisStoreService(
final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) {
final PolarisDatabaseRepository repo, final PolarisTableRepository tblRepo) {
return new PolarisStoreConnector(repo, tblRepo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,10 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<PolarisTableEntity> tbls;
boolean hasNext;
do {
tbls = tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tblPrefix, page);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
}
} while (hasNext);
return retval;
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix,
final int pageFetchSize) {
return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ public interface PolarisStoreService {
* Fetch table entities for given database.
* @param databaseName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param pageFetchSize target size for each page
* @return table entities in the database.
*/
List<PolarisTableEntity> getTableEntities(final String databaseName, final String tableNamePrefix);
List<PolarisTableEntity> getTableEntities(String databaseName, String tableNamePrefix, int pageFetchSize);

/**
* Updates existing or creates new table entry.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netflix.metacat.connector.polaris.store.repos;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import java.util.List;

/**
 * Custom repository fragment for {@link PolarisTableEntity} queries that need hand-written SQL.
 */
public interface PolarisTableCustomRepository {

    /**
     * Lists the table entities of a database, reading with
     * {@code AS OF SYSTEM TIME follower_read_timestamp()}.
     *
     * @param dbName          name of the database whose tables are listed
     * @param tableNamePrefix prefix that table names must start with; may be empty
     * @param pageSize        target number of rows fetched per page
     * @return the table entities found in the database
     */
    List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(String dbName, String tableNamePrefix, int pageSize);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.netflix.metacat.connector.polaris.store.repos;

import javax.persistence.PersistenceContext;
import javax.persistence.EntityManager;
import javax.persistence.Query;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Repository;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* Implementation for Custom JPA repository implementation for storing PolarisTableEntity.
*/
@Repository
public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepository {
@PersistenceContext
private EntityManager entityManager;

/**
 * Loads one page of table entities for a database, filtered by table name prefix.
 *
 * <p>The query asks for one row more than the page size; the presence of that extra row is what
 * signals that a further page exists, and the extra row is dropped before returning.
 *
 * @param dbName          database name bound to {@code :dbName}
 * @param tableNamePrefix table name prefix; a trailing {@code %} is appended for the LIKE match
 * @param page            page number, page size and sort to apply
 * @return the rows of the requested page together with the has-next flag
 */
private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage(
    final String dbName, final String tableNamePrefix, final Pageable page) {

    final StringBuilder sql = new StringBuilder("SELECT t.* FROM TBLS t ")
        .append("WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix");

    // Append the ORDER BY clause only when the page actually carries a sort.
    if (page.getSort().isSorted()) {
        final String sortColumns = page.getSort().stream()
            .map(order -> order.getProperty() + " " + order.getDirection())
            .collect(Collectors.joining(", "));
        sql.append(" ORDER BY ").append(sortColumns);
    }

    final int pageSize = page.getPageSize();
    final Query query = entityManager.createNativeQuery(sql.toString(), PolarisTableEntity.class)
        .setParameter("dbName", dbName)
        .setParameter("tableNamePrefix", tableNamePrefix + "%")
        .setFirstResult(page.getPageNumber() * pageSize)
        // One row beyond the page size tells us whether another page follows.
        .setMaxResults(pageSize + 1);

    final List<PolarisTableEntity> rows = query.getResultList();
    final boolean morePagesExist = rows.size() > pageSize;
    if (morePagesExist) {
        // Drop the look-ahead row so the slice holds exactly one page.
        rows.remove(rows.size() - 1);
    }
    return new SliceImpl<>(rows, page, morePagesExist);
}

/**
* Fetches every table entity in the given database whose name starts with the given prefix.
*
* <p>First issues {@code SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()} through the
* entity manager, then pages through the results {@code pageFetchSize} rows at a time, ordered by
* {@code tbl_name} ascending, until no further page is reported.
*
* @param dbName database name
* @param tableNamePrefix table name prefix; {@code null} is treated as the empty prefix
* @param pageFetchSize number of rows requested per page
* @return all matching table entities accumulated across pages
*/
@Override
@Transactional
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize) {
// Start at page 0; the sort uses the column name because the page query is native SQL.
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending());
// Switch the reads that follow to a historical timestamp before running any page query.
// NOTE(review): presumably this must be the first statement of the transaction -- confirm
// against the CockroachDB documentation.
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entityManager is an instance variable in this class so if multiple threads are all running for this api at the same time, would there be situations where they overwrite the timestamp set by each other, since follower_read_timestamp() is a crdb function that is evaluated to a value upon execution

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the entityManager is annotated with @PersistenceContext it's scoped within a transaction.

https://www.baeldung.com/jpa-hibernate-persistence-context

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the info and reference, upon closer look seems like spring will inject a special proxy for the entityManager in this setup so indeed should be safe
https://github.com/spring-projects/spring-framework/blob/main/spring-orm/src/main/java/org/springframework/orm/jpa/SharedEntityManagerCreator.java

.executeUpdate();
final List<PolarisTableEntity> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Slice<PolarisTableEntity> tbls;
boolean hasNext;
do {
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page);
retval.addAll(tbls.toList());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
}
} while (hasNext);
return retval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
@Repository
public interface PolarisTableRepository extends JpaRepository<PolarisTableEntity, String>,
JpaSpecificationExecutor {
JpaSpecificationExecutor, PolarisTableCustomRepository {

/**
* Delete table entry by name.
Expand Down Expand Up @@ -67,19 +67,6 @@ boolean existsByDbNameAndTblName(
@Param("dbName") final String dbName,
@Param("tblName") final String tblName);

/**
* Fetch table entities in database.
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param page pageable.
* @return table entities that belong to the database.
*/
@Query("SELECT e FROM PolarisTableEntity e WHERE e.dbName = :dbName AND e.tblName LIKE :tableNamePrefix%")
Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
@Param("dbName") final String dbName,
@Param("tableNamePrefix") final String tableNamePrefix,
Pageable page);

/**
* Do an atomic compare-and-swap on the metadata location of the table.
* @param dbName database name of the table
Expand Down
Loading
Loading