-
Notifications
You must be signed in to change notification settings - Fork 285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use as of system time when listing tables to improve performance #565
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package com.netflix.metacat.connector.polaris; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import com.google.common.collect.ImmutableSet; | ||
import com.netflix.metacat.common.QualifiedName; | ||
import com.netflix.metacat.common.dto.Pageable; | ||
import com.netflix.metacat.common.dto.Sort; | ||
import com.netflix.metacat.common.dto.SortOrder; | ||
import com.netflix.metacat.common.server.connectors.model.TableInfo; | ||
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.junit.Assert; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa; | ||
import org.springframework.boot.test.context.SpringBootTest; | ||
import org.springframework.test.context.ActiveProfiles; | ||
import org.springframework.test.context.junit.jupiter.SpringExtension; | ||
|
||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Test PolarisConnectorTableService in functional test. | ||
* Some of the tests cannot be run in unit test as it uses h2 database, which does not support all | ||
* functionalities in crdb so include those tests here. | ||
*/ | ||
@Slf4j | ||
@ExtendWith(SpringExtension.class) | ||
@SpringBootTest(classes = {PolarisPersistenceConfig.class}) | ||
@ActiveProfiles(profiles = {"polaris_functional_test"}) | ||
@AutoConfigureDataJpa | ||
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest { | ||
/** | ||
* Test table list. | ||
*/ | ||
@Test | ||
public void testList() { | ||
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1"); | ||
final TableInfo tableInfo1 = TableInfo.builder() | ||
.name(name1) | ||
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1")) | ||
.build(); | ||
this.getPolarisTableService().create(this.getRequestContext(), tableInfo1); | ||
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2"); | ||
final TableInfo tableInfo2 = TableInfo.builder() | ||
.name(name2) | ||
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) | ||
.build(); | ||
this.getPolarisTableService().create(this.getRequestContext(), tableInfo2); | ||
|
||
|
||
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, ""); | ||
|
||
try { | ||
// pause execution for 10000 milliseconds (10 seconds) | ||
Thread.sleep(10000); | ||
} catch (InterruptedException e) { | ||
log.debug("Sleep was interrupted"); | ||
} | ||
|
||
List<TableInfo> tables = this.getPolarisTableService().list( | ||
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), | ||
new Pageable(2, 0)); | ||
Assert.assertEquals(tables.size(), 2); | ||
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()), | ||
ImmutableSet.of(name1, name2)); | ||
|
||
// Create a 3rd table, but this time does not sleep so this table should not be included | ||
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3"); | ||
final TableInfo tableInfo3 = TableInfo.builder() | ||
.name(name3) | ||
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2")) | ||
.build(); | ||
this.getPolarisTableService().create(this.getRequestContext(), tableInfo3); | ||
|
||
tables = this.getPolarisTableService().list( | ||
this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), | ||
new Pageable(3, 0)); | ||
Assert.assertEquals(tables.size(), 2); | ||
Assert.assertEquals(tables.stream().map(TableInfo::getName).collect(Collectors.toSet()), | ||
ImmutableSet.of(name1, name2)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,19 +3,87 @@ | |
|
||
import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; | ||
import com.netflix.metacat.connector.polaris.store.PolarisStoreConnectorTest; | ||
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.junit.Assert; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa; | ||
import org.springframework.boot.test.context.SpringBootTest; | ||
import org.springframework.test.context.ActiveProfiles; | ||
import org.springframework.test.context.junit.jupiter.SpringExtension; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Test persistence operations on Database objects. | ||
*/ | ||
@Slf4j | ||
@ExtendWith(SpringExtension.class) | ||
@SpringBootTest(classes = {PolarisPersistenceConfig.class}) | ||
@ActiveProfiles(profiles = {"polaris_functional_test"}) | ||
@AutoConfigureDataJpa | ||
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest { | ||
|
||
/** | ||
* Test getTableEntities. | ||
*/ | ||
@Test | ||
public void testGetTableEntities() { | ||
// Create the db | ||
final String dbName = generateDatabaseName(); | ||
createDB(dbName); | ||
|
||
try { | ||
// pause execution for 10000 milliseconds (10 seconds) | ||
Thread.sleep(10000); | ||
} catch (InterruptedException e) { | ||
log.debug("Sleep was interrupted"); | ||
} | ||
|
||
// Test when db is empty | ||
List<PolarisTableEntity> entities = getPolarisConnector().getTableEntities(dbName, "", 1); | ||
Assert.assertEquals(0, entities.size()); | ||
|
||
|
||
// Add some tables | ||
final String tblNameA = "A_" + generateTableName(); | ||
final String tblNameB = "B_" + generateTableName(); | ||
final String tblNameC = "C_" + generateTableName(); | ||
createTable(dbName, tblNameA); | ||
createTable(dbName, tblNameB); | ||
createTable(dbName, tblNameC); | ||
|
||
try { | ||
// pause execution for 10000 milliseconds (10 seconds) | ||
Thread.sleep(10000); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the purpose of sleep to wait for the create to complete, the create call should be synchronous and return the created object after it is done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are now only reading the snapshot that is follower_reader_timestamp (4.8s) ago, we need to sleep for sometime so the following list operation can read the 3 tables created. |
||
} catch (InterruptedException e) { | ||
log.debug("Sleep was interrupted"); | ||
} | ||
|
||
// Test pagination and sort | ||
entities = getPolarisConnector().getTableEntities(dbName, "", 1); | ||
Assert.assertEquals(3, entities.size()); | ||
Assert.assertEquals(tblNameA, entities.get(0).getTblName()); | ||
Assert.assertEquals(tblNameB, entities.get(1).getTblName()); | ||
Assert.assertEquals(tblNameC, entities.get(2).getTblName()); | ||
|
||
entities = getPolarisConnector().getTableEntities(dbName, "", 2); | ||
Assert.assertEquals(3, entities.size()); | ||
Assert.assertEquals(tblNameA, entities.get(0).getTblName()); | ||
Assert.assertEquals(tblNameB, entities.get(1).getTblName()); | ||
Assert.assertEquals(tblNameC, entities.get(2).getTblName()); | ||
|
||
entities = getPolarisConnector().getTableEntities(dbName, "", 3); | ||
Assert.assertEquals(3, entities.size()); | ||
Assert.assertEquals(tblNameA, entities.get(0).getTblName()); | ||
Assert.assertEquals(tblNameB, entities.get(1).getTblName()); | ||
Assert.assertEquals(tblNameC, entities.get(2).getTblName()); | ||
|
||
entities = getPolarisConnector().getTableEntities(dbName, "", 4); | ||
Assert.assertEquals(3, entities.size()); | ||
Assert.assertEquals(tblNameA, entities.get(0).getTblName()); | ||
Assert.assertEquals(tblNameB, entities.get(1).getTblName()); | ||
Assert.assertEquals(tblNameC, entities.get(2).getTblName()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package com.netflix.metacat.connector.polaris.store.repos; | ||
|
||
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; | ||
import java.util.List; | ||
|
||
/** | ||
* Custom JPA repository implementation for storing PolarisTableEntity. | ||
*/ | ||
public interface PolarisTableCustomRepository { | ||
/** | ||
* Fetch table entities for given database using AS OF SYSTEM TIME follower_read_timestamp(). | ||
* @param dbName database name | ||
* @param tableNamePrefix table name prefix. can be empty. | ||
* @param pageSize target size for each page | ||
* @return table entities in the database. | ||
*/ | ||
List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix( | ||
String dbName, String tableNamePrefix, int pageSize); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package com.netflix.metacat.connector.polaris.store.repos; | ||
|
||
import javax.persistence.PersistenceContext; | ||
import javax.persistence.EntityManager; | ||
import javax.persistence.Query; | ||
|
||
import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; | ||
import org.springframework.transaction.annotation.Transactional; | ||
import org.springframework.data.domain.Slice; | ||
import org.springframework.data.domain.SliceImpl; | ||
import org.springframework.data.domain.Pageable; | ||
import org.springframework.stereotype.Repository; | ||
import org.springframework.data.domain.PageRequest; | ||
import org.springframework.data.domain.Sort; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Implementation for Custom JPA repository implementation for storing PolarisTableEntity. | ||
*/ | ||
@Repository | ||
public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepository { | ||
@PersistenceContext | ||
private EntityManager entityManager; | ||
|
||
private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage( | ||
final String dbName, final String tableNamePrefix, final Pageable page) { | ||
|
||
// Generate ORDER BY clause | ||
String orderBy = ""; | ||
if (page.getSort().isSorted()) { | ||
orderBy = page.getSort().stream() | ||
.map(order -> order.getProperty() + " " + order.getDirection()) | ||
.collect(Collectors.joining(", ")); | ||
orderBy = " ORDER BY " + orderBy; | ||
} | ||
|
||
final String sql = "SELECT t.* FROM TBLS t " | ||
+ "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix" + orderBy; | ||
final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class); | ||
query.setParameter("dbName", dbName); | ||
query.setParameter("tableNamePrefix", tableNamePrefix + "%"); | ||
query.setFirstResult(page.getPageNumber() * page.getPageSize()); | ||
query.setMaxResults(page.getPageSize() + 1); // Fetch one extra result to determine if there is a next page | ||
final List<PolarisTableEntity> resultList = query.getResultList(); | ||
// Check if there is a next page | ||
final boolean hasNext = resultList.size() > page.getPageSize(); | ||
// If there is a next page, remove the last item from the list | ||
if (hasNext) { | ||
resultList.remove(resultList.size() - 1); | ||
} | ||
return new SliceImpl<>(resultList, page, hasNext); | ||
} | ||
|
||
@Override | ||
@Transactional | ||
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix( | ||
final String dbName, final String tableNamePrefix, final int pageFetchSize) { | ||
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending()); | ||
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the entityManager is annotated with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the info and reference, upon closer look seems like spring will inject a special proxy for the |
||
.executeUpdate(); | ||
final List<PolarisTableEntity> retval = new ArrayList<>(); | ||
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix; | ||
Slice<PolarisTableEntity> tbls; | ||
boolean hasNext; | ||
do { | ||
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page); | ||
retval.addAll(tbls.toList()); | ||
hasNext = tbls.hasNext(); | ||
if (hasNext) { | ||
page = tbls.nextPageable(); | ||
} | ||
} while (hasNext); | ||
return retval; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for changing from h2db to postgresql and adding these functional tests for both the table service and store service