Skip to content

Commit

Permalink
add create table in streaming mode IT
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Jan 8, 2025
1 parent f5037c4 commit da4d47a
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,7 @@ public void beforeEach() throws IOException {
}

private ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(
Column.physical("first", DataTypes.STRING()),
Column.physical("second", DataTypes.INT()),
Column.physical("third", DataTypes.STRING()),
Column.physical(
"four",
DataTypes.ROW(
DataTypes.FIELD("f1", DataTypes.STRING()),
DataTypes.FIELD("f2", DataTypes.INT()),
DataTypes.FIELD(
"f3",
DataTypes.MAP(
DataTypes.STRING(), DataTypes.INT()))))),
Collections.emptyList(),
null);
return FlinkCatalogTestUtil.createSchema();
}

private List<String> createPartitionKeys() {
Expand Down Expand Up @@ -192,14 +177,7 @@ private CatalogTable createAnotherPartitionedTable(Map<String, String> options)
}

private CatalogTable createTable(Map<String, String> options) {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.emptyList(),
options);
return new ResolvedCatalogTable(origin, resolvedSchema);
return FlinkCatalogTestUtil.createTable(options);
}

private CatalogTable createPartitionedTable(Map<String, String> options) {
Expand Down Expand Up @@ -364,7 +342,7 @@ public void testCreateFlinkTableWithPath() throws Exception {
@MethodSource("streamingOptionProvider")
public void testCreateTable_Streaming(Map<String, String> options) throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
CatalogTable table = createTable(options);
CatalogTable table = FlinkCatalogTestUtil.createTable(options);
catalog.createTable(path1, table, false);
checkCreateTable(path1, table, (CatalogTable) catalog.getTable(path1));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

/** util for flink catalog test. */
public class FlinkCatalogTestUtil {

public static CatalogTable createTable(Map<String, String> options) {
ResolvedSchema resolvedSchema = FlinkCatalogTestUtil.createSchema();
CatalogTable origin =
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.emptyList(),
options);
return new ResolvedCatalogTable(origin, resolvedSchema);
}

public static ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(
Column.physical("first", DataTypes.STRING()),
Column.physical("second", DataTypes.INT()),
Column.physical("third", DataTypes.STRING()),
Column.physical(
"four",
DataTypes.ROW(
DataTypes.FIELD("f1", DataTypes.STRING()),
DataTypes.FIELD("f2", DataTypes.INT()),
DataTypes.FIELD(
"f3",
DataTypes.MAP(
DataTypes.STRING(), DataTypes.INT()))))),
Collections.emptyList(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,25 @@

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
import static org.apache.paimon.flink.FlinkCatalogTestUtil.createTable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -114,4 +119,27 @@ public void testGetDatabase() throws Exception {
.isInstanceOf(DatabaseNotExistException.class)
.hasMessageContaining("Database non does not exist in Catalog test-catalog.");
}

@Test
public void testCreateTable_Streaming() throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
Map<String, String> options = new HashMap<>();
options.put("is_streaming", String.valueOf(true));
CatalogTable table = createTable(options);
catalog.createTable(path1, table, false);
CatalogTable tableFromServer = (CatalogTable) catalog.getTable(path1);
checkOptions(options, tableFromServer.getOptions());
assertEquals(tableFromServer.getTableKind(), table.getTableKind());
assertEquals(tableFromServer.getUnresolvedSchema(), table.getUnresolvedSchema());
}

private void checkOptions(Map<String, String> expected, Map<String, String> actual) {
List<String> ignoreKeys = ImmutableList.of(FlinkCatalogOptions.REGISTER_TIMEOUT.key());
for (Map.Entry<String, String> entry : expected.entrySet()) {
String key = entry.getKey();
if (!ignoreKeys.contains(key)) {
assertEquals(actual.get(key), actual.get(key));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@

package org.apache.paimon.flink;

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTObjectMapper;
import org.apache.paimon.rest.RESTResponse;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.table.FileStoreTable;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -120,12 +125,49 @@ public MockResponse dispatch(RecordedRequest request) throws InterruptedExceptio
return mockResponse(response, 200);
}
} else if (request.getPath().startsWith("/v1/prefix/databases/")) {
String databaseName =
request.getPath().substring("/v1/prefix/databases/".length());
if (request.getMethod().equals("GET")) {
Database database = catalog.getDatabase(databaseName);
response = new GetDatabaseResponse(database.name(), database.options());
return mockResponse(response, 200);
String[] resources =
request.getPath()
.substring("/v1/prefix/databases/".length())
.split("/");
String databaseName = resources[0];
boolean isTables = resources.length == 2 && "tables".equals(resources[1]);
boolean isTable = resources.length == 3 && "tables".equals(resources[1]);
if (isTable) {
String tableName = resources[2];
if (request.getMethod().equals("GET")) {
Identifier identifier = Identifier.create(databaseName, tableName);
FileStoreTable table =
(FileStoreTable) catalog.getTable(identifier);
response =
new GetTableResponse(
AbstractCatalog.newTableLocation(
catalog.warehouse(), identifier)
.toString(),
table.schema().id(),
table.schema().toSchema());
return mockResponse(response, 200);
}
} else if (isTables) {
// /v1/prefix/databases/db1/tables
if (request.getMethod().equals("POST")) {
CreateTableRequest requestBody =
mapper.readValue(
request.getBody().readUtf8(),
CreateTableRequest.class);
catalog.createTable(
requestBody.getIdentifier(), requestBody.getSchema(), true);
response = new GetTableResponse("", 1L, requestBody.getSchema());
return mockResponse(response, 200);
}

} else {
if (request.getMethod().equals("GET")) {
Database database = catalog.getDatabase(databaseName);
response =
new GetDatabaseResponse(
database.name(), database.options());
return mockResponse(response, 200);
}
}
}
return new MockResponse().setResponseCode(404);
Expand Down

0 comments on commit da4d47a

Please sign in to comment.