From d4326406de64e51756d76fafe9ba5be19ef2953d Mon Sep 17 00:00:00 2001 From: yantian Date: Tue, 31 Dec 2024 10:18:05 +0800 Subject: [PATCH] add IT to flink when use RESTCatalog --- paimon-core/pom.xml | 1 - paimon-flink/paimon-flink-common/pom.xml | 7 + .../paimon/flink/FlinkRESTCatalogTest.java | 136 ++++++++++++++++++ pom.xml | 1 + 4 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index 6e9dfa716a05..586483d4ac6f 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -33,7 +33,6 @@ under the License. 6.20.3-ververica-2.0 - 4.12.0 diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index e0f7ce245fa7..84d4622b02b8 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -177,6 +177,13 @@ under the License. jar test + + + com.squareup.okhttp3 + mockwebserver + ${okhttp.version} + test + diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java new file mode 100644 index 000000000000..d169bad0d928 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRESTCatalogTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.MockRESTMessage; +import org.apache.paimon.rest.RESTCatalogFactory; +import org.apache.paimon.rest.RESTCatalogInternalOptions; +import org.apache.paimon.rest.RESTCatalogOptions; +import org.apache.paimon.rest.RESTObjectMapper; +import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.ListDatabasesResponse; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ObjectPath; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; +import static org.junit.Assert.assertEquals; + +public class FlinkRESTCatalogTest { + private static final String TESTING_LOG_STORE = "testing"; + + private final ObjectPath path1 = new ObjectPath("db1", "t1"); + private final ObjectPath path3 = new ObjectPath("db1", "t2"); + + private final ObjectPath tableInDefaultDb = new ObjectPath("default", "t1"); + + private final ObjectPath tableInDefaultDb1 = new ObjectPath("default-db", "t1"); + private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist"); + private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist"); + + private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T"; + + private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3"); + private final ObjectMapper mapper = RESTObjectMapper.create(); + private MockWebServer mockWebServer; + private String serverUrl; + private String warehouse; + private Catalog catalog; + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void beforeEach() throws IOException { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + serverUrl = mockWebServer.url("").toString(); + Options options = new Options(); + options.set(RESTCatalogOptions.URI, serverUrl); + String initToken = "init_token"; + options.set(RESTCatalogOptions.TOKEN, initToken); + options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1); + warehouse = new File(temporaryFolder.newFolder(), UUID.randomUUID().toString()).toString(); + options.set(LOG_SYSTEM_AUTO_REGISTER, true); + options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER); + mockConfig(warehouse); + GetDatabaseResponse response = + MockRESTMessage.getDatabaseResponse( + org.apache.paimon.catalog.Catalog.DEFAULT_DATABASE); + mockResponse(mapper.writeValueAsString(response), 200); + catalog = + FlinkCatalogFactory.createCatalog( + "test-catalog", + CatalogContext.create(options), + FlinkCatalogTest.class.getClassLoader()); + } + + @After + public void tearDown() throws IOException { + mockWebServer.shutdown(); + } + + @Test + public void testListDatabases() throws JsonProcessingException { + String name = MockRESTMessage.databaseName(); + ListDatabasesResponse response = MockRESTMessage.listDatabasesResponse(name); + mockResponse(mapper.writeValueAsString(response), 200); + List result = catalog.listDatabases(); + assertEquals(response.getDatabases().size(), result.size()); + assertEquals(name, result.get(0)); + } + + private void mockConfig(String warehouseStr) { + String mockResponse = + String.format( + "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}", + RESTCatalogInternalOptions.PREFIX.key(), + "prefix", + CatalogOptions.WAREHOUSE.key(), + warehouseStr); + mockResponse(mockResponse, 200); + } + + private void mockResponse(String mockResponse, int httpCode) { + MockResponse mockResponseObj = + new MockResponse() + .setResponseCode(httpCode) + .setBody(mockResponse) + .addHeader("Content-Type", "application/json"); + mockWebServer.enqueue(mockResponseObj); + } +} diff --git a/pom.xml b/pom.xml index 4524bdde8c6b..7a98a8aa2688 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,7 @@ under the License. 1.5.5-11 3.0.11 3.4.6 + 4.12.0 2.3.1 1.3.9 2.4.9