From da4d47ae3507851f6d520b736e3f993b0131d243 Mon Sep 17 00:00:00 2001 From: yantian Date: Wed, 8 Jan 2025 10:40:35 +0800 Subject: [PATCH] add create table in streaming mode IT --- .../apache/paimon/flink/FlinkCatalogTest.java | 28 +------- .../paimon/flink/FlinkCatalogTestUtil.java | 64 +++++++++++++++++++ .../paimon/flink/FlinkRESTCatalogTest.java | 28 ++++++++ .../paimon/flink/MockRESTCatalogServer.java | 54 ++++++++++++++-- 4 files changed, 143 insertions(+), 31 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestUtil.java diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 4b8cf7912192..ab0f937dde55 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -145,22 +145,7 @@ public void beforeEach() throws IOException { } private ResolvedSchema createSchema() { - return new ResolvedSchema( - Arrays.asList( - Column.physical("first", DataTypes.STRING()), - Column.physical("second", DataTypes.INT()), - Column.physical("third", DataTypes.STRING()), - Column.physical( - "four", - DataTypes.ROW( - DataTypes.FIELD("f1", DataTypes.STRING()), - DataTypes.FIELD("f2", DataTypes.INT()), - DataTypes.FIELD( - "f3", - DataTypes.MAP( - DataTypes.STRING(), DataTypes.INT()))))), - Collections.emptyList(), - null); + return FlinkCatalogTestUtil.createSchema(); } private List createPartitionKeys() { @@ -192,14 +177,7 @@ private CatalogTable createAnotherPartitionedTable(Map options) } private CatalogTable createTable(Map options) { - ResolvedSchema resolvedSchema = this.createSchema(); - CatalogTable origin = - CatalogTable.of( - Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), - "test comment", - Collections.emptyList(), - options); - return new ResolvedCatalogTable(origin, resolvedSchema); + return FlinkCatalogTestUtil.createTable(options); } private CatalogTable createPartitionedTable(Map options) { @@ -364,7 +342,7 @@ public void testCreateFlinkTableWithPath() throws Exception { @MethodSource("streamingOptionProvider") public void testCreateTable_Streaming(Map options) throws Exception { catalog.createDatabase(path1.getDatabaseName(), null, false); - CatalogTable table = createTable(options); + CatalogTable table = FlinkCatalogTestUtil.createTable(options); catalog.createTable(path1, table, false); checkCreateTable(path1, table, (CatalogTable) catalog.getTable(path1)); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestUtil.java new file mode 100644 index 000000000000..ec4cfc3272db --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestUtil.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +/** util for flink catalog test. */ +public class FlinkCatalogTestUtil { + + public static CatalogTable createTable(Map options) { + ResolvedSchema resolvedSchema = FlinkCatalogTestUtil.createSchema(); + CatalogTable origin = + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(), + "test comment", + Collections.emptyList(), + options); + return new ResolvedCatalogTable(origin, resolvedSchema); + } + + public static ResolvedSchema createSchema() { + return new ResolvedSchema( + Arrays.asList( + Column.physical("first", DataTypes.STRING()), + Column.physical("second", DataTypes.INT()), + Column.physical("third", DataTypes.STRING()), + Column.physical( + "four", + DataTypes.ROW( + DataTypes.FIELD("f1", DataTypes.STRING()), + DataTypes.FIELD("f2", DataTypes.INT()), + DataTypes.FIELD( + "f3", + DataTypes.MAP( + DataTypes.STRING(), DataTypes.INT()))))), + Collections.emptyList(), + null); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java index 707a54d2f029..2c3b554c9e5d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.junit.After; @@ -34,13 +35,17 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; +import static org.apache.paimon.flink.FlinkCatalogTestUtil.createTable; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; @@ -114,4 +119,27 @@ public void testGetDatabase() throws Exception { .isInstanceOf(DatabaseNotExistException.class) .hasMessageContaining("Database non does not exist in Catalog test-catalog."); } + + @Test + public void testCreateTable_Streaming() throws Exception { + catalog.createDatabase(path1.getDatabaseName(), null, false); + Map options = new HashMap<>(); + options.put("is_streaming", String.valueOf(true)); + CatalogTable table = createTable(options); + catalog.createTable(path1, table, false); + CatalogTable tableFromServer = (CatalogTable) catalog.getTable(path1); + checkOptions(options, tableFromServer.getOptions()); + assertEquals(tableFromServer.getTableKind(), table.getTableKind()); + assertEquals(tableFromServer.getUnresolvedSchema(), table.getUnresolvedSchema()); + } + + private void checkOptions(Map expected, Map actual) { + List ignoreKeys = ImmutableList.of(FlinkCatalogOptions.REGISTER_TIMEOUT.key()); + for (Map.Entry entry : expected.entrySet()) { + String key = entry.getKey(); + if (!ignoreKeys.contains(key)) { + assertEquals(actual.get(key), actual.get(key)); + } + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MockRESTCatalogServer.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MockRESTCatalogServer.java index dbc7c7297fdc..aef96e84a1e5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MockRESTCatalogServer.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/MockRESTCatalogServer.java @@ -18,19 +18,24 @@ package org.apache.paimon.flink; +import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Database; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.rest.RESTCatalogInternalOptions; import org.apache.paimon.rest.RESTObjectMapper; import org.apache.paimon.rest.RESTResponse; import org.apache.paimon.rest.requests.CreateDatabaseRequest; +import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -120,12 +125,49 @@ public MockResponse dispatch(RecordedRequest request) throws InterruptedExceptio return mockResponse(response, 200); } } else if (request.getPath().startsWith("/v1/prefix/databases/")) { - String databaseName = - request.getPath().substring("/v1/prefix/databases/".length()); - if (request.getMethod().equals("GET")) { - Database database = catalog.getDatabase(databaseName); - response = new GetDatabaseResponse(database.name(), database.options()); - return mockResponse(response, 200); + String[] resources = + request.getPath() + .substring("/v1/prefix/databases/".length()) + .split("/"); + String databaseName = resources[0]; + boolean isTables = resources.length == 2 && "tables".equals(resources[1]); + boolean isTable = resources.length == 3 && "tables".equals(resources[1]); + if (isTable) { + String tableName = resources[2]; + if (request.getMethod().equals("GET")) { + Identifier identifier = Identifier.create(databaseName, tableName); + FileStoreTable table = + (FileStoreTable) catalog.getTable(identifier); + response = + new GetTableResponse( + AbstractCatalog.newTableLocation( + catalog.warehouse(), identifier) + .toString(), + table.schema().id(), + table.schema().toSchema()); + return mockResponse(response, 200); + } + } else if (isTables) { + // /v1/prefix/databases/db1/tables + if (request.getMethod().equals("POST")) { + CreateTableRequest requestBody = + mapper.readValue( + request.getBody().readUtf8(), + CreateTableRequest.class); + catalog.createTable( + requestBody.getIdentifier(), requestBody.getSchema(), true); + response = new GetTableResponse("", 1L, requestBody.getSchema()); + return mockResponse(response, 200); + } + + } else { + if (request.getMethod().equals("GET")) { + Database database = catalog.getDatabase(databaseName); + response = + new GetDatabaseResponse( + database.name(), database.options()); + return mockResponse(response, 200); + } } } return new MockResponse().setResponseCode(404);